diff --git a/pinos/client/connection.c b/pinos/client/connection.c index b5d490e62..bb38acec8 100644 --- a/pinos/client/connection.c +++ b/pinos/client/connection.c @@ -1216,10 +1216,6 @@ pinos_connection_parse_message (PinosConnection *conn, connection_parse_port_update (conn, message); break; - case PINOS_MESSAGE_PORT_STATUS_CHANGE: - pinos_log_warn ("implement iter of %d", conn->in.type); - break; - case PINOS_MESSAGE_NODE_STATE_CHANGE: if (conn->in.size < sizeof (PinosMessageNodeStateChange)) return false; @@ -1410,10 +1406,6 @@ pinos_connection_add_message (PinosConnection *conn, connection_add_port_update (conn, dest_id, message); break; - case PINOS_MESSAGE_PORT_STATUS_CHANGE: - p = connection_add_message (conn, dest_id, type, 0); - break; - case PINOS_MESSAGE_NODE_STATE_CHANGE: p = connection_add_message (conn, dest_id, type, sizeof (PinosMessageNodeStateChange)); memcpy (p, message, sizeof (PinosMessageNodeStateChange)); diff --git a/pinos/client/connection.h b/pinos/client/connection.h index fb15edb3d..21722082a 100644 --- a/pinos/client/connection.h +++ b/pinos/client/connection.h @@ -65,8 +65,6 @@ typedef enum { PINOS_MESSAGE_NODE_UPDATE, PINOS_MESSAGE_PORT_UPDATE, PINOS_MESSAGE_NODE_STATE_CHANGE, - - PINOS_MESSAGE_PORT_STATUS_CHANGE, PINOS_MESSAGE_NODE_EVENT, /* server to client */ @@ -229,8 +227,6 @@ typedef struct { SpaNodeState state; } PinosMessageNodeStateChange; -/* PINOS_MESSAGE_PORT_STATUS_CHANGE */ - /* PINOS_MESSAGE_NODE_EVENT */ typedef struct { SpaNodeEvent *event; diff --git a/pinos/client/context.c b/pinos/client/context.c index 0189335df..fbaf41db6 100644 --- a/pinos/client/context.c +++ b/pinos/client/context.c @@ -543,6 +543,9 @@ pinos_context_destroy (PinosContext *context) pinos_log_debug ("context %p: destroy", context); pinos_signal_emit (&context->destroy_signal, context); + if (context->state != PINOS_CONTEXT_STATE_UNCONNECTED) + pinos_context_disconnect (context); + spa_list_for_each_safe (stream, t1, &context->stream_list, link) pinos_stream_destroy (stream); spa_list_for_each_safe (proxy, t2, &context->proxy_list, link) diff --git a/pinos/client/introspect.c b/pinos/client/introspect.c index 8374e7eb3..4de186325 100644 --- a/pinos/client/introspect.c +++ b/pinos/client/introspect.c @@ -238,13 +238,23 @@ pinos_node_info_update (PinosNodeInfo *info, free ((void*)info->name); info->name = update->name ? strdup (update->name) : NULL; } - if (update->change_mask & (1 << 1)) { + if (update->change_mask & (1 << 1)) + info->max_inputs = update->max_inputs; + if (update->change_mask & (1 << 2)) + info->n_input_formats = update->n_input_formats; + + if (update->change_mask & (1 << 3)) + info->max_outputs = update->max_outputs; + if (update->change_mask & (1 << 4)) + info->n_output_formats = update->n_output_formats; + + if (update->change_mask & (1 << 5)) { info->state = update->state; if (info->error) free ((void*)info->error); info->error = update->error ? strdup (update->error) : NULL; } - if (update->change_mask & (1 << 2)) { + if (update->change_mask & (1 << 6)) { if (info->props) pinos_spa_dict_destroy (info->props); info->props = pinos_spa_dict_copy (update->props); diff --git a/pinos/client/introspect.h b/pinos/client/introspect.h index bd7de68b9..73e623ccc 100644 --- a/pinos/client/introspect.h +++ b/pinos/client/introspect.h @@ -255,6 +255,14 @@ struct _PinosNodeInfo { uint32_t id; uint64_t change_mask; const char *name; + unsigned int max_inputs; + unsigned int n_inputs; + unsigned int n_input_formats; + SpaFormat **input_formats; + unsigned int max_outputs; + unsigned int n_outputs; + unsigned int n_output_formats; + SpaFormat **output_formats; PinosNodeState state; const char *error; SpaDict *props; diff --git a/pinos/client/stream.c b/pinos/client/stream.c index a008301c4..780ce9827 100644 --- a/pinos/client/stream.c +++ b/pinos/client/stream.c @@ -96,6 +96,26 @@ typedef struct int64_t last_monotonic; } PinosStreamImpl; +static void +clear_memid (MemId *mid) +{ + if (mid->ptr != NULL) + munmap (mid->ptr, mid->size + mid->offset); + mid->ptr = NULL; + close (mid->fd); +} + +static void +clear_mems (PinosStream *stream) +{ + PinosStreamImpl *impl = SPA_CONTAINER_OF (stream, PinosStreamImpl, this); + MemId *mid; + + pinos_array_for_each (mid, &impl->mem_ids) + clear_memid (mid); + impl->mem_ids.size = 0; +} + static void clear_buffers (PinosStream *stream) { @@ -104,7 +124,9 @@ clear_buffers (PinosStream *stream) pinos_array_for_each (bid, &impl->buffer_ids) { pinos_signal_emit (&stream->remove_buffer, stream, bid->id); + free (bid->buf); bid->buf = NULL; + bid->used = false; } impl->buffer_ids.size = 0; impl->in_order = true; @@ -229,13 +251,19 @@ pinos_stream_destroy (PinosStream *stream) spa_list_remove (&stream->link); + if (impl->node_proxy) + pinos_signal_remove (&impl->node_proxy_destroy); + if (impl->format) free (impl->format); if (stream->error) free (stream->error); + clear_buffers (stream); pinos_array_clear (&impl->buffer_ids); + + clear_mems (stream); pinos_array_clear (&impl->mem_ids); if (stream->properties) @@ -499,6 +527,10 @@ unhandle_socket (PinosStream *stream) pinos_loop_destroy_source (stream->context->loop, impl->rtsocket_source); impl->rtsocket_source = NULL; } + if (impl->timeout_source) { + pinos_loop_destroy_source (stream->context->loop, impl->timeout_source); + impl->timeout_source = NULL; + } } static void @@ -540,7 +572,7 @@ handle_socket (PinosStream *stream, int rtfd) impl->rtsocket_source = pinos_loop_add_io (stream->context->loop, impl->rtfd, SPA_IO_IN | SPA_IO_ERR | SPA_IO_HUP, - false, + true, on_rtsocket_condition, stream); @@ -691,6 +723,7 @@ stream_dispatch_func (void *object, if (m) { pinos_log_debug ("update mem %u, fd %d, flags %d, off %zd, size %zd", p->mem_id, p->memfd, p->flags, p->offset, p->size); + clear_memid (m); } else { m = pinos_array_add (&impl->mem_ids, sizeof (MemId)); pinos_log_debug ("add mem %u, fd %d, flags %d, off %zd, size %zd", @@ -719,12 +752,11 @@ stream_dispatch_func (void *object, MemId *mid = find_mem (stream, p->buffers[i].mem_id); if (mid == NULL) { - pinos_log_warn ("unknown memory id %u", mid->id); + pinos_log_warn ("unknown memory id %u", p->buffers[i].mem_id); continue; } if (mid->ptr == NULL) { - //mid->ptr = mmap (NULL, mid->size, PROT_READ | PROT_WRITE, MAP_SHARED, mid->fd, mid->offset); mid->ptr = mmap (NULL, mid->size + mid->offset, PROT_READ | PROT_WRITE, MAP_SHARED, mid->fd, 0); if (mid->ptr == MAP_FAILED) { mid->ptr = NULL; @@ -794,6 +826,7 @@ stream_dispatch_func (void *object, if (p->n_buffers) { add_state_change (stream, SPA_NODE_STATE_PAUSED, false); } else { + clear_mems (stream); add_state_change (stream, SPA_NODE_STATE_READY, false); } add_async_complete (stream, p->seq, SPA_RESULT_OK, true); @@ -855,6 +888,7 @@ on_node_proxy_destroy (PinosListener *listener, impl->disconnecting = false; impl->node_proxy = NULL; + pinos_signal_remove (&impl->node_proxy_destroy); stream_set_state (this, PINOS_STREAM_STATE_UNCONNECTED, NULL); } diff --git a/pinos/gst/gstpinossrc.c b/pinos/gst/gstpinossrc.c index 4fbd16391..372d50bfd 100644 --- a/pinos/gst/gstpinossrc.c +++ b/pinos/gst/gstpinossrc.c @@ -458,9 +458,23 @@ on_remove_buffer (PinosListener *listener, GST_LOG_OBJECT (pinossrc, "remove buffer"); buf = g_hash_table_lookup (pinossrc->buf_ids, GINT_TO_POINTER (id)); - GST_MINI_OBJECT_CAST (buf)->dispose = NULL; + if (buf) { + GList *walk; - g_hash_table_remove (pinossrc->buf_ids, GINT_TO_POINTER (id)); + GST_MINI_OBJECT_CAST (buf)->dispose = NULL; + + walk = pinossrc->queue.head; + while (walk) { + GList *next = walk->next; + + if (walk->data == buf) { + gst_buffer_unref (buf); + g_queue_delete_link (&pinossrc->queue, walk); + } + walk = next; + } + g_hash_table_remove (pinossrc->buf_ids, GINT_TO_POINTER (id)); + } } static void @@ -474,12 +488,12 @@ on_new_buffer (PinosListener *listener, SpaMetaHeader *h; guint i; - GST_LOG_OBJECT (pinossrc, "got new buffer"); buf = g_hash_table_lookup (pinossrc->buf_ids, GINT_TO_POINTER (id)); if (buf == NULL) { g_warning ("unknown buffer %d", id); return; } + GST_LOG_OBJECT (pinossrc, "got new buffer %p", buf); data = gst_mini_object_get_qdata (GST_MINI_OBJECT_CAST (buf), process_mem_data_quark); @@ -500,6 +514,7 @@ on_new_buffer (PinosListener *listener, mem->offset = d->chunk->offset + data->offset; mem->size = d->chunk->size; } + gst_buffer_ref (buf); g_queue_push_tail (&pinossrc->queue, buf); pinos_thread_main_loop_signal (pinossrc->main_loop, FALSE); @@ -904,6 +919,7 @@ gst_pinos_src_create (GstPushSrc * psrc, GstBuffer ** buffer) goto streaming_stopped; *buffer = g_queue_pop_head (&pinossrc->queue); + GST_DEBUG ("popped buffer %p", *buffer); if (*buffer != NULL) break; @@ -1074,6 +1090,8 @@ gst_pinos_src_close (GstPinosSrc * pinossrc) pinos_thread_main_loop_stop (pinossrc->main_loop); + g_hash_table_remove_all (pinossrc->buf_ids); + pinos_stream_destroy (pinossrc->stream); pinossrc->stream = NULL; diff --git a/pinos/server/client-node.c b/pinos/server/client-node.c index c6d14bf8f..d53d293ee 100644 --- a/pinos/server/client-node.c +++ b/pinos/server/client-node.c @@ -803,50 +803,6 @@ spa_proxy_node_port_alloc_buffers (SpaNode *node, return SPA_RESULT_NOT_IMPLEMENTED; } -#if 0 -static void -copy_meta_in (SpaProxy *this, SpaProxyPort *port, uint32_t buffer_id) -{ - ProxyBuffer *b = &port->buffers[buffer_id]; - unsigned int i; - - for (i = 0; i < b->outbuf->n_metas; i++) { - SpaMeta *sm = &b->buffer.metas[i]; - SpaMeta *dm = &b->outbuf->metas[i]; - memcpy (dm->data, sm->data, dm->size); - } - for (i = 0; i < b->outbuf->n_datas; i++) { - b->outbuf->datas[i].size = b->buffer.datas[i].size; - if (b->outbuf->datas[i].type == SPA_DATA_TYPE_MEMPTR) { - spa_log_info (this->log, "memcpy in %zd", b->buffer.datas[i].size); - memcpy (b->outbuf->datas[i].data, b->datas[i].data, b->buffer.datas[i].size); - } - } -} -#endif - -#if 0 -static void -copy_meta_out (SpaProxy *this, SpaProxyPort *port, uint32_t buffer_id) -{ - ProxyBuffer *b = &port->buffers[buffer_id]; - unsigned int i; - - for (i = 0; i < b->outbuf->n_metas; i++) { - SpaMeta *sm = &b->outbuf->metas[i]; - SpaMeta *dm = &b->buffer.metas[i]; - memcpy (dm->data, sm->data, dm->size); - } - for (i = 0; i < b->outbuf->n_datas; i++) { - b->buffer.datas[i].size = b->outbuf->datas[i].size; - if (b->datas[i].type == SPA_DATA_TYPE_MEMPTR) { - spa_log_info (this->log, "memcpy out %zd", b->outbuf->datas[i].size); - memcpy (b->datas[i].data, b->outbuf->datas[i].data, b->outbuf->datas[i].size); - } - } -} -#endif - static SpaResult spa_proxy_node_port_reuse_buffer (SpaNode *node, uint32_t port_id, @@ -1011,12 +967,6 @@ client_node_dispatch_func (void *object, break; } - case PINOS_MESSAGE_PORT_STATUS_CHANGE: - { - spa_log_warn (this->log, "proxy %p: command not implemented %d", this, type); - break; - } - case PINOS_MESSAGE_NODE_STATE_CHANGE: { PinosMessageNodeStateChange *sc = message; diff --git a/pinos/server/node.c b/pinos/server/node.c index 642e92d1a..b5b7c45d6 100644 --- a/pinos/server/node.c +++ b/pinos/server/node.c @@ -277,8 +277,6 @@ on_node_event (SpaNode *node, SpaNodeEvent *event, void *user_data) int i; bool processed = false; -// pinos_log_debug ("node %p: need input", this); - for (i = 0; i < this->transport->area->n_inputs; i++) { PinosLink *link; PinosPort *inport, *outport; @@ -319,8 +317,6 @@ on_node_event (SpaNode *node, SpaNodeEvent *event, void *user_data) int i; bool processed = false; -// pinos_log_debug ("node %p: have output", this); - for (i = 0; i < this->transport->area->n_outputs; i++) { PinosLink *link; PinosPort *inport, *outport; @@ -346,6 +342,12 @@ on_node_event (SpaNode *node, SpaNodeEvent *event, void *user_data) if ((res = spa_node_process_input (inport->node->node)) < 0) pinos_log_warn ("node %p: got process input %d", inport->node, res); } + + if ((res = spa_node_port_reuse_buffer (this->node, + outport->port_id, + po->buffer_id)) < 0) + pinos_log_warn ("node %p: error reuse buffer: %d", this, res); + po->buffer_id = SPA_ID_INVALID; } if (processed) { @@ -355,25 +357,8 @@ on_node_event (SpaNode *node, SpaNodeEvent *event, void *user_data) break; } case SPA_NODE_EVENT_TYPE_REUSE_BUFFER: - { - SpaResult res; - SpaNodeEventReuseBuffer *rb = (SpaNodeEventReuseBuffer *) event; - PinosPort *port = this->input_port_map[rb->port_id]; - PinosLink *link; - -// pinos_log_debug ("node %p: reuse buffer %u", this, rb->buffer_id); - - spa_list_for_each (link, &port->rt.links, rt.input_link) { - if (link->rt.input == NULL || link->rt.output == NULL) - continue; - - if ((res = spa_node_port_reuse_buffer (link->rt.output->node->node, - link->rt.output->port_id, - rb->buffer_id)) < 0) - pinos_log_warn ("node %p: error reuse buffer: %d", node, res); - } break; - } + case SPA_NODE_EVENT_TYPE_REQUEST_CLOCK_UPDATE: send_clock_update (this); break; @@ -436,6 +421,10 @@ node_bind_func (PinosGlobal *global, info.id = global->id; info.change_mask = ~0; info.name = this->name; + info.max_inputs = this->transport->area->max_inputs; + info.n_inputs = this->transport->area->n_inputs; + info.max_outputs = this->transport->area->max_outputs; + info.n_outputs = this->transport->area->n_outputs; info.state = this->state; info.error = this->error; info.props = this->properties ? &this->properties->dict : NULL; diff --git a/pinos/server/node.h b/pinos/server/node.h index 535abba13..1fd0ea8f0 100644 --- a/pinos/server/node.h +++ b/pinos/server/node.h @@ -50,8 +50,6 @@ struct _PinosNode { SpaList link; PinosGlobal *global; - bool unlinking; - char *name; PinosProperties *properties; PinosNodeState state; diff --git a/pinos/server/port.c b/pinos/server/port.c index 6c2f7c72d..fd5d1bd42 100644 --- a/pinos/server/port.c +++ b/pinos/server/port.c @@ -282,11 +282,6 @@ do_remove_link (SpaLoop *loop, link->rt.output = NULL; } -#if 0 - if (spa_list_is_empty (&port->rt.links)) - pinos_port_pause (port); -#endif - res = pinos_loop_invoke (this->core->main_loop->loop, do_remove_link_done, seq, diff --git a/pinos/tools/pinos-monitor.c b/pinos/tools/pinos-monitor.c index 97e39bbe8..46014799b 100644 --- a/pinos/tools/pinos-monitor.c +++ b/pinos/tools/pinos-monitor.c @@ -111,12 +111,14 @@ dump_node_info (PinosContext *c, printf ("\ttype: %s\n", PINOS_NODE_URI); if (data->print_all) { printf ("%c\tname: \"%s\"\n", MARK_CHANGE (0), info->name); - printf ("%c\tstate: \"%s\"", MARK_CHANGE (1), pinos_node_state_as_string (info->state)); + printf ("%c\tinputs: %u/%u\n", MARK_CHANGE (1), info->n_inputs, info->max_inputs); + printf ("%c\toutputs: %u/%u\n", MARK_CHANGE (2), info->n_outputs, info->max_outputs); + printf ("%c\tstate: \"%s\"", MARK_CHANGE (3), pinos_node_state_as_string (info->state)); if (info->state == PINOS_NODE_STATE_ERROR && info->error) printf (" \"%s\"\n", info->error); else printf ("\n"); - print_properties (info->props, MARK_CHANGE (2)); + print_properties (info->props, MARK_CHANGE (4)); } } diff --git a/spa/tools/spa-inspect.c b/spa/tools/spa-inspect.c index f727b2ce0..efbdedd5d 100644 --- a/spa/tools/spa-inspect.c +++ b/spa/tools/spa-inspect.c @@ -26,6 +26,7 @@ #include #include #include +#include #include #include @@ -37,10 +38,11 @@ typedef struct { typedef struct { URI uri; - SpaSupport support[2]; + SpaSupport support[4]; unsigned int n_support; SpaIDMap *map; SpaLog *log; + SpaLoop loop; } AppData; static void @@ -154,6 +156,22 @@ inspect_factory (AppData *data, const SpaHandleFactory *factory) } } +static SpaResult +do_add_source (SpaLoop *loop, + SpaSource *source) +{ + return SPA_RESULT_OK; +} +static SpaResult +do_update_source (SpaSource *source) +{ + return SPA_RESULT_OK; +} +static void +do_remove_source (SpaSource *source) +{ +} + int main (int argc, char *argv[]) { @@ -170,12 +188,20 @@ main (int argc, char *argv[]) data.map = spa_id_map_get_default(); data.log = NULL; + data.loop.size = sizeof (SpaLoop); + data.loop.add_source = do_add_source; + data.loop.update_source = do_update_source; + data.loop.remove_source = do_remove_source; data.support[0].uri = SPA_ID_MAP_URI; data.support[0].data = data.map; data.support[1].uri = SPA_LOG_URI; data.support[1].data = data.log; - data.n_support = 2; + data.support[2].uri = SPA_LOOP__MainLoop; + data.support[2].data = &data.loop; + data.support[3].uri = SPA_LOOP__DataLoop; + data.support[3].data = &data.loop; + data.n_support = 4; data.uri.node = spa_id_map_get_id (data.map, SPA_NODE_URI); data.uri.clock = spa_id_map_get_id (data.map, SPA_CLOCK_URI);