mirror of
https://gitlab.freedesktop.org/pipewire/pipewire.git
synced 2025-11-02 09:01:50 -05:00
simplify events and commands
This commit is contained in:
parent
0373f73bac
commit
d3dd90bb05
25 changed files with 220 additions and 252 deletions
|
|
@ -148,15 +148,13 @@ enum
|
|||
static void
|
||||
send_async_complete (SpaProxy *this, uint32_t seq, SpaResult res)
|
||||
{
|
||||
SpaNodeEvent event;
|
||||
SpaNodeEventAsyncComplete ac;
|
||||
|
||||
event.type = SPA_NODE_EVENT_TYPE_ASYNC_COMPLETE;
|
||||
event.data = ∾
|
||||
event.size = sizeof (ac);
|
||||
ac.event.type = SPA_NODE_EVENT_TYPE_ASYNC_COMPLETE;
|
||||
ac.event.size = sizeof (ac);
|
||||
ac.seq = seq;
|
||||
ac.res = res;
|
||||
this->event_cb (&this->node, &event, this->user_data);
|
||||
this->event_cb (&this->node, &ac.event, this->user_data);
|
||||
}
|
||||
|
||||
static SpaResult
|
||||
|
|
@ -220,8 +218,7 @@ spa_proxy_node_send_command (SpaNode *node,
|
|||
if (!pinos_connection_flush (this->conn)) {
|
||||
spa_log_error (this->log, "proxy %p: error writing connection\n", this);
|
||||
res = SPA_RESULT_ERROR;
|
||||
}
|
||||
else
|
||||
} else
|
||||
res = SPA_RESULT_RETURN_ASYNC (cnc.seq);
|
||||
break;
|
||||
}
|
||||
|
|
@ -238,7 +235,6 @@ spa_proxy_node_send_command (SpaNode *node,
|
|||
spa_log_error (this->log, "proxy %p: error writing connection\n", this);
|
||||
res = SPA_RESULT_ERROR;
|
||||
}
|
||||
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
|
@ -960,7 +956,6 @@ spa_proxy_node_port_reuse_buffer (SpaNode *node,
|
|||
{
|
||||
SpaProxy *this;
|
||||
PinosControlCmdNodeEvent cne;
|
||||
SpaNodeEvent ne;
|
||||
SpaNodeEventReuseBuffer rb;
|
||||
|
||||
if (node == NULL)
|
||||
|
|
@ -972,10 +967,9 @@ spa_proxy_node_port_reuse_buffer (SpaNode *node,
|
|||
return SPA_RESULT_INVALID_PORT;
|
||||
|
||||
/* send start */
|
||||
cne.event = ≠
|
||||
ne.type = SPA_NODE_EVENT_TYPE_REUSE_BUFFER;
|
||||
ne.data = &rb;
|
||||
ne.size = sizeof (rb);
|
||||
cne.event = &rb.event;
|
||||
rb.event.type = SPA_NODE_EVENT_TYPE_REUSE_BUFFER;
|
||||
rb.event.size = sizeof (rb);
|
||||
rb.port_id = port_id;
|
||||
rb.buffer_id = buffer_id;
|
||||
pinos_connection_add_cmd (this->rtconn, PINOS_CONTROL_CMD_NODE_EVENT, &cne);
|
||||
|
|
|
|||
|
|
@ -157,7 +157,7 @@ loop (void *user_data)
|
|||
while (spa_ringbuffer_get_read_offset (&priv->buffer, &offset) > 0) {
|
||||
InvokeItem *item = SPA_MEMBER (priv->buffer_data, offset, InvokeItem);
|
||||
g_debug ("data-loop %p: invoke %d", this, item->seq);
|
||||
item->func (p, item->seq, item->size, item->data, item->user_data);
|
||||
item->func (p, true, item->seq, item->size, item->data, item->user_data);
|
||||
spa_ringbuffer_read_advance (&priv->buffer, item->item_size);
|
||||
}
|
||||
continue;
|
||||
|
|
@ -313,36 +313,43 @@ do_invoke (SpaPoll *poll,
|
|||
{
|
||||
PinosDataLoop *this = SPA_CONTAINER_OF (poll, PinosDataLoop, poll);
|
||||
PinosDataLoopPrivate *priv = this->priv;
|
||||
gboolean in_thread = pthread_equal (priv->thread, pthread_self());
|
||||
SpaRingbufferArea areas[2];
|
||||
InvokeItem *item;
|
||||
SpaResult res;
|
||||
|
||||
spa_ringbuffer_get_write_areas (&priv->buffer, areas);
|
||||
if (areas[0].len < sizeof (InvokeItem)) {
|
||||
g_warning ("queue full");
|
||||
return SPA_RESULT_ERROR;
|
||||
}
|
||||
item = SPA_MEMBER (priv->buffer_data, areas[0].offset, InvokeItem);
|
||||
item->seq = seq;
|
||||
item->func = func;
|
||||
item->user_data = user_data;
|
||||
item->size = size;
|
||||
|
||||
if (areas[0].len > sizeof (InvokeItem) + size) {
|
||||
item->data = SPA_MEMBER (item, sizeof (InvokeItem), void);
|
||||
item->item_size = sizeof (InvokeItem) + size;
|
||||
if (areas[0].len < sizeof (InvokeItem) + item->item_size)
|
||||
item->item_size = areas[0].len;
|
||||
if (in_thread) {
|
||||
res = func (poll, false, seq, size, data, user_data);
|
||||
} else {
|
||||
item->data = SPA_MEMBER (priv->buffer_data, areas[1].offset, void);
|
||||
item->item_size = areas[0].len + 1 + size;
|
||||
spa_ringbuffer_get_write_areas (&priv->buffer, areas);
|
||||
if (areas[0].len < sizeof (InvokeItem)) {
|
||||
g_warning ("queue full");
|
||||
return SPA_RESULT_ERROR;
|
||||
}
|
||||
item = SPA_MEMBER (priv->buffer_data, areas[0].offset, InvokeItem);
|
||||
item->seq = seq;
|
||||
item->func = func;
|
||||
item->user_data = user_data;
|
||||
item->size = size;
|
||||
|
||||
if (areas[0].len > sizeof (InvokeItem) + size) {
|
||||
item->data = SPA_MEMBER (item, sizeof (InvokeItem), void);
|
||||
item->item_size = sizeof (InvokeItem) + size;
|
||||
if (areas[0].len < sizeof (InvokeItem) + item->item_size)
|
||||
item->item_size = areas[0].len;
|
||||
} else {
|
||||
item->data = SPA_MEMBER (priv->buffer_data, areas[1].offset, void);
|
||||
item->item_size = areas[0].len + 1 + size;
|
||||
}
|
||||
memcpy (item->data, data, size);
|
||||
|
||||
spa_ringbuffer_write_advance (&priv->buffer, item->item_size);
|
||||
|
||||
wakeup_thread (this);
|
||||
|
||||
res = SPA_RESULT_RETURN_ASYNC (seq);
|
||||
}
|
||||
memcpy (item->data, data, size);
|
||||
|
||||
spa_ringbuffer_write_advance (&priv->buffer, item->item_size);
|
||||
|
||||
wakeup_thread (this);
|
||||
|
||||
return SPA_RESULT_RETURN_ASYNC (seq);
|
||||
return res;
|
||||
}
|
||||
|
||||
static void
|
||||
|
|
|
|||
|
|
@ -62,10 +62,6 @@ struct _PinosDataLoopClass {
|
|||
GObjectClass parent_class;
|
||||
};
|
||||
|
||||
typedef void (*PinosCommandFunc) (SpaNodeCommand *command,
|
||||
uint32_t seq,
|
||||
void *user_data);
|
||||
|
||||
/* normal GObject stuff */
|
||||
GType pinos_data_loop_get_type (void);
|
||||
|
||||
|
|
|
|||
|
|
@ -167,7 +167,7 @@ main_loop_dispatch (SpaPollNotifyData *data)
|
|||
|
||||
while (spa_ringbuffer_get_read_offset (&priv->buffer, &offset) > 0) {
|
||||
item = SPA_MEMBER (priv->buffer_data, offset, InvokeItem);
|
||||
item->func (p, item->seq, item->size, item->data, item->user_data);
|
||||
item->func (p, true, item->seq, item->size, item->data, item->user_data);
|
||||
spa_ringbuffer_read_advance (&priv->buffer, item->item_size);
|
||||
}
|
||||
|
||||
|
|
@ -184,38 +184,45 @@ do_invoke (SpaPoll *poll,
|
|||
{
|
||||
PinosMainLoop *this = SPA_CONTAINER_OF (poll, PinosMainLoop, poll);
|
||||
PinosMainLoopPrivate *priv = this->priv;
|
||||
gboolean in_thread = FALSE;
|
||||
SpaRingbufferArea areas[2];
|
||||
InvokeItem *item;
|
||||
uint64_t u = 1;
|
||||
SpaResult res;
|
||||
|
||||
spa_ringbuffer_get_write_areas (&priv->buffer, areas);
|
||||
if (areas[0].len < sizeof (InvokeItem)) {
|
||||
g_warning ("queue full");
|
||||
return SPA_RESULT_ERROR;
|
||||
}
|
||||
item = SPA_MEMBER (priv->buffer_data, areas[0].offset, InvokeItem);
|
||||
item->seq = seq;
|
||||
item->func = func;
|
||||
item->user_data = user_data;
|
||||
item->size = size;
|
||||
|
||||
if (areas[0].len > sizeof (InvokeItem) + size) {
|
||||
item->data = SPA_MEMBER (item, sizeof (InvokeItem), void);
|
||||
item->item_size = sizeof (InvokeItem) + size;
|
||||
if (areas[0].len < sizeof (InvokeItem) + item->item_size)
|
||||
item->item_size = areas[0].len;
|
||||
if (in_thread) {
|
||||
res = func (poll, false, seq, size, data, user_data);
|
||||
} else {
|
||||
item->data = SPA_MEMBER (priv->buffer_data, areas[1].offset, void);
|
||||
item->item_size = areas[0].len + 1 + size;
|
||||
spa_ringbuffer_get_write_areas (&priv->buffer, areas);
|
||||
if (areas[0].len < sizeof (InvokeItem)) {
|
||||
g_warning ("queue full");
|
||||
return SPA_RESULT_ERROR;
|
||||
}
|
||||
item = SPA_MEMBER (priv->buffer_data, areas[0].offset, InvokeItem);
|
||||
item->seq = seq;
|
||||
item->func = func;
|
||||
item->user_data = user_data;
|
||||
item->size = size;
|
||||
|
||||
if (areas[0].len > sizeof (InvokeItem) + size) {
|
||||
item->data = SPA_MEMBER (item, sizeof (InvokeItem), void);
|
||||
item->item_size = sizeof (InvokeItem) + size;
|
||||
if (areas[0].len < sizeof (InvokeItem) + item->item_size)
|
||||
item->item_size = areas[0].len;
|
||||
} else {
|
||||
item->data = SPA_MEMBER (priv->buffer_data, areas[1].offset, void);
|
||||
item->item_size = areas[0].len + 1 + size;
|
||||
}
|
||||
memcpy (item->data, data, size);
|
||||
|
||||
spa_ringbuffer_write_advance (&priv->buffer, item->item_size);
|
||||
|
||||
if (write (priv->fds[0].fd, &u, sizeof(uint64_t)) != sizeof(uint64_t))
|
||||
g_warning ("data-loop %p: failed to write fd", strerror (errno));
|
||||
|
||||
res = SPA_RESULT_RETURN_ASYNC (seq);
|
||||
}
|
||||
memcpy (item->data, data, size);
|
||||
|
||||
spa_ringbuffer_write_advance (&priv->buffer, item->item_size);
|
||||
|
||||
if (write (priv->fds[0].fd, &u, sizeof(uint64_t)) != sizeof(uint64_t))
|
||||
g_warning ("data-loop %p: failed to write fd", strerror (errno));
|
||||
|
||||
return SPA_RESULT_RETURN_ASYNC (seq);
|
||||
return res;
|
||||
}
|
||||
|
||||
static void
|
||||
|
|
|
|||
|
|
@ -62,9 +62,6 @@ struct _PinosMainLoopClass {
|
|||
GObjectClass parent_class;
|
||||
};
|
||||
|
||||
typedef void (*PinosEventFunc) (SpaNodeEvent *event,
|
||||
void *user_data);
|
||||
|
||||
typedef void (*PinosDeferFunc) (gpointer obj,
|
||||
gpointer data,
|
||||
SpaResult res,
|
||||
|
|
|
|||
|
|
@ -240,8 +240,7 @@ pause_node (PinosNode *this)
|
|||
g_debug ("node %p: pause node", this);
|
||||
|
||||
cmd.type = SPA_NODE_COMMAND_PAUSE;
|
||||
cmd.data = NULL;
|
||||
cmd.size = 0;
|
||||
cmd.size = sizeof (cmd);
|
||||
if ((res = spa_node_send_command (this->node, &cmd)) < 0)
|
||||
g_debug ("got error %d", res);
|
||||
|
||||
|
|
@ -257,8 +256,7 @@ start_node (PinosNode *this)
|
|||
g_debug ("node %p: start node", this);
|
||||
|
||||
cmd.type = SPA_NODE_COMMAND_START;
|
||||
cmd.data = NULL;
|
||||
cmd.size = 0;
|
||||
cmd.size = sizeof (cmd);
|
||||
if ((res = spa_node_send_command (this->node, &cmd)) < 0)
|
||||
g_debug ("got error %d", res);
|
||||
|
||||
|
|
@ -300,14 +298,11 @@ suspend_node (PinosNode *this)
|
|||
static void
|
||||
send_clock_update (PinosNode *this)
|
||||
{
|
||||
SpaNodeCommand cmd;
|
||||
SpaNodeCommandClockUpdate cu;
|
||||
SpaResult res;
|
||||
|
||||
cmd.type = SPA_NODE_COMMAND_CLOCK_UPDATE;
|
||||
cmd.data = &cu;
|
||||
cmd.size = sizeof (cu);
|
||||
|
||||
cu.command.type = SPA_NODE_COMMAND_CLOCK_UPDATE;
|
||||
cu.command.size = sizeof (cu);
|
||||
cu.flags = 0;
|
||||
cu.change_mask = SPA_NODE_COMMAND_CLOCK_UPDATE_TIME |
|
||||
SPA_NODE_COMMAND_CLOCK_UPDATE_SCALE |
|
||||
|
|
@ -324,7 +319,7 @@ send_clock_update (PinosNode *this)
|
|||
cu.scale = (1 << 16) | 1;
|
||||
cu.state = SPA_CLOCK_STATE_RUNNING;
|
||||
|
||||
if ((res = spa_node_send_command (this->node, &cmd)) < 0)
|
||||
if ((res = spa_node_send_command (this->node, &cu.command)) < 0)
|
||||
g_debug ("got error %d", res);
|
||||
}
|
||||
|
||||
|
|
@ -374,7 +369,7 @@ on_node_event (SpaNode *node, SpaNodeEvent *event, void *user_data)
|
|||
|
||||
case SPA_NODE_EVENT_TYPE_ASYNC_COMPLETE:
|
||||
{
|
||||
SpaNodeEventAsyncComplete *ac = event->data;
|
||||
SpaNodeEventAsyncComplete *ac = (SpaNodeEventAsyncComplete *) event;
|
||||
|
||||
g_debug ("node %p: async complete event %d %d", this, ac->seq, ac->res);
|
||||
pinos_main_loop_defer_complete (priv->main_loop, this, ac->seq, ac->res);
|
||||
|
|
@ -384,7 +379,7 @@ on_node_event (SpaNode *node, SpaNodeEvent *event, void *user_data)
|
|||
|
||||
case SPA_NODE_EVENT_TYPE_NEED_INPUT:
|
||||
{
|
||||
SpaNodeEventNeedInput *ni = event->data;
|
||||
SpaNodeEventNeedInput *ni = (SpaNodeEventNeedInput *) event;
|
||||
PinosPort *p;
|
||||
guint i;
|
||||
|
||||
|
|
@ -401,7 +396,7 @@ on_node_event (SpaNode *node, SpaNodeEvent *event, void *user_data)
|
|||
}
|
||||
case SPA_NODE_EVENT_TYPE_HAVE_OUTPUT:
|
||||
{
|
||||
SpaNodeEventHaveOutput *ho = event->data;
|
||||
SpaNodeEventHaveOutput *ho = (SpaNodeEventHaveOutput *) event;
|
||||
SpaPortOutputInfo oinfo[1] = { 0, };
|
||||
SpaResult res;
|
||||
gboolean pushed = FALSE;
|
||||
|
|
@ -440,7 +435,7 @@ on_node_event (SpaNode *node, SpaNodeEvent *event, void *user_data)
|
|||
case SPA_NODE_EVENT_TYPE_REUSE_BUFFER:
|
||||
{
|
||||
SpaResult res;
|
||||
SpaNodeEventReuseBuffer *rb = event->data;
|
||||
SpaNodeEventReuseBuffer *rb = (SpaNodeEventReuseBuffer *) event;
|
||||
PinosPort *p;
|
||||
guint i;
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue