Remove unused events, drained and marker can be done with
ASYNC_COMPLETED messages
Handle result of idle callback to disable the poll item
Identify poll items with a unique id.
Remove set_state vfunc
push_event -> send_command, commands are to do something, events are the
result of something.
Add poll item in v4l2 as soon as we have the fd but disable the item
until streaming starts.
This commit is contained in:
Wim Taymans 2016-10-21 14:57:01 +02:00
parent 9b2b4b9b5c
commit 1bd751372e
21 changed files with 188 additions and 190 deletions

View file

@ -821,8 +821,6 @@ handle_node_event (PinosStream *stream,
case SPA_NODE_EVENT_TYPE_NEED_INPUT: case SPA_NODE_EVENT_TYPE_NEED_INPUT:
case SPA_NODE_EVENT_TYPE_ASYNC_COMPLETE: case SPA_NODE_EVENT_TYPE_ASYNC_COMPLETE:
case SPA_NODE_EVENT_TYPE_REUSE_BUFFER: case SPA_NODE_EVENT_TYPE_REUSE_BUFFER:
case SPA_NODE_EVENT_TYPE_DRAINED:
case SPA_NODE_EVENT_TYPE_MARKER:
case SPA_NODE_EVENT_TYPE_ERROR: case SPA_NODE_EVENT_TYPE_ERROR:
case SPA_NODE_EVENT_TYPE_BUFFERING: case SPA_NODE_EVENT_TYPE_BUFFERING:
case SPA_NODE_EVENT_TYPE_REQUEST_REFRESH: case SPA_NODE_EVENT_TYPE_REQUEST_REFRESH:
@ -842,8 +840,6 @@ handle_rtnode_event (PinosStream *stream,
switch (event->type) { switch (event->type) {
case SPA_NODE_EVENT_TYPE_INVALID: case SPA_NODE_EVENT_TYPE_INVALID:
case SPA_NODE_EVENT_TYPE_ASYNC_COMPLETE: case SPA_NODE_EVENT_TYPE_ASYNC_COMPLETE:
case SPA_NODE_EVENT_TYPE_DRAINED:
case SPA_NODE_EVENT_TYPE_MARKER:
case SPA_NODE_EVENT_TYPE_ERROR: case SPA_NODE_EVENT_TYPE_ERROR:
case SPA_NODE_EVENT_TYPE_BUFFERING: case SPA_NODE_EVENT_TYPE_BUFFERING:
case SPA_NODE_EVENT_TYPE_REQUEST_REFRESH: case SPA_NODE_EVENT_TYPE_REQUEST_REFRESH:

View file

@ -983,21 +983,21 @@ spa_proxy_node_port_reuse_buffer (SpaNode *node,
} }
static SpaResult static SpaResult
spa_proxy_node_port_push_event (SpaNode *node, spa_proxy_node_port_send_command (SpaNode *node,
SpaDirection direction, SpaDirection direction,
uint32_t port_id, uint32_t port_id,
SpaNodeEvent *event) SpaNodeCommand *command)
{ {
SpaProxy *this; SpaProxy *this;
if (node == NULL || event == NULL) if (node == NULL || command == NULL)
return SPA_RESULT_INVALID_ARGUMENTS; return SPA_RESULT_INVALID_ARGUMENTS;
this = SPA_CONTAINER_OF (node, SpaProxy, node); this = SPA_CONTAINER_OF (node, SpaProxy, node);
switch (event->type) { switch (command->type) {
default: default:
spa_log_warn (this->log, "unhandled event %d\n", event->type); spa_log_warn (this->log, "unhandled command %d\n", command->type);
break; break;
} }
return SPA_RESULT_NOT_IMPLEMENTED; return SPA_RESULT_NOT_IMPLEMENTED;
@ -1015,8 +1015,6 @@ handle_node_event (SpaProxy *this,
case SPA_NODE_EVENT_TYPE_HAVE_OUTPUT: case SPA_NODE_EVENT_TYPE_HAVE_OUTPUT:
case SPA_NODE_EVENT_TYPE_NEED_INPUT: case SPA_NODE_EVENT_TYPE_NEED_INPUT:
case SPA_NODE_EVENT_TYPE_REUSE_BUFFER: case SPA_NODE_EVENT_TYPE_REUSE_BUFFER:
case SPA_NODE_EVENT_TYPE_DRAINED:
case SPA_NODE_EVENT_TYPE_MARKER:
case SPA_NODE_EVENT_TYPE_ERROR: case SPA_NODE_EVENT_TYPE_ERROR:
case SPA_NODE_EVENT_TYPE_BUFFERING: case SPA_NODE_EVENT_TYPE_BUFFERING:
case SPA_NODE_EVENT_TYPE_REQUEST_REFRESH: case SPA_NODE_EVENT_TYPE_REQUEST_REFRESH:
@ -1237,7 +1235,7 @@ static const SpaNode proxy_node = {
spa_proxy_node_port_push_input, spa_proxy_node_port_push_input,
spa_proxy_node_port_pull_output, spa_proxy_node_port_pull_output,
spa_proxy_node_port_reuse_buffer, spa_proxy_node_port_reuse_buffer,
spa_proxy_node_port_push_event, spa_proxy_node_port_send_command,
}; };
static SpaResult static SpaResult

View file

@ -41,6 +41,8 @@ struct _PinosDataLoopPrivate
SpaPollFd fds[32]; SpaPollFd fds[32];
unsigned int n_fds; unsigned int n_fds;
uint32_t counter;
gboolean running; gboolean running;
pthread_t thread; pthread_t thread;
}; };
@ -78,7 +80,8 @@ loop (void *user_data)
ndata.fds = NULL; ndata.fds = NULL;
ndata.n_fds = 0; ndata.n_fds = 0;
ndata.user_data = p->user_data; ndata.user_data = p->user_data;
p->idle_cb (&ndata); if (p->idle_cb (&ndata) < 0)
p->enabled = false;
n_idle++; n_idle++;
} }
} }
@ -199,6 +202,7 @@ do_add_item (SpaPoll *poll,
gboolean in_thread = pthread_equal (priv->thread, pthread_self()); gboolean in_thread = pthread_equal (priv->thread, pthread_self());
unsigned int i; unsigned int i;
item->id = ++priv->counter;
g_debug ("data-loop %p: %d: add pollid %d, n_poll %d, n_fds %d", this, in_thread, item->id, priv->n_poll, item->n_fds); g_debug ("data-loop %p: %d: add pollid %d, n_poll %d, n_fds %d", this, in_thread, item->id, priv->n_poll, item->n_fds);
priv->poll[priv->n_poll] = *item; priv->poll[priv->n_poll] = *item;
priv->n_poll++; priv->n_poll++;
@ -210,8 +214,8 @@ do_add_item (SpaPoll *poll,
start_thread (this); start_thread (this);
} }
for (i = 0; i < priv->n_poll; i++) { for (i = 0; i < priv->n_poll; i++) {
if (priv->poll[i].fds) if (priv->poll[i].n_fds > 0)
g_debug ("poll %d: %p %d", i, priv->poll[i].user_data, priv->poll[i].fds[0].fd); g_debug ("poll %d: %d %d", i, priv->poll[i].id, priv->poll[i].fds[0].fd);
} }
return SPA_RESULT_OK; return SPA_RESULT_OK;
} }
@ -227,7 +231,7 @@ do_update_item (SpaPoll *poll,
unsigned int i; unsigned int i;
for (i = 0; i < priv->n_poll; i++) { for (i = 0; i < priv->n_poll; i++) {
if (priv->poll[i].id == item->id && priv->poll[i].user_data == item->user_data) if (priv->poll[i].id == item->id)
priv->poll[i] = *item; priv->poll[i] = *item;
} }
if (item->n_fds) if (item->n_fds)
@ -248,9 +252,9 @@ do_remove_item (SpaPoll *poll,
gboolean in_thread = pthread_equal (priv->thread, pthread_self()); gboolean in_thread = pthread_equal (priv->thread, pthread_self());
unsigned int i; unsigned int i;
g_debug ("data-loop %p: remove poll %d %d", this, item->n_fds, priv->n_poll); g_debug ("data-loop %p: %d: remove poll %d %d", this, item->id, item->n_fds, priv->n_poll);
for (i = 0; i < priv->n_poll; i++) { for (i = 0; i < priv->n_poll; i++) {
if (priv->poll[i].id == item->id && priv->poll[i].user_data == item->user_data) { if (priv->poll[i].id == item->id) {
priv->n_poll--; priv->n_poll--;
for (; i < priv->n_poll; i++) for (; i < priv->n_poll; i++)
priv->poll[i] = priv->poll[i+1]; priv->poll[i] = priv->poll[i+1];
@ -266,8 +270,8 @@ do_remove_item (SpaPoll *poll,
stop_thread (this, in_thread); stop_thread (this, in_thread);
} }
for (i = 0; i < priv->n_poll; i++) { for (i = 0; i < priv->n_poll; i++) {
if (priv->poll[i].fds) if (priv->poll[i].n_fds > 0)
g_debug ("poll %d: %p %d", i, priv->poll[i].user_data, priv->poll[i].fds[0].fd); g_debug ("poll %d: %d %d", i, priv->poll[i].id, priv->poll[i].fds[0].fd);
} }
return SPA_RESULT_OK; return SPA_RESULT_OK;
} }

View file

@ -655,11 +655,10 @@ do_start (PinosLink *this, SpaNodeState in_state, SpaNodeState out_state)
pinos_link_update_state (this, PINOS_LINK_STATE_PAUSED); pinos_link_update_state (this, PINOS_LINK_STATE_PAUSED);
if (in_state == SPA_NODE_STATE_PAUSED) { if (in_state == SPA_NODE_STATE_PAUSED) {
pinos_node_set_state (this->input->node, PINOS_NODE_STATE_RUNNING); res = pinos_node_set_state (this->input->node, PINOS_NODE_STATE_RUNNING);
} }
if (out_state == SPA_NODE_STATE_PAUSED) { if (out_state == SPA_NODE_STATE_PAUSED) {
pinos_node_set_state (this->output->node, PINOS_NODE_STATE_RUNNING); res = pinos_node_set_state (this->output->node, PINOS_NODE_STATE_RUNNING);
} }
} }
return res; return res;

View file

@ -280,6 +280,7 @@ pinos_main_loop_defer_cancel (PinosMainLoop *loop,
for (walk = priv->work.head; walk; walk = g_list_next (walk)) { for (walk = priv->work.head; walk; walk = g_list_next (walk)) {
WorkItem *i = walk->data; WorkItem *i = walk->data;
if ((id == 0 || i->id == id) && (obj == NULL || i->obj == obj)) { if ((id == 0 || i->id == id) && (obj == NULL || i->obj == obj)) {
g_debug ("main-loop %p: cancel defer %d for object %p", loop, i->seq, i->obj);
i->seq = SPA_ID_INVALID; i->seq = SPA_ID_INVALID;
i->func = NULL; i->func = NULL;
have_work = TRUE; have_work = TRUE;

View file

@ -328,66 +328,6 @@ send_clock_update (PinosNode *this)
g_debug ("got error %d", res); g_debug ("got error %d", res);
} }
typedef struct {
PinosNode *node;
PinosNodeState state;
} StateData;
static void
on_state_complete (StateData *data)
{
pinos_node_update_state (data->node, data->state);
}
static gboolean
node_set_state (PinosNode *this,
PinosNodeState state)
{
PinosNodePrivate *priv = this->priv;
SpaResult res = SPA_RESULT_OK;
StateData data;
g_debug ("node %p: set state %s", this, pinos_node_state_as_string (state));
switch (state) {
case PINOS_NODE_STATE_CREATING:
return FALSE;
case PINOS_NODE_STATE_SUSPENDED:
res = suspend_node (this);
break;
case PINOS_NODE_STATE_INITIALIZING:
break;
case PINOS_NODE_STATE_IDLE:
res = pause_node (this);
break;
case PINOS_NODE_STATE_RUNNING:
send_clock_update (this);
res = start_node (this);
break;
case PINOS_NODE_STATE_ERROR:
break;
}
if (SPA_RESULT_IS_ERROR (res))
return FALSE;
data.node = this;
data.state = state;
pinos_main_loop_defer (priv->main_loop,
this,
res,
(PinosDeferFunc) on_state_complete,
g_memdup (&data, sizeof (StateData)),
g_free);
return TRUE;
}
static gboolean static gboolean
do_read_link (PinosNode *this, PinosLink *link) do_read_link (PinosNode *this, PinosLink *link)
{ {
@ -427,8 +367,6 @@ on_node_event (SpaNode *node, SpaNodeEvent *event, void *user_data)
switch (event->type) { switch (event->type) {
case SPA_NODE_EVENT_TYPE_INVALID: case SPA_NODE_EVENT_TYPE_INVALID:
case SPA_NODE_EVENT_TYPE_DRAINED:
case SPA_NODE_EVENT_TYPE_MARKER:
case SPA_NODE_EVENT_TYPE_ERROR: case SPA_NODE_EVENT_TYPE_ERROR:
case SPA_NODE_EVENT_TYPE_BUFFERING: case SPA_NODE_EVENT_TYPE_BUFFERING:
case SPA_NODE_EVENT_TYPE_REQUEST_REFRESH: case SPA_NODE_EVENT_TYPE_REQUEST_REFRESH:
@ -793,7 +731,6 @@ static void
pinos_node_class_init (PinosNodeClass * klass) pinos_node_class_init (PinosNodeClass * klass)
{ {
GObjectClass *gobject_class = G_OBJECT_CLASS (klass); GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
PinosNodeClass *node_class = PINOS_NODE_CLASS (klass);
g_type_class_add_private (klass, sizeof (PinosNodePrivate)); g_type_class_add_private (klass, sizeof (PinosNodePrivate));
@ -930,8 +867,6 @@ pinos_node_class_init (PinosNodeClass * klass)
2, 2,
G_TYPE_UINT, G_TYPE_UINT,
G_TYPE_UINT); G_TYPE_UINT);
node_class->set_state = node_set_state;
} }
static void static void
@ -1287,6 +1222,17 @@ remove_idle_timeout (PinosNode *node)
} }
} }
typedef struct {
PinosNode *node;
PinosNodeState state;
} StateData;
static void
on_state_complete (StateData *data)
{
pinos_node_update_state (data->node, data->state);
}
/** /**
* pinos_node_set_state: * pinos_node_set_state:
* @node: a #PinosNode * @node: a #PinosNode
@ -1294,26 +1240,58 @@ remove_idle_timeout (PinosNode *node)
* *
* Set the state of @node to @state. * Set the state of @node to @state.
* *
* Returns: %TRUE on success. * Returns: a #SpaResult
*/ */
gboolean SpaResult
pinos_node_set_state (PinosNode *node, pinos_node_set_state (PinosNode *node,
PinosNodeState state) PinosNodeState state)
{ {
PinosNodeClass *klass; PinosNodePrivate *priv;
gboolean res; SpaResult res = SPA_RESULT_OK;
StateData data;
g_return_val_if_fail (PINOS_IS_NODE (node), FALSE); g_return_val_if_fail (PINOS_IS_NODE (node), SPA_RESULT_INVALID_ARGUMENTS);
priv = node->priv;
klass = PINOS_NODE_GET_CLASS (node);
remove_idle_timeout (node); remove_idle_timeout (node);
g_debug ("node %p: set state to %s", node, pinos_node_state_as_string (state)); g_debug ("node %p: set state %s", node, pinos_node_state_as_string (state));
if (klass->set_state)
res = klass->set_state (node, state); switch (state) {
else case PINOS_NODE_STATE_CREATING:
res = FALSE; return SPA_RESULT_ERROR;
case PINOS_NODE_STATE_SUSPENDED:
res = suspend_node (node);
break;
case PINOS_NODE_STATE_INITIALIZING:
break;
case PINOS_NODE_STATE_IDLE:
res = pause_node (node);
break;
case PINOS_NODE_STATE_RUNNING:
send_clock_update (node);
res = start_node (node);
break;
case PINOS_NODE_STATE_ERROR:
break;
}
if (SPA_RESULT_IS_ERROR (res))
return res;
data.node = node;
data.state = state;
pinos_main_loop_defer (priv->main_loop,
node,
res,
(PinosDeferFunc) on_state_complete,
g_memdup (&data, sizeof (StateData)),
g_free);
return res; return res;
} }

View file

@ -84,9 +84,6 @@ struct _PinosNode {
*/ */
struct _PinosNodeClass { struct _PinosNodeClass {
GObjectClass parent_class; GObjectClass parent_class;
gboolean (*set_state) (PinosNode *node,
PinosNodeState state);
}; };
/* normal GObject stuff */ /* normal GObject stuff */

View file

@ -48,7 +48,7 @@ typedef enum {
/** /**
* SpaClock: * SpaClock:
* *
* The main processing clocks. * A time provider.
*/ */
struct _SpaClock { struct _SpaClock {
/* the total size of this clock. This can be used to expand this /* the total size of this clock. This can be used to expand this

View file

@ -37,8 +37,6 @@ typedef struct _SpaNodeEvent SpaNodeEvent;
#define SPA_NODE_EVENT__HaveOutput SPA_NODE_EVENT_PREFIX "HaveOutput" #define SPA_NODE_EVENT__HaveOutput SPA_NODE_EVENT_PREFIX "HaveOutput"
#define SPA_NODE_EVENT__NeedInput SPA_NODE_EVENT_PREFIX "NeedInput" #define SPA_NODE_EVENT__NeedInput SPA_NODE_EVENT_PREFIX "NeedInput"
#define SPA_NODE_EVENT__ReuseBuffer SPA_NODE_EVENT_PREFIX "ReuseBuffer" #define SPA_NODE_EVENT__ReuseBuffer SPA_NODE_EVENT_PREFIX "ReuseBuffer"
#define SPA_NODE_EVENT__Drained SPA_NODE_EVENT_PREFIX "Drained"
#define SPA_NODE_EVENT__Marker SPA_NODE_EVENT_PREFIX "Marker"
#define SPA_NODE_EVENT__Error SPA_NODE_EVENT_PREFIX "Error" #define SPA_NODE_EVENT__Error SPA_NODE_EVENT_PREFIX "Error"
#define SPA_NODE_EVENT__Buffering SPA_NODE_EVENT_PREFIX "Buffering" #define SPA_NODE_EVENT__Buffering SPA_NODE_EVENT_PREFIX "Buffering"
#define SPA_NODE_EVENT__RequestRefresh SPA_NODE_EVENT_PREFIX "RequestRefresh" #define SPA_NODE_EVENT__RequestRefresh SPA_NODE_EVENT_PREFIX "RequestRefresh"
@ -51,8 +49,6 @@ typedef struct _SpaNodeEvent SpaNodeEvent;
* @SPA_NODE_EVENT_TYPE_HAVE_OUTPUT: emited when an async node has output that can be pulled * @SPA_NODE_EVENT_TYPE_HAVE_OUTPUT: emited when an async node has output that can be pulled
* @SPA_NODE_EVENT_TYPE_NEED_INPUT: emited when more data can be pushed to an async node * @SPA_NODE_EVENT_TYPE_NEED_INPUT: emited when more data can be pushed to an async node
* @SPA_NODE_EVENT_TYPE_REUSE_BUFFER: emited when a buffer can be reused * @SPA_NODE_EVENT_TYPE_REUSE_BUFFER: emited when a buffer can be reused
* @SPA_NODE_EVENT_TYPE_DRAINED: emited when DRAIN command completed
* @SPA_NODE_EVENT_TYPE_MARKER: emited when MARK command completed
* @SPA_NODE_EVENT_TYPE_ERROR: emited when error occured * @SPA_NODE_EVENT_TYPE_ERROR: emited when error occured
* @SPA_NODE_EVENT_TYPE_BUFFERING: emited when buffering is in progress * @SPA_NODE_EVENT_TYPE_BUFFERING: emited when buffering is in progress
* @SPA_NODE_EVENT_TYPE_REQUEST_REFRESH: emited when a keyframe refresh is needed * @SPA_NODE_EVENT_TYPE_REQUEST_REFRESH: emited when a keyframe refresh is needed
@ -64,8 +60,6 @@ typedef enum {
SPA_NODE_EVENT_TYPE_HAVE_OUTPUT, SPA_NODE_EVENT_TYPE_HAVE_OUTPUT,
SPA_NODE_EVENT_TYPE_NEED_INPUT, SPA_NODE_EVENT_TYPE_NEED_INPUT,
SPA_NODE_EVENT_TYPE_REUSE_BUFFER, SPA_NODE_EVENT_TYPE_REUSE_BUFFER,
SPA_NODE_EVENT_TYPE_DRAINED,
SPA_NODE_EVENT_TYPE_MARKER,
SPA_NODE_EVENT_TYPE_ERROR, SPA_NODE_EVENT_TYPE_ERROR,
SPA_NODE_EVENT_TYPE_BUFFERING, SPA_NODE_EVENT_TYPE_BUFFERING,
SPA_NODE_EVENT_TYPE_REQUEST_REFRESH, SPA_NODE_EVENT_TYPE_REQUEST_REFRESH,

View file

@ -130,7 +130,7 @@ typedef struct {
} SpaPortOutputInfo; } SpaPortOutputInfo;
/** /**
* SpaNodeCallback: * SpaNodeEventCallback:
* @node: a #SpaNode emiting the event * @node: a #SpaNode emiting the event
* @event: the event that was emited * @event: the event that was emited
* @user_data: user data provided when registering the callback * @user_data: user data provided when registering the callback
@ -174,6 +174,8 @@ struct _SpaNode {
* can be modified. The modifications will take effect after a call * can be modified. The modifications will take effect after a call
* to SpaNode::set_props. * to SpaNode::set_props.
* *
* This function must be called from the main thread.
*
* Returns: #SPA_RESULT_OK on success * Returns: #SPA_RESULT_OK on success
* #SPA_RESULT_INVALID_ARGUMENTS when node or props are %NULL * #SPA_RESULT_INVALID_ARGUMENTS when node or props are %NULL
* #SPA_RESULT_NOT_IMPLEMENTED when there are no properties * #SPA_RESULT_NOT_IMPLEMENTED when there are no properties
@ -196,6 +198,8 @@ struct _SpaNode {
* *
* If @props is NULL, all the properties are reset to their defaults. * If @props is NULL, all the properties are reset to their defaults.
* *
* This function must be called from the main thread.
*
* Returns: #SPA_RESULT_OK on success * Returns: #SPA_RESULT_OK on success
* #SPA_RESULT_INVALID_ARGUMENTS when node is %NULL * #SPA_RESULT_INVALID_ARGUMENTS when node is %NULL
* #SPA_RESULT_NOT_IMPLEMENTED when no properties can be * #SPA_RESULT_NOT_IMPLEMENTED when no properties can be
@ -214,6 +218,8 @@ struct _SpaNode {
* *
* Upon completion, a command might change the state of a node. * Upon completion, a command might change the state of a node.
* *
* This function must be called from the main thread.
*
* Returns: #SPA_RESULT_OK on success * Returns: #SPA_RESULT_OK on success
* #SPA_RESULT_INVALID_ARGUMENTS when node or command is %NULL * #SPA_RESULT_INVALID_ARGUMENTS when node or command is %NULL
* #SPA_RESULT_NOT_IMPLEMENTED when this node can't process commands * #SPA_RESULT_NOT_IMPLEMENTED when this node can't process commands
@ -234,6 +240,8 @@ struct _SpaNode {
* The callback can be emited from any thread. The caller should take * The callback can be emited from any thread. The caller should take
* appropriate actions to node the event in other threads when needed. * appropriate actions to node the event in other threads when needed.
* *
* This function must be called from the main thread.
*
* Returns: #SPA_RESULT_OK on success * Returns: #SPA_RESULT_OK on success
* #SPA_RESULT_INVALID_ARGUMENTS when node is %NULL * #SPA_RESULT_INVALID_ARGUMENTS when node is %NULL
*/ */
@ -251,6 +259,8 @@ struct _SpaNode {
* Get the current number of input and output ports and also the maximum * Get the current number of input and output ports and also the maximum
* number of ports. * number of ports.
* *
* This function must be called from the main thread.
*
* Returns: #SPA_RESULT_OK on success * Returns: #SPA_RESULT_OK on success
* #SPA_RESULT_INVALID_ARGUMENTS when node is %NULL * #SPA_RESULT_INVALID_ARGUMENTS when node is %NULL
*/ */
@ -269,6 +279,8 @@ struct _SpaNode {
* *
* Get the ids of the currently available ports. * Get the ids of the currently available ports.
* *
* This function must be called from the main thread.
*
* Returns: #SPA_RESULT_OK on success * Returns: #SPA_RESULT_OK on success
* #SPA_RESULT_INVALID_ARGUMENTS when node is %NULL * #SPA_RESULT_INVALID_ARGUMENTS when node is %NULL
*/ */
@ -289,6 +301,8 @@ struct _SpaNode {
* *
* Port ids should be between 0 and max_ports as obtained from get_n_ports(). * Port ids should be between 0 and max_ports as obtained from get_n_ports().
* *
* This function must be called from the main thread.
*
* Returns: #SPA_RESULT_OK on success * Returns: #SPA_RESULT_OK on success
* #SPA_RESULT_INVALID_ARGUMENTS when node is %NULL * #SPA_RESULT_INVALID_ARGUMENTS when node is %NULL
*/ */
@ -317,6 +331,8 @@ struct _SpaNode {
* The result format can be queried and modified and ultimately be used * The result format can be queried and modified and ultimately be used
* to call SpaNode::port_set_format. * to call SpaNode::port_set_format.
* *
* This function must be called from the main thread.
*
* Returns: #SPA_RESULT_OK on success * Returns: #SPA_RESULT_OK on success
* #SPA_RESULT_INVALID_ARGUMENTS when node or format is %NULL * #SPA_RESULT_INVALID_ARGUMENTS when node or format is %NULL
* #SPA_RESULT_INVALID_PORT when port_id is not valid * #SPA_RESULT_INVALID_PORT when port_id is not valid
@ -345,6 +361,8 @@ struct _SpaNode {
* Upon completion, this function might change the state of a node to * Upon completion, this function might change the state of a node to
* the READY state or to CONFIGURE when @format is NULL. * the READY state or to CONFIGURE when @format is NULL.
* *
* This function must be called from the main thread.
*
* Returns: #SPA_RESULT_OK on success * Returns: #SPA_RESULT_OK on success
* #SPA_RESULT_OK_RECHECK on success, the value of @format might have been * #SPA_RESULT_OK_RECHECK on success, the value of @format might have been
* changed depending on @flags and the final value can be found by * changed depending on @flags and the final value can be found by
@ -374,6 +392,8 @@ struct _SpaNode {
* Get the format on @port_id of @node. The result #SpaFormat can * Get the format on @port_id of @node. The result #SpaFormat can
* not be modified. * not be modified.
* *
* This function must be called from the main thread.
*
* Returns: #SPA_RESULT_OK on success * Returns: #SPA_RESULT_OK on success
* #SPA_RESULT_INVALID_ARGUMENTS when @node or @format is %NULL * #SPA_RESULT_INVALID_ARGUMENTS when @node or @format is %NULL
* #SPA_RESULT_INVALID_PORT when @port_id is not valid * #SPA_RESULT_INVALID_PORT when @port_id is not valid
@ -423,6 +443,8 @@ struct _SpaNode {
* node to PAUSED, when the node has enough buffers on all ports, or READY * node to PAUSED, when the node has enough buffers on all ports, or READY
* when @buffers are %NULL. * when @buffers are %NULL.
* *
* This function must be called from the main thread.
*
* Returns: #SPA_RESULT_OK on success * Returns: #SPA_RESULT_OK on success
* #SPA_RESULT_ASYNC the function is executed asynchronously * #SPA_RESULT_ASYNC the function is executed asynchronously
*/ */
@ -460,6 +482,8 @@ struct _SpaNode {
* Once the port has allocated buffers, the memory of the buffers can be * Once the port has allocated buffers, the memory of the buffers can be
* released again by calling SpaNode::port_use_buffers with %NULL. * released again by calling SpaNode::port_use_buffers with %NULL.
* *
* This function must be called from the main thread.
*
* Returns: #SPA_RESULT_OK on success * Returns: #SPA_RESULT_OK on success
* #SPA_RESULT_ERROR when the node already has allocated buffers. * #SPA_RESULT_ERROR when the node already has allocated buffers.
* #SPA_RESULT_ASYNC the function is executed asynchronously * #SPA_RESULT_ASYNC the function is executed asynchronously
@ -485,6 +509,8 @@ struct _SpaNode {
* *
* Push a buffer id into one or more input ports of @node. * Push a buffer id into one or more input ports of @node.
* *
* This function must be called from the data thread.
*
* Returns: #SPA_RESULT_OK on success * Returns: #SPA_RESULT_OK on success
* #SPA_RESULT_INVALID_ARGUMENTS when node or info is %NULL * #SPA_RESULT_INVALID_ARGUMENTS when node or info is %NULL
* #SPA_RESULT_ERROR when one or more of the @info has an * #SPA_RESULT_ERROR when one or more of the @info has an
@ -503,6 +529,8 @@ struct _SpaNode {
* *
* Pull a buffer id from one or more output ports of @node. * Pull a buffer id from one or more output ports of @node.
* *
* This function must be called from the data thread.
*
* Returns: #SPA_RESULT_OK on success * Returns: #SPA_RESULT_OK on success
* #SPA_RESULT_INVALID_ARGUMENTS when node or info is %NULL * #SPA_RESULT_INVALID_ARGUMENTS when node or info is %NULL
* #SPA_RESULT_PORTS_CHANGED the number of ports has changed. None * #SPA_RESULT_PORTS_CHANGED the number of ports has changed. None
@ -527,6 +555,8 @@ struct _SpaNode {
* *
* Tell an output port to reuse a buffer. * Tell an output port to reuse a buffer.
* *
* This function must be called from the data thread.
*
* Returns: #SPA_RESULT_OK on success * Returns: #SPA_RESULT_OK on success
* #SPA_RESULT_INVALID_ARGUMENTS when node is %NULL * #SPA_RESULT_INVALID_ARGUMENTS when node is %NULL
*/ */
@ -534,10 +564,10 @@ struct _SpaNode {
uint32_t port_id, uint32_t port_id,
uint32_t buffer_id); uint32_t buffer_id);
SpaResult (*port_push_event) (SpaNode *node, SpaResult (*port_send_command) (SpaNode *node,
SpaDirection direction, SpaDirection direction,
uint32_t port_id, uint32_t port_id,
SpaNodeEvent *event); SpaNodeCommand *command);
}; };
@ -561,7 +591,7 @@ struct _SpaNode {
#define spa_node_port_push_input(n,...) (n)->port_push_input((n),__VA_ARGS__) #define spa_node_port_push_input(n,...) (n)->port_push_input((n),__VA_ARGS__)
#define spa_node_port_pull_output(n,...) (n)->port_pull_output((n),__VA_ARGS__) #define spa_node_port_pull_output(n,...) (n)->port_pull_output((n),__VA_ARGS__)
#define spa_node_port_reuse_buffer(n,...) (n)->port_reuse_buffer((n),__VA_ARGS__) #define spa_node_port_reuse_buffer(n,...) (n)->port_reuse_buffer((n),__VA_ARGS__)
#define spa_node_port_push_event(n,...) (n)->port_push_event((n),__VA_ARGS__) #define spa_node_port_send_command(n,...) (n)->port_send_command((n),__VA_ARGS__)
#ifdef __cplusplus #ifdef __cplusplus
} /* extern "C" */ } /* extern "C" */

View file

@ -530,10 +530,10 @@ spa_alsa_sink_node_port_reuse_buffer (SpaNode *node,
} }
static SpaResult static SpaResult
spa_alsa_sink_node_port_push_event (SpaNode *node, spa_alsa_sink_node_port_send_command (SpaNode *node,
SpaDirection direction, SpaDirection direction,
uint32_t port_id, uint32_t port_id,
SpaNodeEvent *event) SpaNodeCommand *command)
{ {
return SPA_RESULT_NOT_IMPLEMENTED; return SPA_RESULT_NOT_IMPLEMENTED;
} }
@ -563,7 +563,7 @@ static const SpaNode alsasink_node = {
spa_alsa_sink_node_port_push_input, spa_alsa_sink_node_port_push_input,
spa_alsa_sink_node_port_pull_output, spa_alsa_sink_node_port_pull_output,
spa_alsa_sink_node_port_reuse_buffer, spa_alsa_sink_node_port_reuse_buffer,
spa_alsa_sink_node_port_push_event, spa_alsa_sink_node_port_send_command,
}; };
static SpaResult static SpaResult

View file

@ -634,10 +634,10 @@ spa_alsa_source_node_port_reuse_buffer (SpaNode *node,
} }
static SpaResult static SpaResult
spa_alsa_source_node_port_push_event (SpaNode *node, spa_alsa_source_node_port_send_command (SpaNode *node,
SpaDirection direction, SpaDirection direction,
uint32_t port_id, uint32_t port_id,
SpaNodeEvent *event) SpaNodeCommand *command)
{ {
return SPA_RESULT_NOT_IMPLEMENTED; return SPA_RESULT_NOT_IMPLEMENTED;
} }
@ -666,7 +666,7 @@ static const SpaNode alsasource_node = {
spa_alsa_source_node_port_push_input, spa_alsa_source_node_port_push_input,
spa_alsa_source_node_port_pull_output, spa_alsa_source_node_port_pull_output,
spa_alsa_source_node_port_reuse_buffer, spa_alsa_source_node_port_reuse_buffer,
spa_alsa_source_node_port_push_event, spa_alsa_source_node_port_send_command,
}; };
static SpaResult static SpaResult

View file

@ -722,10 +722,10 @@ spa_audiomixer_node_port_reuse_buffer (SpaNode *node,
} }
static SpaResult static SpaResult
spa_audiomixer_node_port_push_event (SpaNode *node, spa_audiomixer_node_port_send_command (SpaNode *node,
SpaDirection direction, SpaDirection direction,
uint32_t port_id, uint32_t port_id,
SpaNodeEvent *event) SpaNodeCommand *command)
{ {
return SPA_RESULT_NOT_IMPLEMENTED; return SPA_RESULT_NOT_IMPLEMENTED;
} }
@ -754,7 +754,7 @@ static const SpaNode audiomixer_node = {
spa_audiomixer_node_port_push_input, spa_audiomixer_node_port_push_input,
spa_audiomixer_node_port_pull_output, spa_audiomixer_node_port_pull_output,
spa_audiomixer_node_port_reuse_buffer, spa_audiomixer_node_port_reuse_buffer,
spa_audiomixer_node_port_push_event, spa_audiomixer_node_port_send_command,
}; };
static SpaResult static SpaResult

View file

@ -859,10 +859,10 @@ spa_audiotestsrc_node_port_reuse_buffer (SpaNode *node,
} }
static SpaResult static SpaResult
spa_audiotestsrc_node_port_push_event (SpaNode *node, spa_audiotestsrc_node_port_send_command (SpaNode *node,
SpaDirection direction, SpaDirection direction,
uint32_t port_id, uint32_t port_id,
SpaNodeEvent *event) SpaNodeCommand *command)
{ {
return SPA_RESULT_NOT_IMPLEMENTED; return SPA_RESULT_NOT_IMPLEMENTED;
} }
@ -891,7 +891,7 @@ static const SpaNode audiotestsrc_node = {
spa_audiotestsrc_node_port_push_input, spa_audiotestsrc_node_port_push_input,
spa_audiotestsrc_node_port_pull_output, spa_audiotestsrc_node_port_pull_output,
spa_audiotestsrc_node_port_reuse_buffer, spa_audiotestsrc_node_port_reuse_buffer,
spa_audiotestsrc_node_port_push_event, spa_audiotestsrc_node_port_send_command,
}; };
static SpaResult static SpaResult

View file

@ -498,10 +498,10 @@ spa_ffmpeg_dec_node_port_reuse_buffer (SpaNode *node,
} }
static SpaResult static SpaResult
spa_ffmpeg_dec_node_port_push_event (SpaNode *node, spa_ffmpeg_dec_node_port_send_command (SpaNode *node,
SpaDirection direction, SpaDirection direction,
uint32_t port_id, uint32_t port_id,
SpaNodeEvent *event) SpaNodeCommand *command)
{ {
return SPA_RESULT_NOT_IMPLEMENTED; return SPA_RESULT_NOT_IMPLEMENTED;
} }
@ -531,7 +531,7 @@ static const SpaNode ffmpeg_dec_node = {
spa_ffmpeg_dec_node_port_push_input, spa_ffmpeg_dec_node_port_push_input,
spa_ffmpeg_dec_node_port_pull_output, spa_ffmpeg_dec_node_port_pull_output,
spa_ffmpeg_dec_node_port_reuse_buffer, spa_ffmpeg_dec_node_port_reuse_buffer,
spa_ffmpeg_dec_node_port_push_event, spa_ffmpeg_dec_node_port_send_command,
}; };
static SpaResult static SpaResult

View file

@ -508,10 +508,10 @@ spa_ffmpeg_enc_node_port_reuse_buffer (SpaNode *node,
} }
static SpaResult static SpaResult
spa_ffmpeg_enc_node_port_push_event (SpaNode *node, spa_ffmpeg_enc_node_port_send_command (SpaNode *node,
SpaDirection direction, SpaDirection direction,
uint32_t port_id, uint32_t port_id,
SpaNodeEvent *event) SpaNodeCommand *command)
{ {
return SPA_RESULT_NOT_IMPLEMENTED; return SPA_RESULT_NOT_IMPLEMENTED;
} }
@ -540,7 +540,7 @@ static const SpaNode ffmpeg_enc_node = {
spa_ffmpeg_enc_node_port_push_input, spa_ffmpeg_enc_node_port_push_input,
spa_ffmpeg_enc_node_port_pull_output, spa_ffmpeg_enc_node_port_pull_output,
spa_ffmpeg_enc_node_port_reuse_buffer, spa_ffmpeg_enc_node_port_reuse_buffer,
spa_ffmpeg_enc_node_port_push_event, spa_ffmpeg_enc_node_port_send_command,
}; };
static SpaResult static SpaResult

View file

@ -226,7 +226,6 @@ spa_v4l2_source_node_send_command (SpaNode *node,
SpaNodeCommand *command) SpaNodeCommand *command)
{ {
SpaV4l2Source *this; SpaV4l2Source *this;
SpaResult res;
if (node == NULL || command == NULL) if (node == NULL || command == NULL)
return SPA_RESULT_INVALID_ARGUMENTS; return SPA_RESULT_INVALID_ARGUMENTS;
@ -247,10 +246,7 @@ spa_v4l2_source_node_send_command (SpaNode *node,
if (state->n_buffers == 0) if (state->n_buffers == 0)
return SPA_RESULT_NO_BUFFERS; return SPA_RESULT_NO_BUFFERS;
if ((res = spa_v4l2_start (this)) < 0) return spa_v4l2_start (this);
return res;
break;
} }
case SPA_NODE_COMMAND_PAUSE: case SPA_NODE_COMMAND_PAUSE:
{ {
@ -262,12 +258,8 @@ spa_v4l2_source_node_send_command (SpaNode *node,
if (state->n_buffers == 0) if (state->n_buffers == 0)
return SPA_RESULT_NO_BUFFERS; return SPA_RESULT_NO_BUFFERS;
if ((res = spa_v4l2_pause (this)) < 0) return spa_v4l2_pause (this);
return res;
break;
} }
case SPA_NODE_COMMAND_FLUSH: case SPA_NODE_COMMAND_FLUSH:
case SPA_NODE_COMMAND_DRAIN: case SPA_NODE_COMMAND_DRAIN:
case SPA_NODE_COMMAND_MARKER: case SPA_NODE_COMMAND_MARKER:
@ -714,15 +706,14 @@ spa_v4l2_source_node_port_reuse_buffer (SpaNode *node,
} }
static SpaResult static SpaResult
spa_v4l2_source_node_port_push_event (SpaNode *node, spa_v4l2_source_node_port_send_command (SpaNode *node,
SpaDirection direction, SpaDirection direction,
uint32_t port_id, uint32_t port_id,
SpaNodeEvent *event) SpaNodeCommand *command)
{ {
return SPA_RESULT_NOT_IMPLEMENTED; return SPA_RESULT_NOT_IMPLEMENTED;
} }
static const SpaNode v4l2source_node = { static const SpaNode v4l2source_node = {
sizeof (SpaNode), sizeof (SpaNode),
NULL, NULL,
@ -747,7 +738,7 @@ static const SpaNode v4l2source_node = {
spa_v4l2_source_node_port_push_input, spa_v4l2_source_node_port_push_input,
spa_v4l2_source_node_port_pull_output, spa_v4l2_source_node_port_pull_output,
spa_v4l2_source_node_port_reuse_buffer, spa_v4l2_source_node_port_reuse_buffer,
spa_v4l2_source_node_port_push_event, spa_v4l2_source_node_port_send_command,
}; };
static SpaResult static SpaResult

View file

@ -9,6 +9,8 @@
#define CLEAR(x) memset(&(x), 0, sizeof(x)) #define CLEAR(x) memset(&(x), 0, sizeof(x))
static int v4l2_on_fd_events (SpaPollNotifyData *data);
static int static int
xioctl (int fd, int request, void *arg) xioctl (int fd, int request, void *arg)
{ {
@ -66,6 +68,21 @@ spa_v4l2_open (SpaV4l2Source *this)
spa_log_error (state->log, "v4l2: %s is no video capture device\n", props->device); spa_log_error (state->log, "v4l2: %s is no video capture device\n", props->device);
return -1; return -1;
} }
state->fds[0].fd = state->fd;
state->fds[0].events = POLLIN | POLLPRI | POLLERR;
state->fds[0].revents = 0;
state->poll.id = 0;
state->poll.enabled = false;
state->poll.fds = state->fds;
state->poll.n_fds = 1;
state->poll.idle_cb = NULL;
state->poll.before_cb = NULL;
state->poll.after_cb = v4l2_on_fd_events;
state->poll.user_data = this;
spa_poll_add_item (state->data_loop, &state->poll);
state->opened = true; state->opened = true;
return 0; return 0;
@ -140,6 +157,9 @@ spa_v4l2_close (SpaV4l2Source *this)
return 0; return 0;
spa_log_info (state->log, "v4l2: close\n"); spa_log_info (state->log, "v4l2: close\n");
spa_poll_remove_item (state->data_loop, &state->poll);
if (close(state->fd)) if (close(state->fd))
perror ("close"); perror ("close");
@ -1132,19 +1152,8 @@ spa_v4l2_start (SpaV4l2Source *this)
state->started = true; state->started = true;
update_state (this, SPA_NODE_STATE_STREAMING); update_state (this, SPA_NODE_STATE_STREAMING);
state->fds[0].fd = state->fd;
state->fds[0].events = POLLIN | POLLPRI | POLLERR;
state->fds[0].revents = 0;
state->poll.id = 0;
state->poll.enabled = true; state->poll.enabled = true;
state->poll.fds = state->fds; spa_poll_update_item (state->data_loop, &state->poll);
state->poll.n_fds = 1;
state->poll.idle_cb = NULL;
state->poll.before_cb = NULL;
state->poll.after_cb = v4l2_on_fd_events;
state->poll.user_data = this;
spa_poll_add_item (state->data_loop, &state->poll);
return SPA_RESULT_OK; return SPA_RESULT_OK;
} }
@ -1161,7 +1170,8 @@ spa_v4l2_pause (SpaV4l2Source *this)
state->started = false; state->started = false;
spa_poll_remove_item (state->data_loop, &state->poll); state->poll.enabled = false;
spa_poll_update_item (state->data_loop, &state->poll);
type = V4L2_BUF_TYPE_VIDEO_CAPTURE; type = V4L2_BUF_TYPE_VIDEO_CAPTURE;
if (xioctl (state->fd, VIDIOC_STREAMOFF, &type) < 0) { if (xioctl (state->fd, VIDIOC_STREAMOFF, &type) < 0) {

View file

@ -807,10 +807,10 @@ spa_videotestsrc_node_port_reuse_buffer (SpaNode *node,
} }
static SpaResult static SpaResult
spa_videotestsrc_node_port_push_event (SpaNode *node, spa_videotestsrc_node_port_send_command (SpaNode *node,
SpaDirection direction, SpaDirection direction,
uint32_t port_id, uint32_t port_id,
SpaNodeEvent *event) SpaNodeCommand *command)
{ {
return SPA_RESULT_NOT_IMPLEMENTED; return SPA_RESULT_NOT_IMPLEMENTED;
} }
@ -839,7 +839,7 @@ static const SpaNode videotestsrc_node = {
spa_videotestsrc_node_port_push_input, spa_videotestsrc_node_port_push_input,
spa_videotestsrc_node_port_pull_output, spa_videotestsrc_node_port_pull_output,
spa_videotestsrc_node_port_reuse_buffer, spa_videotestsrc_node_port_reuse_buffer,
spa_videotestsrc_node_port_push_event, spa_videotestsrc_node_port_send_command,
}; };
static SpaResult static SpaResult

