diff --git a/pinos/client/introspect.h b/pinos/client/introspect.h index ca6429e3f..bad88de0c 100644 --- a/pinos/client/introspect.h +++ b/pinos/client/introspect.h @@ -28,6 +28,7 @@ G_BEGIN_DECLS /** * PinosNodeState: * @PINOS_NODE_STATE_ERROR: the node is in error + * @PINOS_NODE_STATE_CREATING: the node is being created * @PINOS_NODE_STATE_SUSPENDED: the node is suspended, the device might * be closed * @PINOS_NODE_STATE_INITIALIZING: the node is initializing, the device is @@ -40,10 +41,11 @@ G_BEGIN_DECLS */ typedef enum { PINOS_NODE_STATE_ERROR = -1, - PINOS_NODE_STATE_SUSPENDED = 0, - PINOS_NODE_STATE_INITIALIZING = 1, - PINOS_NODE_STATE_IDLE = 2, - PINOS_NODE_STATE_RUNNING = 3, + PINOS_NODE_STATE_CREATING = 0, + PINOS_NODE_STATE_SUSPENDED = 1, + PINOS_NODE_STATE_INITIALIZING = 2, + PINOS_NODE_STATE_IDLE = 3, + PINOS_NODE_STATE_RUNNING = 4, } PinosNodeState; const gchar * pinos_node_state_as_string (PinosNodeState state); diff --git a/pinos/client/ringbuffer.c b/pinos/client/ringbuffer.c index fc965836a..655c0bb3f 100644 --- a/pinos/client/ringbuffer.c +++ b/pinos/client/ringbuffer.c @@ -177,7 +177,7 @@ pinos_ringbuffer_constructed (GObject * obj) } priv->data = mmap (NULL, priv->fdsize, PROT_READ | PROT_WRITE, MAP_SHARED, priv->fd, 0); - spa_ringbuffer_init (&priv->data->rbuf, (guint8 *)priv->data + sizeof (PinosRingbufferData), priv->size); + spa_ringbuffer_init (&priv->data->rbuf, priv->size); G_OBJECT_CLASS (pinos_ringbuffer_parent_class)->constructed (obj); } diff --git a/pinos/client/stream.c b/pinos/client/stream.c index bc275bcda..d36edde9c 100644 --- a/pinos/client/stream.c +++ b/pinos/client/stream.c @@ -833,8 +833,6 @@ handle_node_event (PinosStream *stream, switch (event->type) { case SPA_NODE_EVENT_TYPE_INVALID: - case SPA_NODE_EVENT_TYPE_PORT_ADDED: - case SPA_NODE_EVENT_TYPE_PORT_REMOVED: case SPA_NODE_EVENT_TYPE_HAVE_OUTPUT: case SPA_NODE_EVENT_TYPE_NEED_INPUT: case SPA_NODE_EVENT_TYPE_ASYNC_COMPLETE: @@ -974,7 +972,6 @@ parse_control (PinosStream *stream, switch (cmd) { case SPA_CONTROL_CMD_NODE_UPDATE: case SPA_CONTROL_CMD_PORT_UPDATE: - case SPA_CONTROL_CMD_PORT_REMOVED: case SPA_CONTROL_CMD_PORT_STATUS_CHANGE: case SPA_CONTROL_CMD_NODE_STATE_CHANGE: g_warning ("got unexpected control %d", cmd); @@ -1472,21 +1469,7 @@ pinos_stream_finish_format (PinosStream *stream, static gboolean do_start (PinosStream *stream) { - PinosStreamPrivate *priv = stream->priv; - SpaControlBuilder builder; - SpaControl control; - - control_builder_init (stream, &builder); - add_state_change (stream, &builder, SPA_NODE_STATE_CONFIGURE); - spa_control_builder_end (&builder, &control); - - if (spa_control_write (&control, priv->fd) < 0) - g_warning ("stream %p: failed to write control", stream); - - spa_control_clear (&control); - g_object_unref (stream); - return FALSE; } diff --git a/pinos/server/daemon.c b/pinos/server/daemon.c index 9d8bf10e3..1dbcf0986 100644 --- a/pinos/server/daemon.c +++ b/pinos/server/daemon.c @@ -83,13 +83,14 @@ handle_client_vanished (PinosClient *client, gpointer user_data) static PinosClient * sender_get_client (PinosDaemon *daemon, - const gchar *sender) + const gchar *sender, + gboolean create) { PinosDaemonPrivate *priv = daemon->priv; PinosClient *client; client = g_hash_table_lookup (priv->clients, sender); - if (client == NULL) { + if (client == NULL && create) { client = pinos_client_new (daemon, sender, NULL); g_debug ("daemon %p: new client %p for %s", daemon, client, sender); @@ -157,7 +158,7 @@ handle_create_node (PinosDaemon1 *interface, if (node == NULL) goto no_node; - client = sender_get_client (daemon, sender); + client = sender_get_client (daemon, sender, TRUE); pinos_client_add_object (client, G_OBJECT (node)); g_signal_connect (node, @@ -205,6 +206,7 @@ on_link_state_notify (GObject *obj, } break; case PINOS_LINK_STATE_UNLINKED: + g_warning ("link %p: unlinked", link); break; case PINOS_LINK_STATE_INIT: case PINOS_LINK_STATE_NEGOTIATING: @@ -216,21 +218,20 @@ on_link_state_notify (GObject *obj, } static void -on_port_added (PinosNode *node, PinosDirection direction, uint32_t port_id, PinosClient *client) +on_port_added (PinosNode *node, PinosDirection direction, PinosDaemon *this) { - PinosDaemon *this; + PinosClient *client; PinosProperties *props; PinosNode *target; const gchar *path; GError *error = NULL; PinosLink *link; - this = pinos_node_get_daemon (node); - props = pinos_node_get_properties (node); path = pinos_properties_get (props, "pinos.target.node"); if (path) { + const gchar *sender; guint target_port; guint node_port; @@ -269,9 +270,12 @@ on_port_added (PinosNode *node, PinosDirection direction, uint32_t port_id, Pino if (link == NULL) goto error; - pinos_client_add_object (client, G_OBJECT (link)); + sender = pinos_node_get_sender (node); + if (sender && (client = sender_get_client (this, sender, FALSE))) { + pinos_client_add_object (client, G_OBJECT (link)); + } - g_signal_connect (link, "notify::state", (GCallback) on_link_state_notify, client); + g_signal_connect (link, "notify::state", (GCallback) on_link_state_notify, daemon); pinos_link_activate (link); g_object_unref (link); @@ -286,10 +290,60 @@ error: } static void -on_port_removed (PinosNode *node, uint32_t port_id, PinosClient *client) +on_port_removed (PinosNode *node, PinosDaemon *daemon) { } +static void +handle_node_connections (PinosDaemon *daemon, + PinosNode *node) +{ + PinosDirection direction; + + direction = node->have_inputs ? PINOS_DIRECTION_INPUT : PINOS_DIRECTION_OUTPUT; + + on_port_added (node, direction, daemon); +} + +static void +on_node_state_change (PinosNode *node, + PinosNodeState old, + PinosNodeState state, + PinosDaemon *daemon) +{ + g_debug ("daemon %p: node %p state change %s -> %s", daemon, node, + pinos_node_state_as_string (old), + pinos_node_state_as_string (state)); + + if (old == PINOS_NODE_STATE_CREATING && state == PINOS_NODE_STATE_SUSPENDED) + handle_node_connections (daemon, node); +} + +static void +on_node_added (PinosDaemon *daemon, PinosNode *node) +{ + PinosNodeState state; + + g_debug ("daemon %p: node %p added", daemon, node); + + g_signal_connect (node, "state-change", (GCallback) on_node_state_change, daemon); + + state = pinos_node_get_state (node); + if (state > PINOS_NODE_STATE_CREATING) { + handle_node_connections (daemon, node); + } + g_signal_connect (node, "port-added", (GCallback) on_port_added, daemon); + g_signal_connect (node, "port-removed", (GCallback) on_port_removed, daemon); +} + +static void +on_node_removed (PinosDaemon *daemon, PinosNode *node) +{ + g_debug ("daemon %p: node %p removed", daemon, node); + + g_signal_handlers_disconnect_by_data (node, daemon); +} + static gboolean handle_create_client_node (PinosDaemon1 *interface, GDBusMethodInvocation *invocation, @@ -322,12 +376,9 @@ handle_create_client_node (PinosDaemon1 *interface, if (socket == NULL) goto no_socket; - client = sender_get_client (daemon, sender); + client = sender_get_client (daemon, sender, TRUE); pinos_client_add_object (client, G_OBJECT (node)); - g_signal_connect (node, "port-added", (GCallback) on_port_added, client); - g_signal_connect (node, "port-removed", (GCallback) on_port_removed, client); - object_path = pinos_node_get_object_path (PINOS_NODE (node)); g_debug ("daemon %p: add client-node %p, %s", daemon, node, object_path); g_object_unref (node); @@ -544,6 +595,7 @@ pinos_daemon_add_node (PinosDaemon *daemon, priv = daemon->priv; priv->nodes = g_list_prepend (priv->nodes, node); + on_node_added (daemon, node); } /** @@ -563,6 +615,7 @@ pinos_daemon_remove_node (PinosDaemon *daemon, g_return_if_fail (PINOS_IS_NODE (node)); priv = daemon->priv; + on_node_removed (daemon, node); priv->nodes = g_list_remove (priv->nodes, node); } @@ -604,12 +657,12 @@ pinos_daemon_find_node (PinosDaemon *daemon, g_debug ("node path \"%s\"", pinos_node_get_object_path (n)); - if (!g_str_has_suffix (pinos_node_get_object_path (n), name)) - continue; + if (have_name && g_str_has_suffix (pinos_node_get_object_path (n), name)) { + g_debug ("name \"%s\" matches node %p", name, n); + best = n; + break; + } - g_debug ("name \"%s\" matches node %p", name, n); - best = n; - break; } if (best == NULL) { g_set_error (error, diff --git a/pinos/server/link.c b/pinos/server/link.c index 3ead648e1..46ecd23d2 100644 --- a/pinos/server/link.c +++ b/pinos/server/link.c @@ -232,8 +232,10 @@ pinos_link_update_state (PinosLink *link, PinosLinkState state) if (state != priv->state) { g_clear_error (&priv->error); + g_debug ("link %p: update state %s -> %s", link, + pinos_link_state_as_string (priv->state), + pinos_link_state_as_string (state)); priv->state = state; - g_debug ("link %p: got state %s", link, pinos_link_state_as_string (state)); g_object_notify (G_OBJECT (link), "state"); } } @@ -280,6 +282,7 @@ again: goto error; } } + g_debug ("Try filter:"); spa_debug_format (filter); if ((res = spa_node_port_enum_formats (this->output_node->node, @@ -297,6 +300,7 @@ again: "error output enum formats: %d", res); goto error; } + g_debug ("Got filtered:"); spa_debug_format (format); spa_format_fixate (format); } else if (in_state == SPA_NODE_STATE_CONFIGURE && out_state > SPA_NODE_STATE_CONFIGURE) { @@ -543,17 +547,17 @@ do_start (PinosLink *this, SpaNodeState in_state, SpaNodeState out_state) if (in_state < SPA_NODE_STATE_PAUSED || out_state < SPA_NODE_STATE_PAUSED) return SPA_RESULT_OK; + else if (in_state == SPA_NODE_STATE_STREAMING && out_state == SPA_NODE_STATE_STREAMING) { + pinos_link_update_state (this, PINOS_LINK_STATE_RUNNING); + } else { + pinos_link_update_state (this, PINOS_LINK_STATE_PAUSED); - pinos_link_update_state (this, PINOS_LINK_STATE_PAUSED); - - if (in_state == SPA_NODE_STATE_PAUSED) - pinos_node_set_state (this->input_node, PINOS_NODE_STATE_RUNNING); - - if (out_state == SPA_NODE_STATE_PAUSED) - pinos_node_set_state (this->output_node, PINOS_NODE_STATE_RUNNING); - - pinos_link_update_state (this, PINOS_LINK_STATE_RUNNING); + if (in_state == SPA_NODE_STATE_PAUSED) + pinos_node_set_state (this->input_node, PINOS_NODE_STATE_RUNNING); + if (out_state == SPA_NODE_STATE_PAUSED) + pinos_node_set_state (this->output_node, PINOS_NODE_STATE_RUNNING); + } return res; } @@ -594,13 +598,12 @@ do_check_states (PinosLink *this) } static void -on_node_state_notify (GObject *obj, - GParamSpec *pspec, - gpointer user_data) +on_async_complete_notify (PinosNode *node, + guint seq, + guint res, + PinosLink *this) { - PinosLink *this = user_data; - - g_debug ("link %p: node %p state change", this, obj); + g_debug ("link %p: node %p async complete %d %d", this, node, seq, res); g_idle_add ((GSourceFunc) do_check_states, this); } @@ -646,8 +649,8 @@ pinos_link_constructed (GObject * object) g_signal_connect (this->input_node, "remove", (GCallback) on_node_remove, this); g_signal_connect (this->output_node, "remove", (GCallback) on_node_remove, this); - g_signal_connect (this->input_node, "notify::node-state", (GCallback) on_node_state_notify, this); - g_signal_connect (this->output_node, "notify::node-state", (GCallback) on_node_state_notify, this); + g_signal_connect (this->input_node, "async-complete", (GCallback) on_async_complete_notify, this); + g_signal_connect (this->output_node, "async-complete", (GCallback) on_async_complete_notify, this); g_signal_connect (this, "notify", (GCallback) on_property_notify, this); @@ -891,6 +894,7 @@ pinos_link_activate (PinosLink *this) { g_return_val_if_fail (PINOS_IS_LINK (this), FALSE); + spa_ringbuffer_init (&this->ringbuffer, SPA_N_ELEMENTS (this->queue)); check_states (this); return TRUE; @@ -899,5 +903,6 @@ pinos_link_activate (PinosLink *this) gboolean pinos_link_deactivate (PinosLink *this) { + spa_ringbuffer_clear (&this->ringbuffer); return TRUE; } diff --git a/pinos/server/link.h b/pinos/server/link.h index 5e954e78d..9ed6a8f10 100644 --- a/pinos/server/link.h +++ b/pinos/server/link.h @@ -29,6 +29,7 @@ typedef struct _PinosLinkClass PinosLinkClass; typedef struct _PinosLinkPrivate PinosLinkPrivate; #include +#include #define PINOS_TYPE_LINK (pinos_link_get_type ()) #define PINOS_IS_LINK(obj) (G_TYPE_CHECK_INSTANCE_TYPE ((obj), PINOS_TYPE_LINK)) @@ -54,6 +55,10 @@ struct _PinosLink { guint input_id; uint32_t input_port; + uint32_t queue[16]; + SpaRingbuffer ringbuffer; + gint in_ready; + PinosLinkPrivate *priv; }; diff --git a/pinos/server/node.c b/pinos/server/node.c index f07dcd45d..b99ee4aa3 100644 --- a/pinos/server/node.c +++ b/pinos/server/node.c @@ -50,6 +50,7 @@ struct _PinosNodePrivate gchar *object_path; gchar *name; + gboolean async_init; unsigned int max_input_ports; unsigned int max_output_ports; unsigned int n_input_ports; @@ -77,6 +78,8 @@ struct _PinosNodePrivate guint n_used_output_links; GArray *input_links; guint n_used_input_links; + + SpaNodeEventAsyncComplete ac; }; G_DEFINE_TYPE (PinosNode, pinos_node, G_TYPE_OBJECT); @@ -88,48 +91,54 @@ enum PROP_SENDER, PROP_OBJECT_PATH, PROP_NAME, - PROP_STATE, PROP_PROPERTIES, PROP_NODE, - PROP_NODE_STATE, }; enum { SIGNAL_REMOVE, + SIGNAL_STATE_CHANGE, SIGNAL_PORT_ADDED, SIGNAL_PORT_REMOVED, + SIGNAL_ASYNC_COMPLETE, LAST_SIGNAL }; +static void init_complete (PinosNode *this); + static guint signals[LAST_SIGNAL] = { 0 }; -static PinosDirection -get_port_direction (PinosNode *node, guint id) -{ - PinosNodePrivate *priv = node->priv; - PinosDirection direction; - - direction = id < priv->max_input_ports ? PINOS_DIRECTION_INPUT : PINOS_DIRECTION_OUTPUT; - - return direction; -} - static void update_port_ids (PinosNode *node, gboolean create) { PinosNodePrivate *priv = node->priv; + uint32_t *in_ports, *out_ports; + guint n_input_ports, n_output_ports; + guint i, j; if (node->node == NULL) return; + n_input_ports = priv->n_input_ports; + n_output_ports = priv->n_output_ports; + + in_ports = g_alloca (sizeof (uint32_t) * n_input_ports); + out_ports = g_alloca (sizeof (uint32_t) * n_output_ports); + memcpy (in_ports, priv->input_port_ids, sizeof (uint32_t) * n_input_ports); + memcpy (out_ports, priv->output_port_ids, sizeof (uint32_t) * n_output_ports); + spa_node_get_n_ports (node->node, &priv->n_input_ports, &priv->max_input_ports, &priv->n_output_ports, &priv->max_output_ports); - g_debug ("node %p: update_port ids %u, %u", node, priv->max_input_ports, priv->max_output_ports); + node->have_inputs = priv->n_input_ports > 0; + node->have_outputs = priv->n_output_ports > 0; + + g_debug ("node %p: update_port ids %u/%u, %u/%u", node, + priv->n_input_ports, priv->max_input_ports, priv->n_output_ports, priv->max_output_ports); priv->input_port_ids = g_realloc_n (priv->input_port_ids, priv->max_input_ports, sizeof (uint32_t)); priv->output_port_ids = g_realloc_n (priv->output_port_ids, priv->max_output_ports, sizeof (uint32_t)); @@ -139,6 +148,45 @@ update_port_ids (PinosNode *node, gboolean create) priv->input_port_ids, priv->max_output_ports, priv->output_port_ids); + + i = j = 0; + while (true) { + if (i < priv->n_input_ports && j < n_input_ports && priv->input_port_ids[i] == in_ports[j]) { + i++; + j++; + } else if ((i < priv->n_input_ports && j < n_input_ports && + priv->input_port_ids[i] < in_ports[j]) || i < priv->n_input_ports) { + g_debug ("node %p: input port added %d", node, priv->input_port_ids[i]); + if (!priv->async_init) + g_signal_emit (node, signals[SIGNAL_PORT_ADDED], 0, PINOS_DIRECTION_INPUT); + i++; + } else if (j < n_input_ports) { + g_debug ("node %p: input port removed %d", node, in_ports[j]); + if (!priv->async_init) + g_signal_emit (node, signals[SIGNAL_PORT_REMOVED], 0, PINOS_DIRECTION_INPUT); + j++; + } else + break; + } + i = j = 0; + while (true) { + if (i < priv->n_output_ports && j < n_output_ports && priv->output_port_ids[i] == out_ports[j]) { + i++; + j++; + } else if ((i < priv->n_output_ports && j < n_output_ports && + priv->output_port_ids[i] < out_ports[j]) || i < priv->n_output_ports) { + g_debug ("node %p: output port added %d", node, priv->output_port_ids[i]); + if (!priv->async_init) + g_signal_emit (node, signals[SIGNAL_PORT_ADDED], 0, PINOS_DIRECTION_OUTPUT); + i++; + } else if (j < n_output_ports) { + g_debug ("node %p: output port removed %d", node, out_ports[j]); + if (!priv->async_init) + g_signal_emit (node, signals[SIGNAL_PORT_REMOVED], 0, PINOS_DIRECTION_OUTPUT); + j++; + } else + break; + } } static void * @@ -261,18 +309,20 @@ start_thread (PinosNode *this) } static void -stop_thread (PinosNode *this) +stop_thread (PinosNode *this, gboolean in_thread) { PinosNodePrivate *priv = this->priv; if (priv->running) { priv->running = false; - wakeup_thread (this); - pthread_join (priv->thread, NULL); + if (!in_thread) { + wakeup_thread (this); + pthread_join (priv->thread, NULL); + } } } -static void +static SpaResult pause_node (PinosNode *this) { SpaResult res; @@ -285,9 +335,11 @@ pause_node (PinosNode *this) cmd.size = 0; if ((res = spa_node_send_command (this->node, &cmd)) < 0) g_debug ("got error %d", res); + + return res; } -static void +static SpaResult start_node (PinosNode *this) { SpaResult res; @@ -300,9 +352,11 @@ start_node (PinosNode *this) cmd.size = 0; if ((res = spa_node_send_command (this->node, &cmd)) < 0) g_debug ("got error %d", res); + + return res; } -static void +static SpaResult suspend_node (PinosNode *this) { SpaResult res; @@ -311,6 +365,8 @@ suspend_node (PinosNode *this) if ((res = spa_node_port_set_format (this->node, 0, 0, NULL)) < 0) g_warning ("error unset format output: %d", res); + + return res; } static void @@ -348,53 +404,85 @@ static gboolean node_set_state (PinosNode *this, PinosNodeState state) { + SpaResult res = SPA_RESULT_OK; + g_debug ("node %p: set state %s", this, pinos_node_state_as_string (state)); switch (state) { + case PINOS_NODE_STATE_CREATING: + return FALSE; + case PINOS_NODE_STATE_SUSPENDED: - suspend_node (this); + res = suspend_node (this); break; case PINOS_NODE_STATE_INITIALIZING: break; case PINOS_NODE_STATE_IDLE: - pause_node (this); + res = pause_node (this); break; case PINOS_NODE_STATE_RUNNING: send_clock_update (this); - start_node (this); + res = start_node (this); break; case PINOS_NODE_STATE_ERROR: break; } + if (SPA_RESULT_IS_ERROR (res)) + return FALSE; + pinos_node_update_state (this, state); + return TRUE; } - -typedef struct { - PinosNode *node; - PinosDirection direction; - guint port_id; -} PortData; - static gboolean -do_signal_port_added (PortData *data) +do_read_link (PinosNode *this, PinosLink *link) { - g_signal_emit (data->node, signals[SIGNAL_PORT_ADDED], 0, data->direction, data->port_id); - g_slice_free (PortData, data); - return FALSE; + SpaRingbufferArea areas[2]; + SpaResult res; + gboolean pushed = FALSE; + + spa_ringbuffer_get_read_areas (&link->ringbuffer, areas); + + if (areas[0].len > 0) { + SpaPortInputInfo iinfo[1]; + + if (link->in_ready <= 0) + return FALSE; + + link->in_ready--; + + iinfo[0].port_id = link->input_port; + iinfo[0].buffer_id = link->queue[areas[0].offset]; + iinfo[0].flags = SPA_PORT_INPUT_FLAG_NONE; + + if ((res = spa_node_port_push_input (link->input_node->node, 1, iinfo)) < 0) + g_warning ("node %p: error pushing buffer: %d, %d", this, res, iinfo[0].status); + else + pushed = TRUE; + + spa_ringbuffer_read_advance (&link->ringbuffer, 1); + } + return pushed; } -static gboolean -do_signal_port_removed (PortData *data) +static void +do_handle_async_complete (PinosNode *this) { - g_signal_emit (data->node, signals[SIGNAL_PORT_REMOVED], 0, data->port_id); - g_slice_free (PortData, data); - return FALSE; + PinosNodePrivate *priv = this->priv; + SpaNodeEventAsyncComplete *ac = &priv->ac; + + g_debug ("node %p: async complete %u %d", this, ac->seq, ac->res); + + if (priv->async_init) { + init_complete (this); + priv->async_init = FALSE; + } + g_signal_emit (this, signals[SIGNAL_ASYNC_COMPLETE], 0, ac->seq, ac->res); } static void @@ -402,6 +490,7 @@ on_node_event (SpaNode *node, SpaNodeEvent *event, void *user_data) { PinosNode *this = user_data; PinosNodePrivate *priv = this->priv; + gboolean in_thread = pthread_equal (priv->thread, pthread_self()); switch (event->type) { case SPA_NODE_EVENT_TYPE_INVALID: @@ -415,39 +504,14 @@ on_node_event (SpaNode *node, SpaNodeEvent *event, void *user_data) case SPA_NODE_EVENT_TYPE_ASYNC_COMPLETE: { SpaNodeEventAsyncComplete *ac = event->data; - g_debug ("async complete %u %d", ac->seq, ac->res); - if (SPA_RESULT_IS_OK (ac->res)) - g_object_notify (G_OBJECT (this), "node-state"); + + priv->ac = *ac; + g_main_context_invoke (NULL, + (GSourceFunc) do_handle_async_complete, + this); break; } - case SPA_NODE_EVENT_TYPE_PORT_ADDED: - { - SpaNodeEventPortAdded *pa = event->data; - PortData *data; - - update_port_ids (this, FALSE); - - data = g_slice_new (PortData); - data->node = this; - data->direction = get_port_direction (this, pa->port_id); - data->port_id = pa->port_id; - g_main_context_invoke (NULL, (GSourceFunc) do_signal_port_added, data); - break; - } - case SPA_NODE_EVENT_TYPE_PORT_REMOVED: - { - SpaNodeEventPortRemoved *pr = event->data; - PortData *data; - - update_port_ids (this, FALSE); - - data = g_slice_new (PortData); - data->node = this; - data->port_id = pr->port_id; - g_main_context_invoke (NULL, (GSourceFunc) do_signal_port_removed, data); - break; - } case SPA_NODE_EVENT_TYPE_ADD_POLL: { SpaPollItem *poll = event->data; @@ -457,9 +521,11 @@ on_node_event (SpaNode *node, SpaNodeEvent *event, void *user_data) priv->n_poll++; if (poll->n_fds) priv->rebuild_fds = true; - wakeup_thread (this); - start_thread (this); + if (!in_thread) { + wakeup_thread (this); + start_thread (this); + } break; } case SPA_NODE_EVENT_TYPE_UPDATE_POLL: @@ -473,7 +539,9 @@ on_node_event (SpaNode *node, SpaNodeEvent *event, void *user_data) } if (poll->n_fds) priv->rebuild_fds = true; - wakeup_thread (this); + + if (!in_thread) + wakeup_thread (this); break; } case SPA_NODE_EVENT_TYPE_REMOVE_POLL: @@ -492,14 +560,28 @@ on_node_event (SpaNode *node, SpaNodeEvent *event, void *user_data) } if (priv->n_poll > 0) { priv->rebuild_fds = true; - wakeup_thread (this); + if (!in_thread) + wakeup_thread (this); } else { - stop_thread (this); + stop_thread (this, in_thread); } break; } case SPA_NODE_EVENT_TYPE_NEED_INPUT: { + guint i; + SpaNodeEventNeedInput *ni = event->data; + + for (i = 0; i < priv->input_links->len; i++) { + NodeLink *link = &g_array_index (priv->input_links, NodeLink, i); + PinosLink *pl = link->link; + + if (pl == NULL || pl->input_node->node != node || pl->input_port != ni->port_id) + continue; + + pl->in_ready++; + do_read_link (this, pl); + } break; } case SPA_NODE_EVENT_TYPE_HAVE_OUTPUT: @@ -508,6 +590,7 @@ on_node_event (SpaNode *node, SpaNodeEvent *event, void *user_data) SpaPortOutputInfo oinfo[1] = { 0, }; SpaResult res; guint i; + gboolean pushed = FALSE; oinfo[0].port_id = ho->port_id; @@ -519,23 +602,23 @@ on_node_event (SpaNode *node, SpaNodeEvent *event, void *user_data) for (i = 0; i < priv->output_links->len; i++) { NodeLink *link = &g_array_index (priv->output_links, NodeLink, i); PinosLink *pl = link->link; - SpaPortInputInfo iinfo[1]; + SpaRingbufferArea areas[2]; if (pl == NULL || pl->output_node->node != node || pl->output_port != oinfo[0].port_id) continue; - if (pl->input_node->node->state != SPA_NODE_STATE_STREAMING) { - if ((res = spa_node_port_reuse_buffer (node, oinfo[0].port_id, oinfo[0].buffer_id)) < 0) - g_warning ("node %p: error reuse buffer: %d", node, res); - continue; + spa_ringbuffer_get_write_areas (&pl->ringbuffer, areas); + if (areas[0].len > 0) { + pl->queue[areas[0].offset] = oinfo[0].buffer_id; + spa_ringbuffer_write_advance (&pl->ringbuffer, 1); + + pushed = do_read_link (this, pl); } - - iinfo[0].port_id = pl->input_port; - iinfo[0].buffer_id = oinfo[0].buffer_id; - iinfo[0].flags = SPA_PORT_INPUT_FLAG_NONE; - - if ((res = spa_node_port_push_input (pl->input_node->node, 1, iinfo)) < 0) - g_warning ("node %p: error pushing buffer: %d, %d", this, res, iinfo[0].status); + } + if (!pushed) { + g_debug ("node %p: discarded buffer %u", this, oinfo[0].buffer_id); + if ((res = spa_node_port_reuse_buffer (node, oinfo[0].port_id, oinfo[0].buffer_id)) < 0) + g_warning ("node %p: error reuse buffer: %d", node, res); } break; } @@ -606,10 +689,6 @@ pinos_node_get_property (GObject *_object, g_value_set_string (value, priv->name); break; - case PROP_STATE: - g_value_set_enum (value, priv->state); - break; - case PROP_PROPERTIES: g_value_set_boxed (value, priv->properties); break; @@ -618,10 +697,6 @@ pinos_node_get_property (GObject *_object, g_value_set_pointer (value, node->node); break; - case PROP_NODE_STATE: - g_value_set_uint (value, node->node->state); - break; - default: G_OBJECT_WARN_INVALID_PROPERTY_ID (node, prop_id, pspec); break; @@ -722,6 +797,18 @@ on_property_notify (GObject *obj, } } +static void +init_complete (PinosNode *this) +{ + PinosNodePrivate *priv = this->priv; + + update_port_ids (this, FALSE); + g_debug ("node %p: init completed", this); + priv->async_init = FALSE; + on_property_notify (G_OBJECT (this), NULL, this); + pinos_node_update_state (this, PINOS_NODE_STATE_SUSPENDED); +} + static void pinos_node_constructed (GObject * obj) { @@ -754,13 +841,14 @@ pinos_node_constructed (GObject * obj) if ((res = spa_node_set_event_callback (this->node, on_node_event, this)) < 0) g_warning ("node %p: error setting callback", this); - update_port_ids (this, TRUE); - if (priv->sender == NULL) priv->sender = g_strdup (pinos_daemon_get_sender (priv->daemon)); - on_property_notify (G_OBJECT (this), NULL, this); - + if (this->node->state > SPA_NODE_STATE_INIT) { + init_complete (this); + } else { + priv->async_init = TRUE; + } node_register_object (this); } @@ -772,7 +860,7 @@ pinos_node_dispose (GObject * obj) g_debug ("node %p: dispose", node); pinos_node_set_state (node, PINOS_NODE_STATE_SUSPENDED); - stop_thread (node); + stop_thread (node, FALSE); node_unregister_object (node); @@ -855,16 +943,6 @@ pinos_node_class_init (PinosNodeClass * klass) G_PARAM_CONSTRUCT_ONLY | G_PARAM_STATIC_STRINGS)); - g_object_class_install_property (gobject_class, - PROP_STATE, - g_param_spec_enum ("state", - "State", - "The state of the node", - PINOS_TYPE_NODE_STATE, - PINOS_NODE_STATE_SUSPENDED, - G_PARAM_READABLE | - G_PARAM_STATIC_STRINGS)); - g_object_class_install_property (gobject_class, PROP_PROPERTIES, g_param_spec_boxed ("properties", @@ -884,17 +962,6 @@ pinos_node_class_init (PinosNodeClass * klass) G_PARAM_CONSTRUCT_ONLY | G_PARAM_STATIC_STRINGS)); - g_object_class_install_property (gobject_class, - PROP_NODE_STATE, - g_param_spec_uint ("node-state", - "Node State", - "The state of the SPA node", - 0, - G_MAXUINT, - SPA_NODE_STATE_INIT, - G_PARAM_READABLE | - G_PARAM_STATIC_STRINGS)); - signals[SIGNAL_REMOVE] = g_signal_new ("remove", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST, @@ -905,6 +972,17 @@ pinos_node_class_init (PinosNodeClass * klass) G_TYPE_NONE, 0, G_TYPE_NONE); + signals[SIGNAL_STATE_CHANGE] = g_signal_new ("state-change", + G_TYPE_FROM_CLASS (klass), + G_SIGNAL_RUN_LAST, + 0, + NULL, + NULL, + g_cclosure_marshal_generic, + G_TYPE_NONE, + 2, + PINOS_TYPE_NODE_STATE, + PINOS_TYPE_NODE_STATE); signals[SIGNAL_PORT_ADDED] = g_signal_new ("port-added", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST, @@ -913,9 +991,8 @@ pinos_node_class_init (PinosNodeClass * klass) NULL, g_cclosure_marshal_generic, G_TYPE_NONE, - 2, - PINOS_TYPE_DIRECTION, - G_TYPE_UINT); + 1, + PINOS_TYPE_DIRECTION); signals[SIGNAL_PORT_REMOVED] = g_signal_new ("port-removed", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST, @@ -925,6 +1002,17 @@ pinos_node_class_init (PinosNodeClass * klass) g_cclosure_marshal_generic, G_TYPE_NONE, 1, + PINOS_TYPE_DIRECTION); + signals[SIGNAL_ASYNC_COMPLETE] = g_signal_new ("async-complete", + G_TYPE_FROM_CLASS (klass), + G_SIGNAL_RUN_LAST, + 0, + NULL, + NULL, + g_cclosure_marshal_generic, + G_TYPE_NONE, + 2, + G_TYPE_UINT, G_TYPE_UINT); node_class->set_state = node_set_state; @@ -940,8 +1028,8 @@ pinos_node_init (PinosNode * node) g_signal_connect (priv->iface, "handle-remove", (GCallback) handle_remove, node); - priv->state = PINOS_NODE_STATE_SUSPENDED; - pinos_node1_set_state (priv->iface, PINOS_NODE_STATE_SUSPENDED); + priv->state = PINOS_NODE_STATE_CREATING; + pinos_node1_set_state (priv->iface, priv->state); priv->input_links = g_array_new (FALSE, TRUE, sizeof (NodeLink)); priv->output_links = g_array_new (FALSE, TRUE, sizeof (NodeLink)); @@ -1273,6 +1361,7 @@ pinos_node_link (PinosNode *output_node, goto no_output_ports; input_port = get_free_node_port (input_node, PINOS_DIRECTION_INPUT); + g_debug ("node %p: port %u, %u", input_node, input_port, input_node->priv->n_input_ports); if (input_port == SPA_ID_INVALID && input_node->priv->n_input_ports > 0) input_port = input_node->priv->input_port_ids[0]; else @@ -1403,15 +1492,19 @@ pinos_node_update_state (PinosNode *node, PinosNodeState state) { PinosNodePrivate *priv; + PinosNodeState old; g_return_if_fail (PINOS_IS_NODE (node)); priv = node->priv; - if (priv->state != state) { - g_debug ("node %p: update state to %s", node, pinos_node_state_as_string (state)); + old = priv->state; + if (old != state) { + g_debug ("node %p: update state from %s -> %s", node, + pinos_node_state_as_string (old), + pinos_node_state_as_string (state)); priv->state = state; pinos_node1_set_state (priv->iface, state); - g_object_notify (G_OBJECT (node), "state"); + g_signal_emit (node, signals[SIGNAL_STATE_CHANGE], 0, old, priv->state); } } diff --git a/pinos/server/node.h b/pinos/server/node.h index beeb64f74..2c695eba3 100644 --- a/pinos/server/node.h +++ b/pinos/server/node.h @@ -56,6 +56,9 @@ struct _PinosNode { bool live; SpaClock *clock; + gboolean have_inputs; + gboolean have_outputs; + PinosNodePrivate *priv; }; diff --git a/spa/include/spa/control.h b/spa/include/spa/control.h index f56850162..181985067 100644 --- a/spa/include/spa/control.h +++ b/spa/include/spa/control.h @@ -59,10 +59,9 @@ typedef enum { /* client to server */ SPA_CONTROL_CMD_NODE_UPDATE = 1, SPA_CONTROL_CMD_PORT_UPDATE = 2, - SPA_CONTROL_CMD_PORT_REMOVED = 3, - SPA_CONTROL_CMD_NODE_STATE_CHANGE = 4, + SPA_CONTROL_CMD_NODE_STATE_CHANGE = 3, - SPA_CONTROL_CMD_PORT_STATUS_CHANGE = 5, + SPA_CONTROL_CMD_PORT_STATUS_CHANGE = 4, /* server to client */ SPA_CONTROL_CMD_ADD_PORT = 32, @@ -109,11 +108,6 @@ typedef struct { const SpaPortInfo *info; } SpaControlCmdPortUpdate; -/* SPA_CONTROL_CMD_PORT_REMOVED */ -typedef struct { - uint32_t port_id; -} SpaControlCmdPortRemoved; - /* SPA_CONTROL_CMD_PORT_STATUS_CHANGE */ /* SPA_CONTROL_CMD_NODE_STATE_CHANGE */ diff --git a/spa/include/spa/node-event.h b/spa/include/spa/node-event.h index 6d1762123..f9b6714e3 100644 --- a/spa/include/spa/node-event.h +++ b/spa/include/spa/node-event.h @@ -34,8 +34,6 @@ typedef struct _SpaNodeEvent SpaNodeEvent; * SpaEventType: * @SPA_NODE_EVENT_TYPE_INVALID: invalid event, should be ignored * @SPA_NODE_EVENT_TYPE_ASYNC_COMPLETE: an async operation completed - * @SPA_NODE_EVENT_TYPE_PORT_ADDED: a new port is added - * @SPA_NODE_EVENT_TYPE_PORT_REMOVED: a port is removed * @SPA_NODE_EVENT_TYPE_HAVE_OUTPUT: emited when an async node has output that can be pulled * @SPA_NODE_EVENT_TYPE_NEED_INPUT: emited when more data can be pushed to an async node * @SPA_NODE_EVENT_TYPE_REUSE_BUFFER: emited when a buffer can be reused @@ -47,12 +45,11 @@ typedef struct _SpaNodeEvent SpaNodeEvent; * @SPA_NODE_EVENT_TYPE_ERROR: emited when error occured * @SPA_NODE_EVENT_TYPE_BUFFERING: emited when buffering is in progress * @SPA_NODE_EVENT_TYPE_REQUEST_REFRESH: emited when a keyframe refresh is needed + * @SPA_NODE_EVENT_TYPE_REQUEST_CLOCK_UPDATE: the element asks for a clock update */ typedef enum { SPA_NODE_EVENT_TYPE_INVALID = 0, SPA_NODE_EVENT_TYPE_ASYNC_COMPLETE, - SPA_NODE_EVENT_TYPE_PORT_ADDED, - SPA_NODE_EVENT_TYPE_PORT_REMOVED, SPA_NODE_EVENT_TYPE_HAVE_OUTPUT, SPA_NODE_EVENT_TYPE_NEED_INPUT, SPA_NODE_EVENT_TYPE_REUSE_BUFFER, diff --git a/spa/include/spa/ringbuffer.h b/spa/include/spa/ringbuffer.h index 7b7320d1e..a0b4e8c78 100644 --- a/spa/include/spa/ringbuffer.h +++ b/spa/include/spa/ringbuffer.h @@ -29,20 +29,18 @@ typedef struct _SpaRingbuffer SpaRingbuffer; #include typedef struct { - uint8_t *data; - size_t len; + off_t offset; + size_t len; } SpaRingbufferArea; /** * SpaRingbuffer: - * @data: pointer to data * @readindex: the current read index * @writeindex: the current write index * @size: the size of the ringbuffer * @size_mask: mask if @size is power of 2 */ struct _SpaRingbuffer { - uint8_t *data; volatile size_t readindex; volatile size_t writeindex; size_t size; @@ -50,7 +48,7 @@ struct _SpaRingbuffer { }; SpaResult spa_ringbuffer_init (SpaRingbuffer *rbuf, - uint8_t *data, size_t size); + size_t size); SpaResult spa_ringbuffer_clear (SpaRingbuffer *rbuf); diff --git a/spa/lib/control.c b/spa/lib/control.c index 594754dc9..f06aaf9f6 100644 --- a/spa/lib/control.c +++ b/spa/lib/control.c @@ -492,7 +492,6 @@ spa_control_iter_set_data (SpaControlIter *iter, size_t size) { struct stack_iter *si = SCSI (iter); - SpaResult res = SPA_RESULT_OK; if (!is_valid_iter (iter)) return SPA_RESULT_INVALID_ARGUMENTS; @@ -526,12 +525,6 @@ spa_control_iter_parse_cmd (SpaControlIter *iter, iter_parse_port_update (si, command); break; - case SPA_CONTROL_CMD_PORT_REMOVED: - if (si->size < sizeof (SpaControlCmdPortRemoved)) - return SPA_RESULT_ERROR; - memcpy (command, si->data, sizeof (SpaControlCmdPortRemoved)); - break; - case SPA_CONTROL_CMD_PORT_STATUS_CHANGE: fprintf (stderr, "implement iter of %d\n", si->cmd); break; @@ -1245,11 +1238,6 @@ spa_control_builder_add_cmd (SpaControlBuilder *builder, builder_add_port_update (sb, command); break; - case SPA_CONTROL_CMD_PORT_REMOVED: - p = builder_add_cmd (sb, cmd, sizeof (SpaControlCmdPortRemoved)); - memcpy (p, command, sizeof (SpaControlCmdPortRemoved)); - break; - case SPA_CONTROL_CMD_PORT_STATUS_CHANGE: p = builder_add_cmd (sb, cmd, 0); break; diff --git a/spa/lib/ringbuffer.c b/spa/lib/ringbuffer.c index 28d762af2..548c9a34f 100644 --- a/spa/lib/ringbuffer.c +++ b/spa/lib/ringbuffer.c @@ -22,8 +22,8 @@ /** * spa_ringbuffer_init: * @rbuf: a #SpaRingbuffer - * @data: pointer to data - * @size: size of @data + * @data: pointer to an array + * @size: the number of elements in @data * * Initialize a #SpaRingbuffer with @data and @size. * When size is a power of 2, size_mask will be set with the mask to @@ -34,12 +34,11 @@ */ SpaResult spa_ringbuffer_init (SpaRingbuffer *rbuf, - uint8_t *data, size_t size) + size_t size) { - if (rbuf == NULL || data == NULL || size == 0) + if (rbuf == NULL || size == 0) return SPA_RESULT_INVALID_ARGUMENTS; - rbuf->data = data; rbuf->size = size; rbuf->readindex = 0; rbuf->writeindex = 0; @@ -93,13 +92,14 @@ spa_ringbuffer_get_read_areas (SpaRingbuffer *rbuf, avail = (rbuf->size_mask ? avail & rbuf->size_mask : avail % rbuf->size); } end = r + avail; + + areas[0].offset = r; + areas[1].offset = 0; + if (end > rbuf->size) { - areas[0].data = &rbuf->data[r]; areas[0].len = rbuf->size - r; - areas[1].data = rbuf->data; areas[1].len = end - rbuf->size; } else { - areas[0].data = &rbuf->data[r]; areas[0].len = avail; areas[1].len = 0; } @@ -146,20 +146,21 @@ spa_ringbuffer_get_write_areas (SpaRingbuffer *rbuf, if (w > r) { avail = (r - w + rbuf->size); avail = (rbuf->size_mask ? avail & rbuf->size_mask : avail % rbuf->size); - avail -= 1; } else if (w < r) { - avail = r - w - 1; + avail = r - w; } else { - avail = rbuf->size - 1; + avail = rbuf->size; } + avail -= 1; end = w + avail; + + areas[0].offset = w; + areas[1].offset = 0; + if (end > rbuf->size) { - areas[0].data = &rbuf->data[w]; areas[0].len = rbuf->size - w; - areas[1].data = rbuf->data; areas[1].len = end - rbuf->size; } else { - areas[0].data = &rbuf->data[w]; areas[0].len = avail; areas[1].len = 0; } diff --git a/spa/plugins/alsa/alsa-monitor.c b/spa/plugins/alsa/alsa-monitor.c index cc30e5951..35f8ea506 100644 --- a/spa/plugins/alsa/alsa-monitor.c +++ b/spa/plugins/alsa/alsa-monitor.c @@ -173,14 +173,11 @@ fill_item (ALSAItem *item, struct udev_device *udevice) item->info_items[i].key = "device.product.id"; item->info_items[i++].value = str; } - str = udev_device_get_property_value (item->udevice, "ID_V4L_PRODUCT"); + str = udev_device_get_property_value (item->udevice, "ID_MODEL_FROM_DATABASE"); if (!(str && *str)) { - str = udev_device_get_property_value (item->udevice, "ID_MODEL_FROM_DATABASE"); + str = udev_device_get_property_value (item->udevice, "ID_MODEL_ENC"); if (!(str && *str)) { - str = udev_device_get_property_value (item->udevice, "ID_MODEL_ENC"); - if (!(str && *str)) { - str = udev_device_get_property_value (item->udevice, "ID_MODEL"); - } + str = udev_device_get_property_value (item->udevice, "ID_MODEL"); } } if (str && *str) { @@ -194,8 +191,8 @@ fill_item (ALSAItem *item, struct udev_device *udevice) item->info_items[i].key = "device.serial"; item->info_items[i++].value = str; } - if ((str = udev_device_get_property_value (item->udevice, "ID_V4L_CAPABILITIES")) && *str) { - item->info_items[i].key = "device.capabilities"; + if ((str = udev_device_get_property_value (item->udevice, "SOUND_FORM_FACTOR")) && *str) { + item->info_items[i].key = "device.form_factor"; item->info_items[i++].value = str; } item->info.n_items = i; diff --git a/spa/plugins/remote/proxy.c b/spa/plugins/remote/proxy.c index 577cfd9c0..ab4560a0b 100644 --- a/spa/plugins/remote/proxy.c +++ b/spa/plugins/remote/proxy.c @@ -338,9 +338,7 @@ static void do_update_port (SpaProxy *this, SpaControlCmdPortUpdate *pu) { - SpaNodeEvent event; SpaProxyPort *port; - SpaNodeEventPortAdded pa; unsigned int i; port = &this->ports[pu->port_id]; @@ -369,12 +367,6 @@ do_update_port (SpaProxy *this, this->n_inputs++; else this->n_outputs++; - - event.type = SPA_NODE_EVENT_TYPE_PORT_ADDED; - event.size = sizeof (pa); - event.data = &pa; - pa.port_id = pu->port_id; - this->event_cb (&this->node, &event, this->user_data); } } @@ -382,9 +374,7 @@ static void do_uninit_port (SpaProxy *this, uint32_t port_id) { - SpaNodeEvent event; SpaProxyPort *port; - SpaNodeEventPortRemoved pr; fprintf (stderr, "proxy %p: removing port %d\n", this, port_id); port = &this->ports[port_id]; @@ -398,12 +388,6 @@ do_uninit_port (SpaProxy *this, if (port->format) spa_format_unref (port->format); port->format = NULL; - - event.type = SPA_NODE_EVENT_TYPE_PORT_REMOVED; - event.size = sizeof (pr); - event.data = ≺ - pr.port_id = port_id; - this->event_cb (&this->node, &event, this->user_data); } static SpaResult @@ -934,8 +918,6 @@ handle_node_event (SpaProxy *this, case SPA_NODE_EVENT_TYPE_INVALID: break; - case SPA_NODE_EVENT_TYPE_PORT_ADDED: - case SPA_NODE_EVENT_TYPE_PORT_REMOVED: case SPA_NODE_EVENT_TYPE_ASYNC_COMPLETE: case SPA_NODE_EVENT_TYPE_HAVE_OUTPUT: case SPA_NODE_EVENT_TYPE_NEED_INPUT: @@ -976,10 +958,6 @@ parse_control (SpaProxy *this, fprintf (stderr, "proxy %p: got unexpected control %d\n", this, cmd); break; - case SPA_CONTROL_CMD_PORT_REMOVED: - fprintf (stderr, "proxy %p: command not implemented %d\n", this, cmd); - break; - case SPA_CONTROL_CMD_NODE_UPDATE: { SpaControlCmdNodeUpdate nu; @@ -1045,12 +1023,11 @@ parse_control (SpaProxy *this, if (spa_control_iter_parse_cmd (&it, &sc) < 0) break; + fprintf (stderr, "proxy %p: got node state change %d -> %d\n", this, old, sc.state); this->node.state = sc.state; if (old == SPA_NODE_STATE_INIT) send_async_complete (this, 0, SPA_RESULT_OK); - fprintf (stderr, "proxy %p: got node state change %d\n", this, this->node.state); - break; } diff --git a/spa/plugins/v4l2/v4l2-source.c b/spa/plugins/v4l2/v4l2-source.c index ea7d61765..fa488b6f1 100644 --- a/spa/plugins/v4l2/v4l2-source.c +++ b/spa/plugins/v4l2/v4l2-source.c @@ -28,6 +28,7 @@ #include #include #include +#include typedef struct _SpaV4l2Source SpaV4l2Source; @@ -105,8 +106,7 @@ typedef struct { struct v4l2_requestbuffers reqbuf; SpaMemory *alloc_mem; V4l2Buffer *alloc_buffers; - V4l2Buffer *ready; - uint32_t ready_count; + SpaQueue ready; SpaPollFd fds[1]; SpaPollItem poll; @@ -683,16 +683,13 @@ spa_v4l2_source_node_port_pull_output (SpaNode *node, have_error = true; continue; } - if (state->ready_count == 0) { + + SPA_QUEUE_POP_HEAD (&state->ready, V4l2Buffer, next, b); + if (b == NULL) { info[i].status = SPA_RESULT_UNEXPECTED; have_error = true; continue; } - - b = state->ready; - state->ready = b->next; - state->ready_count--; - b->outstanding = true; info[i].buffer_id = b->outbuf->id; @@ -844,6 +841,8 @@ v4l2_source_init (const SpaHandleFactory *factory, this->props[1].props.prop_info = prop_info; reset_v4l2_source_props (&this->props[1]); + SPA_QUEUE_INIT (&this->state[0].ready); + this->state[0].info.flags = SPA_PORT_INFO_FLAG_LIVE; this->state[0].status.flags = SPA_PORT_STATUS_FLAG_NONE; diff --git a/spa/plugins/v4l2/v4l2-utils.c b/spa/plugins/v4l2/v4l2-utils.c index efdff82db..d7a872c60 100644 --- a/spa/plugins/v4l2/v4l2-utils.c +++ b/spa/plugins/v4l2/v4l2-utils.c @@ -848,9 +848,7 @@ mmap_read (SpaV4l2Source *this) d = SPA_BUFFER_DATAS (b->outbuf); d[0].mem.size = buf.bytesused; - b->next = state->ready; - state->ready = b; - state->ready_count++; + SPA_QUEUE_PUSH_TAIL (&state->ready, V4l2Buffer, next, b); return SPA_RESULT_OK; }