From c8648eaf5989dcd2a01bef9ff59cb3cf1fd30303 Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Tue, 17 Jan 2017 15:02:06 +0100 Subject: [PATCH] only send events Only send events between client and server and use the socket simply to wake up the other end. --- pinos/client/stream.c | 74 ++++++++++++--------- pinos/client/transport.h | 6 -- pinos/server/client-node.c | 127 +++++++++++++------------------------ 3 files changed, 90 insertions(+), 117 deletions(-) diff --git a/pinos/client/stream.c b/pinos/client/stream.c index 7435dc2a9..6a2e5a81c 100644 --- a/pinos/client/stream.c +++ b/pinos/client/stream.c @@ -311,7 +311,27 @@ static inline void send_need_input (PinosStream *stream) { PinosStreamImpl *impl = SPA_CONTAINER_OF (stream, PinosStreamImpl, this); - uint8_t cmd = PINOS_TRANSPORT_CMD_NEED_DATA; + SpaNodeEventNeedInput ni; + uint8_t cmd = 0; + + ni.event.type = SPA_NODE_EVENT_TYPE_NEED_INPUT; + ni.event.size = sizeof (ni); + ni.port_id = 0; + pinos_transport_add_event (impl->trans, &ni.event); + write (impl->rtfd, &cmd, 1); +} + +static inline void +send_have_output (PinosStream *stream) +{ + PinosStreamImpl *impl = SPA_CONTAINER_OF (stream, PinosStreamImpl, this); + SpaNodeEventHaveOutput ho; + uint8_t cmd = 0; + + ho.event.type = SPA_NODE_EVENT_TYPE_HAVE_OUTPUT; + ho.event.size = sizeof (ho); + ho.port_id = 0; + pinos_transport_add_event (impl->trans, &ho.event); write (impl->rtfd, &cmd, 1); } @@ -419,8 +439,24 @@ handle_rtnode_event (PinosStream *stream, switch (event->type) { case SPA_NODE_EVENT_TYPE_HAVE_OUTPUT: + { + int i; + + for (i = 0; i < impl->trans->area->n_inputs; i++) { + SpaPortInput *input = &impl->trans->inputs[i]; + + if (input->buffer_id == SPA_ID_INVALID) + continue; + + pinos_signal_emit (&stream->new_buffer, stream, input->buffer_id); + input->buffer_id = SPA_ID_INVALID; + } + send_need_input (stream); + break; + } + case SPA_NODE_EVENT_TYPE_NEED_INPUT: - pinos_log_warn ("unhandled node event %d", event->type); + pinos_signal_emit (&stream->need_buffer, stream); break; case SPA_NODE_EVENT_TYPE_REUSE_BUFFER: @@ -459,33 +495,15 @@ on_rtsocket_condition (SpaSource *source, } if (mask & SPA_IO_IN) { + SpaNodeEvent event; uint8_t cmd; - int i; read (impl->rtfd, &cmd, 1); - if (cmd & PINOS_TRANSPORT_CMD_HAVE_DATA) { - for (i = 0; i < impl->trans->area->n_inputs; i++) { - SpaPortInput *input = &impl->trans->inputs[i]; - - if (input->buffer_id == SPA_ID_INVALID) - continue; - - pinos_signal_emit (&stream->new_buffer, stream, input->buffer_id); - input->buffer_id = SPA_ID_INVALID; - } - send_need_input (stream); - } - if (cmd & PINOS_TRANSPORT_CMD_HAVE_EVENT) { - SpaNodeEvent event; - while (pinos_transport_next_event (impl->trans, &event) == SPA_RESULT_OK) { - SpaNodeEvent *ev = alloca (event.size); - pinos_transport_parse_event (impl->trans, ev); - handle_rtnode_event (stream, ev); - } - } - if (cmd & PINOS_TRANSPORT_CMD_NEED_DATA) { - pinos_signal_emit (&stream->need_buffer, stream); + while (pinos_transport_next_event (impl->trans, &event) == SPA_RESULT_OK) { + SpaNodeEvent *ev = alloca (event.size); + pinos_transport_parse_event (impl->trans, ev); + handle_rtnode_event (stream, ev); } } } @@ -1034,7 +1052,7 @@ pinos_stream_recycle_buffer (PinosStream *stream, { PinosStreamImpl *impl = SPA_CONTAINER_OF (stream, PinosStreamImpl, this); SpaNodeEventReuseBuffer rb; - uint8_t cmd = PINOS_TRANSPORT_CMD_HAVE_EVENT; + uint8_t cmd = 0; rb.event.type = SPA_NODE_EVENT_TYPE_REUSE_BUFFER; rb.event.size = sizeof (rb); @@ -1088,14 +1106,12 @@ pinos_stream_send_buffer (PinosStream *stream, { PinosStreamImpl *impl = SPA_CONTAINER_OF (stream, PinosStreamImpl, this); BufferId *bid; - uint8_t cmd = PINOS_TRANSPORT_CMD_HAVE_DATA; if ((bid = find_buffer (stream, id))) { bid->used = true; impl->trans->outputs[0].buffer_id = id; impl->trans->outputs[0].status = SPA_RESULT_OK; - if (write (impl->rtfd, &cmd, 1) < 1) - perror ("write"); + send_have_output (stream); } return true; diff --git a/pinos/client/transport.h b/pinos/client/transport.h index 02e06dbd2..2f415097a 100644 --- a/pinos/client/transport.h +++ b/pinos/client/transport.h @@ -36,12 +36,6 @@ typedef struct _PinosTransportArea PinosTransportArea; #include #include -#define PINOS_TRANSPORT_CMD_NONE 0 -#define PINOS_TRANSPORT_CMD_NEED_DATA (1<<0) -#define PINOS_TRANSPORT_CMD_HAVE_DATA (1<<1) -#define PINOS_TRANSPORT_CMD_HAVE_EVENT (1<<2) -#define PINOS_TRANSPORT_CMD_SYNC (1<<3) - typedef struct { int memfd; off_t offset; diff --git a/pinos/server/client-node.c b/pinos/server/client-node.c index 592a3c228..7748f2875 100644 --- a/pinos/server/client-node.c +++ b/pinos/server/client-node.c @@ -166,6 +166,34 @@ spa_proxy_node_set_props (SpaNode *node, return SPA_RESULT_NOT_IMPLEMENTED; } +static void +send_need_input (SpaProxy *this) +{ + PinosNode *pnode = this->pnode; + SpaNodeEventNeedInput ni; + uint8_t cmd = 0; + + ni.event.type = SPA_NODE_EVENT_TYPE_NEED_INPUT; + ni.event.size = sizeof (ni); + ni.port_id = 0; + pinos_transport_add_event (pnode->transport, &ni.event); + write (this->data_source.fd, &cmd, 1); +} + +static void +send_have_output (SpaProxy *this) +{ + PinosNode *pnode = this->pnode; + SpaNodeEventHaveOutput ho; + uint8_t cmd = 0; + + ho.event.type = SPA_NODE_EVENT_TYPE_HAVE_OUTPUT; + ho.event.size = sizeof (ho); + ho.port_id = 0; + pinos_transport_add_event (pnode->transport, &ho.event); + write (this->data_source.fd, &cmd, 1); +} + static SpaResult spa_proxy_node_send_command (SpaNode *node, SpaNodeCommand *command) @@ -201,10 +229,9 @@ spa_proxy_node_send_command (SpaNode *node, PINOS_MESSAGE_NODE_COMMAND, &cnc, true); - if (command->type == SPA_NODE_COMMAND_START) { - uint8_t cmd = PINOS_TRANSPORT_CMD_NEED_DATA; - write (this->data_source.fd, &cmd, 1); - } + if (command->type == SPA_NODE_COMMAND_START) + send_need_input (this); + res = SPA_RESULT_RETURN_ASYNC (cnc.seq); break; } @@ -830,7 +857,7 @@ spa_proxy_node_port_reuse_buffer (SpaNode *node, SpaProxy *this; SpaNodeEventReuseBuffer rb; PinosNode *pnode; - uint8_t cmd; + uint8_t cmd = 0; if (node == NULL) return SPA_RESULT_INVALID_ARGUMENTS; @@ -846,8 +873,6 @@ spa_proxy_node_port_reuse_buffer (SpaNode *node, rb.port_id = port_id; rb.buffer_id = buffer_id; pinos_transport_add_event (pnode->transport, &rb.event); - - cmd = PINOS_TRANSPORT_CMD_HAVE_EVENT; write (this->data_source.fd, &cmd, 1); return SPA_RESULT_OK; @@ -890,39 +915,13 @@ static SpaResult spa_proxy_node_process_input (SpaNode *node) { SpaProxy *this; - uint8_t cmd; if (node == NULL) return SPA_RESULT_INVALID_ARGUMENTS; this = SPA_CONTAINER_OF (node, SpaProxy, node); -#if 0 - { - unsigned int i; - bool have_error = false; - - for (i = 0; i < this->n_inputs; i++) { - SpaProxyPort *port = &this->in_ports[i]; - SpaPortInput *input; - - if ((input = port->io) == NULL) - continue; - - if (!CHECK_PORT_BUFFER (this, input->buffer_id, port)) { - input->status = SPA_RESULT_INVALID_BUFFER_ID; - have_error = true; - continue; - } - copy_meta_out (this, port, input->buffer_id); - } - if (have_error) - return SPA_RESULT_ERROR; - } -#endif - - cmd = PINOS_TRANSPORT_CMD_HAVE_DATA; - write (this->data_source.fd, &cmd, 1); + send_have_output (this); return SPA_RESULT_OK; } @@ -931,37 +930,13 @@ static SpaResult spa_proxy_node_process_output (SpaNode *node) { SpaProxy *this; - uint8_t cmd; if (node == NULL) return SPA_RESULT_INVALID_ARGUMENTS; this = SPA_CONTAINER_OF (node, SpaProxy, node); -#if 0 - { - unsigned int i; - bool have_error = false; - - for (i = 0; i < this->n_outputs; i++) { - SpaProxyPort *port = &this->out_ports[i]; - SpaPortOutput *output; - - if ((output = port->io) == NULL) - continue; - - copy_meta_in (this, port, output->buffer_id); - - if (output->status != SPA_RESULT_OK) - have_error = true; - } - if (have_error) - return SPA_RESULT_ERROR; - } -#endif - - cmd = PINOS_TRANSPORT_CMD_NEED_DATA; - write (this->data_source.fd, &cmd, 1); + send_need_input (this); return SPA_RESULT_OK; } @@ -1078,33 +1053,21 @@ proxy_on_data_fd_events (SpaSource *source) SpaProxy *this = source->data; PinosNode *pnode = this->pnode; + if (source->rmask & (SPA_IO_ERR | SPA_IO_HUP)) { + spa_log_warn (this->log, "proxy %p: got error", this); + return; + } + if (source->rmask & SPA_IO_IN) { + SpaNodeEvent event; uint8_t cmd; - if (read (this->data_source.fd, &cmd, 1) < 1) - return; + read (this->data_source.fd, &cmd, 1); - if (cmd & PINOS_TRANSPORT_CMD_HAVE_EVENT) { - SpaNodeEvent event; - while (pinos_transport_next_event (pnode->transport, &event) == SPA_RESULT_OK) { - SpaNodeEvent *ev = alloca (event.size); - pinos_transport_parse_event (pnode->transport, ev); - this->event_cb (&this->node, ev, this->user_data); - } - } - if (cmd & PINOS_TRANSPORT_CMD_HAVE_DATA) { - SpaNodeEventHaveOutput ho; - ho.event.type = SPA_NODE_EVENT_TYPE_HAVE_OUTPUT; - ho.event.size = sizeof (ho); - ho.port_id = 0; - this->event_cb (&this->node, &ho.event, this->user_data); - } - if (cmd & PINOS_TRANSPORT_CMD_NEED_DATA) { - SpaNodeEventNeedInput ni; - ni.event.type = SPA_NODE_EVENT_TYPE_NEED_INPUT; - ni.event.size = sizeof (ni); - ni.port_id = 0; - this->event_cb (&this->node, &ni.event, this->user_data); + while (pinos_transport_next_event (pnode->transport, &event) == SPA_RESULT_OK) { + SpaNodeEvent *ev = alloca (event.size); + pinos_transport_parse_event (pnode->transport, ev); + this->event_cb (&this->node, ev, this->user_data); } } }