View file

@ -618,10 +618,10 @@ spa_volume_node_port_reuse_buffer (SpaNode *node,
} }
static SpaResult static SpaResult
spa_volume_node_port_push_event (SpaNode *node, spa_volume_node_port_send_command (SpaNode *node,
SpaDirection direction, SpaDirection direction,
uint32_t port_id, uint32_t port_id,
SpaNodeEvent *event) SpaNodeCommand *command)
{ {
return SPA_RESULT_NOT_IMPLEMENTED; return SPA_RESULT_NOT_IMPLEMENTED;
} }
@ -650,7 +650,7 @@ static const SpaNode volume_node = {
spa_volume_node_port_push_input, spa_volume_node_port_push_input,
spa_volume_node_port_pull_output, spa_volume_node_port_pull_output,
spa_volume_node_port_reuse_buffer, spa_volume_node_port_reuse_buffer,
spa_volume_node_port_push_event, spa_volume_node_port_send_command,
}; };
static SpaResult static SpaResult

View file

@ -500,10 +500,10 @@ spa_xv_sink_node_port_reuse_buffer (SpaNode *node,
} }
static SpaResult static SpaResult
spa_xv_sink_node_port_push_event (SpaNode *node, spa_xv_sink_node_port_send_command (SpaNode *node,
SpaDirection direction, SpaDirection direction,
uint32_t port_id, uint32_t port_id,
SpaNodeEvent *event) SpaNodeCommand *command)
{ {
return SPA_RESULT_NOT_IMPLEMENTED; return SPA_RESULT_NOT_IMPLEMENTED;
} }
@ -532,7 +532,7 @@ static const SpaNode xvsink_node = {
spa_xv_sink_node_port_push_input, spa_xv_sink_node_port_push_input,
spa_xv_sink_node_port_pull_output, spa_xv_sink_node_port_pull_output,
spa_xv_sink_node_port_reuse_buffer, spa_xv_sink_node_port_reuse_buffer,
spa_xv_sink_node_port_push_event, spa_xv_sink_node_port_send_command,
}; };
static SpaResult static SpaResult