From 7a9dc2c4fdbdaf881a29eab6b3a192500665f666 Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Wed, 18 Jan 2017 18:29:15 +0100 Subject: [PATCH] optimize data transport Remove queue and ringbuffer between nodes. transfer the buffer id directly between the io areas when possible. Let only pinos send push or pull requests for now. Allow polling multiple fds, like how alsa wants it Remove port_id from events. --- pinos/client/loop.c | 89 ++++++++++------ pinos/client/stream.c | 66 +++++++----- pinos/client/transport.c | 13 +++ pinos/gst/gstpinossink.c | 14 +-- pinos/server/client-node.c | 31 +++--- pinos/server/data-loop.c | 3 +- pinos/server/link.c | 3 - pinos/server/link.h | 3 - pinos/server/node.c | 132 ++++++++++++------------ spa/include/spa/loop.h | 1 - spa/include/spa/node-event.h | 2 - spa/plugins/alsa/alsa-sink.c | 5 +- spa/plugins/alsa/alsa-source.c | 2 +- spa/plugins/alsa/alsa-utils.c | 111 ++++++++++---------- spa/plugins/audiomixer/audiomixer.c | 1 - spa/plugins/audiotestsrc/audiotestsrc.c | 1 - spa/plugins/v4l2/v4l2-utils.c | 1 - spa/plugins/videotestsrc/videotestsrc.c | 1 - 18 files changed, 265 insertions(+), 214 deletions(-) diff --git a/pinos/client/loop.c b/pinos/client/loop.c index ba62eaeef..db7684773 100644 --- a/pinos/client/loop.c +++ b/pinos/client/loop.c @@ -83,31 +83,58 @@ typedef struct { uint8_t buffer_data[DATAS_SIZE]; } PinosLoopImpl; +static inline uint32_t +spa_io_to_epoll (SpaIO mask) +{ + uint32_t events = 0; + + if (mask & SPA_IO_IN) + events |= EPOLLIN; + if (mask & SPA_IO_OUT) + events |= EPOLLOUT; + if (mask & SPA_IO_ERR) + events |= EPOLLERR; + if (mask & SPA_IO_HUP) + events |= EPOLLHUP; + + return events; +} + +static inline SpaIO +spa_epoll_to_io (uint32_t events) +{ + SpaIO mask = 0; + + if (events & EPOLLIN) + mask |= SPA_IO_IN; + if (events & EPOLLOUT) + mask |= SPA_IO_OUT; + if (events & EPOLLHUP) + mask |= SPA_IO_HUP; + if (events & EPOLLERR) + mask |= SPA_IO_ERR; + + return mask; +} + static SpaResult loop_add_source (SpaLoop *loop, SpaSource *source) { PinosLoopImpl *impl = SPA_CONTAINER_OF (loop, PinosLoopImpl, loop); - struct epoll_event ep; source->loop = loop; if (source->fd != -1) { + struct epoll_event ep; + spa_zero (ep); - if (source->mask & SPA_IO_IN) - ep.events |= EPOLLIN; - if (source->mask & SPA_IO_OUT) - ep.events |= EPOLLOUT; - if (source->mask & SPA_IO_ERR) - ep.events |= EPOLLERR; - if (source->mask & SPA_IO_HUP) - ep.events |= EPOLLHUP; + ep.events = spa_io_to_epoll (source->mask); ep.data.ptr = source; if (epoll_ctl (impl->epoll_fd, EPOLL_CTL_ADD, source->fd, &ep) < 0) return SPA_RESULT_ERRNO; } - return SPA_RESULT_OK; } @@ -121,14 +148,7 @@ loop_update_source (SpaSource *source) struct epoll_event ep; spa_zero (ep); - if (source->mask & SPA_IO_IN) - ep.events |= EPOLLIN; - if (source->mask & SPA_IO_OUT) - ep.events |= EPOLLOUT; - if (source->mask & SPA_IO_ERR) - ep.events |= EPOLLERR; - if (source->mask & SPA_IO_HUP) - ep.events |= EPOLLHUP; + ep.events = spa_io_to_epoll (source->mask); ep.data.ptr = source; if (epoll_ctl (impl->epoll_fd, EPOLL_CTL_MOD, source->fd, &ep) < 0) @@ -240,10 +260,6 @@ loop_enter (SpaLoopControl *ctrl) { PinosLoopImpl *impl = SPA_CONTAINER_OF (ctrl, PinosLoopImpl, control); impl->thread = pthread_self(); - if (impl->event == NULL) - impl->event = spa_loop_utils_add_event (&impl->utils, - event_func, - impl); } static void @@ -275,20 +291,19 @@ loop_iterate (SpaLoopControl *ctrl, return SPA_RESULT_ERRNO; } + /* first we set all the rmasks, then call the callbacks. The reason is that + * some callback might also want to look at other sources it manages and + * can then reset the rmask to suppress the callback */ for (i = 0; i < nfds; i++) { SpaSource *source = ep[i].data.ptr; - - source->rmask = 0; - if (ep[i].events & EPOLLIN) - source->rmask |= SPA_IO_IN; - if (ep[i].events & EPOLLOUT) - source->rmask |= SPA_IO_OUT; - if (ep[i].events & EPOLLHUP) - source->rmask |= SPA_IO_HUP; - if (ep[i].events & EPOLLERR) - source->rmask |= SPA_IO_ERR; - - source->func (source); + source->rmask = spa_epoll_to_io (ep[i].events); + } + for (i = 0; i < nfds; i++) { + SpaSource *source = ep[i].data.ptr; + if (source->rmask) { + source->func (source); + source->rmask = 0; + } } return SPA_RESULT_OK; } @@ -610,6 +625,10 @@ pinos_loop_new (void) spa_ringbuffer_init (&impl->buffer, DATAS_SIZE); + impl->event = spa_loop_utils_add_event (&impl->utils, + event_func, + impl); + return this; no_epoll: @@ -628,6 +647,8 @@ pinos_loop_destroy (PinosLoop *loop) spa_list_for_each_safe (source, tmp, &impl->source_list, link) loop_destroy_source (&source->source); + pinos_loop_destroy_source (loop, impl->event); + close (impl->epoll_fd); free (impl); } diff --git a/pinos/client/stream.c b/pinos/client/stream.c index 6a2e5a81c..0a45a7545 100644 --- a/pinos/client/stream.c +++ b/pinos/client/stream.c @@ -53,7 +53,6 @@ typedef struct { bool used; void *buf_ptr; SpaBuffer *buf; - SpaData *datas; } BufferId; typedef struct @@ -310,29 +309,35 @@ add_port_update (PinosStream *stream, uint32_t change_mask, bool flush) static inline void send_need_input (PinosStream *stream) { +#if 0 PinosStreamImpl *impl = SPA_CONTAINER_OF (stream, PinosStreamImpl, this); SpaNodeEventNeedInput ni; - uint8_t cmd = 0; + uint64_t cmd = 1; + + pinos_log_debug ("stream %p: need input", stream); ni.event.type = SPA_NODE_EVENT_TYPE_NEED_INPUT; ni.event.size = sizeof (ni); - ni.port_id = 0; pinos_transport_add_event (impl->trans, &ni.event); - write (impl->rtfd, &cmd, 1); + write (impl->rtfd, &cmd, 8); +#endif } static inline void send_have_output (PinosStream *stream) { +#if 0 PinosStreamImpl *impl = SPA_CONTAINER_OF (stream, PinosStreamImpl, this); SpaNodeEventHaveOutput ho; - uint8_t cmd = 0; + uint64_t cmd = 1; + + pinos_log_debug ("stream %p: have output", stream); ho.event.type = SPA_NODE_EVENT_TYPE_HAVE_OUTPUT; ho.event.size = sizeof (ho); - ho.port_id = 0; pinos_transport_add_event (impl->trans, &ho.event); - write (impl->rtfd, &cmd, 1); + write (impl->rtfd, &cmd, 8); +#endif } static void @@ -442,6 +447,8 @@ handle_rtnode_event (PinosStream *stream, { int i; + //pinos_log_debug ("stream %p: have output", stream); + for (i = 0; i < impl->trans->area->n_inputs; i++) { SpaPortInput *input = &impl->trans->inputs[i]; @@ -456,6 +463,7 @@ handle_rtnode_event (PinosStream *stream, } case SPA_NODE_EVENT_TYPE_NEED_INPUT: + //pinos_log_debug ("stream %p: need input", stream); pinos_signal_emit (&stream->need_buffer, stream); break; @@ -469,7 +477,7 @@ handle_rtnode_event (PinosStream *stream, if (impl->direction != SPA_DIRECTION_OUTPUT) break; - if ((bid = find_buffer (stream, p->buffer_id))) { + if ((bid = find_buffer (stream, p->buffer_id)) && bid->used) { bid->used = false; pinos_signal_emit (&stream->new_buffer, stream, p->buffer_id); } @@ -481,6 +489,17 @@ handle_rtnode_event (PinosStream *stream, } } +static void +unhandle_socket (PinosStream *stream) +{ + PinosStreamImpl *impl = SPA_CONTAINER_OF (stream, PinosStreamImpl, this); + + if (impl->rtsocket_source) { + pinos_loop_destroy_source (stream->context->loop, impl->rtsocket_source); + impl->rtsocket_source = NULL; + } +} + static void on_rtsocket_condition (SpaSource *source, int fd, @@ -492,13 +511,15 @@ on_rtsocket_condition (SpaSource *source, if (mask & (SPA_IO_ERR | SPA_IO_HUP)) { pinos_log_warn ("got error"); + unhandle_socket (stream); + return; } if (mask & SPA_IO_IN) { SpaNodeEvent event; - uint8_t cmd; + uint64_t cmd; - read (impl->rtfd, &cmd, 1); + read (impl->rtfd, &cmd, 8); while (pinos_transport_next_event (impl->trans, &event) == SPA_RESULT_OK) { SpaNodeEvent *ev = alloca (event.size); @@ -535,17 +556,6 @@ handle_socket (PinosStream *stream, int rtfd) return; } -static void -unhandle_socket (PinosStream *stream) -{ - PinosStreamImpl *impl = SPA_CONTAINER_OF (stream, PinosStreamImpl, this); - - if (impl->rtsocket_source) { - pinos_loop_destroy_source (stream->context->loop, impl->rtsocket_source); - impl->rtsocket_source = NULL; - } -} - static void handle_node_event (PinosStream *stream, SpaNodeEvent *event) @@ -724,6 +734,7 @@ stream_dispatch_func (void *object, } len = pinos_array_get_len (&impl->buffer_ids, BufferId); bid = pinos_array_add (&impl->buffer_ids, sizeof (BufferId)); + bid->used = false; b = p->buffers[i].buffer; @@ -1052,14 +1063,14 @@ pinos_stream_recycle_buffer (PinosStream *stream, { PinosStreamImpl *impl = SPA_CONTAINER_OF (stream, PinosStreamImpl, this); SpaNodeEventReuseBuffer rb; - uint8_t cmd = 0; + uint64_t cmd = 1; rb.event.type = SPA_NODE_EVENT_TYPE_REUSE_BUFFER; rb.event.size = sizeof (rb); rb.port_id = impl->port_id; rb.buffer_id = id; pinos_transport_add_event (impl->trans, &rb.event); - write (impl->rtfd, &cmd, 1); + write (impl->rtfd, &cmd, 8); return true; } @@ -1107,11 +1118,18 @@ pinos_stream_send_buffer (PinosStream *stream, PinosStreamImpl *impl = SPA_CONTAINER_OF (stream, PinosStreamImpl, this); BufferId *bid; - if ((bid = find_buffer (stream, id))) { + if (impl->trans->outputs[0].buffer_id != SPA_ID_INVALID) { + pinos_log_debug ("can't send %u, pending buffer %u", id, impl->trans->outputs[0].buffer_id); + return false; + } + + if ((bid = find_buffer (stream, id)) && !bid->used) { bid->used = true; impl->trans->outputs[0].buffer_id = id; impl->trans->outputs[0].status = SPA_RESULT_OK; send_have_output (stream); + } else { + pinos_log_debug ("stream %p: output %u was used", stream, id); } return true; diff --git a/pinos/client/transport.c b/pinos/client/transport.c index e8c9eb442..a73a7cdb3 100644 --- a/pinos/client/transport.c +++ b/pinos/client/transport.c @@ -60,14 +60,27 @@ static void transport_setup_area (void *p, PinosTransport *trans) { PinosTransportArea *a; + int i; trans->area = a = p; p = SPA_MEMBER (p, sizeof (PinosTransportArea), SpaPortInput); trans->inputs = p; + for (i = 0; i < a->max_inputs; i++) { + trans->inputs[i].state = SPA_PORT_STATE_FLAG_NONE; + trans->inputs[i].flags = SPA_PORT_INPUT_FLAG_NONE; + trans->inputs[i].buffer_id = SPA_ID_INVALID; + trans->inputs[i].status = SPA_RESULT_OK; + } p = SPA_MEMBER (p, a->max_inputs * sizeof (SpaPortInput), void); trans->outputs = p; + for (i = 0; i < a->max_outputs; i++) { + trans->outputs[i].state = SPA_PORT_STATE_FLAG_NONE; + trans->outputs[i].flags = SPA_PORT_OUTPUT_FLAG_NONE; + trans->outputs[i].buffer_id = SPA_ID_INVALID; + trans->outputs[i].status = SPA_RESULT_OK; + } p = SPA_MEMBER (p, a->max_outputs * sizeof (SpaPortOutput), void); trans->input_buffer = p; diff --git a/pinos/gst/gstpinossink.c b/pinos/gst/gstpinossink.c index 86cd2c81d..ce5f02b2c 100644 --- a/pinos/gst/gstpinossink.c +++ b/pinos/gst/gstpinossink.c @@ -458,7 +458,7 @@ on_new_buffer (PinosListener *listener, GstPinosSink *pinossink = SPA_CONTAINER_OF (listener, GstPinosSink, stream_new_buffer); GstBuffer *buf; - GST_LOG_OBJECT (pinossink, "got new buffer"); + GST_LOG_OBJECT (pinossink, "got new buffer %u", id); if (pinossink->stream == NULL) { GST_LOG_OBJECT (pinossink, "no stream"); return; @@ -498,10 +498,12 @@ do_send_buffer (GstPinosSink *pinossink) d->chunk->size = mem->size; } - if (!(res = pinos_stream_send_buffer (pinossink->stream, data->id))) + if (!(res = pinos_stream_send_buffer (pinossink->stream, data->id))) { g_warning ("can't send buffer"); - - pinossink->need_ready--; + gst_buffer_unref (buffer); + pinos_thread_main_loop_signal (pinossink->main_loop, FALSE); + } else + pinossink->need_ready--; } @@ -683,8 +685,8 @@ gst_pinos_sink_render (GstBaseSink * bsink, GstBuffer * buffer) gst_buffer_ref (buffer); g_queue_push_tail (&pinossink->queue, buffer); - if (pinossink->need_ready) - do_send_buffer (pinossink); +// if (pinossink->need_ready) +// do_send_buffer (pinossink); done: pinos_thread_main_loop_unlock (pinossink->main_loop); diff --git a/pinos/server/client-node.c b/pinos/server/client-node.c index 7748f2875..c81fac8ca 100644 --- a/pinos/server/client-node.c +++ b/pinos/server/client-node.c @@ -171,13 +171,12 @@ send_need_input (SpaProxy *this) { PinosNode *pnode = this->pnode; SpaNodeEventNeedInput ni; - uint8_t cmd = 0; + uint64_t cmd = 1; ni.event.type = SPA_NODE_EVENT_TYPE_NEED_INPUT; ni.event.size = sizeof (ni); - ni.port_id = 0; pinos_transport_add_event (pnode->transport, &ni.event); - write (this->data_source.fd, &cmd, 1); + write (this->data_source.fd, &cmd, 8); } static void @@ -185,13 +184,12 @@ send_have_output (SpaProxy *this) { PinosNode *pnode = this->pnode; SpaNodeEventHaveOutput ho; - uint8_t cmd = 0; + uint64_t cmd = 1; ho.event.type = SPA_NODE_EVENT_TYPE_HAVE_OUTPUT; ho.event.size = sizeof (ho); - ho.port_id = 0; pinos_transport_add_event (pnode->transport, &ho.event); - write (this->data_source.fd, &cmd, 1); + write (this->data_source.fd, &cmd, 8); } static SpaResult @@ -857,7 +855,7 @@ spa_proxy_node_port_reuse_buffer (SpaNode *node, SpaProxy *this; SpaNodeEventReuseBuffer rb; PinosNode *pnode; - uint8_t cmd = 0; + //uint64_t cmd = 1; if (node == NULL) return SPA_RESULT_INVALID_ARGUMENTS; @@ -873,7 +871,7 @@ spa_proxy_node_port_reuse_buffer (SpaNode *node, rb.port_id = port_id; rb.buffer_id = buffer_id; pinos_transport_add_event (pnode->transport, &rb.event); - write (this->data_source.fd, &cmd, 1); + //write (this->data_source.fd, &cmd, 8); return SPA_RESULT_OK; } @@ -1060,9 +1058,9 @@ proxy_on_data_fd_events (SpaSource *source) if (source->rmask & SPA_IO_IN) { SpaNodeEvent event; - uint8_t cmd; + uint64_t cmd; - read (this->data_source.fd, &cmd, 1); + read (this->data_source.fd, &cmd, 8); while (pinos_transport_next_event (pnode->transport, &event) == SPA_RESULT_OK) { SpaNodeEvent *ev = alloca (event.size); @@ -1338,15 +1336,22 @@ pinos_client_node_get_data_socket (PinosClientNode *this, PinosClientNodeImpl *impl = SPA_CONTAINER_OF (this, PinosClientNodeImpl, this); if (impl->data_fd == -1) { +#if 1 int fd[2]; - if (socketpair (AF_UNIX, SOCK_STREAM | SOCK_CLOEXEC | SOCK_NONBLOCK, 0, fd) != 0) + if (socketpair (AF_UNIX, SOCK_STREAM | SOCK_CLOEXEC, 0, fd) != 0) return SPA_RESULT_ERRNO; impl->proxy.data_source.fd = fd[0]; - spa_loop_add_source (impl->proxy.data_loop, &impl->proxy.data_source); - pinos_log_debug ("client-node %p: add data fd %d", this, fd[0]); impl->data_fd = fd[1]; +#else + + impl->proxy.data_source.fd = eventfd (0, EFD_CLOEXEC | EFD_NONBLOCK); + impl->data_fd = eventfd (0, EFD_CLOEXEC | EFD_NONBLOCK); +#endif + + spa_loop_add_source (impl->proxy.data_loop, &impl->proxy.data_source); + pinos_log_debug ("client-node %p: add data fd %d", this, impl->proxy.data_source.fd); } *fd = impl->data_fd; return SPA_RESULT_OK; diff --git a/pinos/server/data-loop.c b/pinos/server/data-loop.c index 1a343bf8e..c43843271 100644 --- a/pinos/server/data-loop.c +++ b/pinos/server/data-loop.c @@ -42,6 +42,7 @@ typedef struct pthread_t thread; } PinosDataLoopImpl; + static void make_realtime (PinosDataLoop *this) { @@ -57,7 +58,7 @@ make_realtime (PinosDataLoop *this) spa_zero (sp); sp.sched_priority = rtprio; - if (pthread_setschedparam (pthread_self(), SCHED_RR|SCHED_RESET_ON_FORK, &sp) == 0) { + if (pthread_setschedparam (pthread_self(), SCHED_OTHER|SCHED_RESET_ON_FORK, &sp) == 0) { pinos_log_debug ("SCHED_OTHER|SCHED_RESET_ON_FORK worked."); return; } diff --git a/pinos/server/link.c b/pinos/server/link.c index b3d947cba..52e77205f 100644 --- a/pinos/server/link.c +++ b/pinos/server/link.c @@ -716,8 +716,6 @@ pinos_link_activate (PinosLink *this) { PinosLinkImpl *impl = SPA_CONTAINER_OF (this, PinosLinkImpl, this); - spa_ringbuffer_init (&this->ringbuffer, SPA_N_ELEMENTS (this->queue)); - pinos_work_queue_add (impl->work, this, SPA_RESULT_WAIT_SYNC, @@ -729,7 +727,6 @@ pinos_link_activate (PinosLink *this) bool pinos_pinos_link_deactivate (PinosLink *this) { - spa_ringbuffer_clear (&this->ringbuffer); return true; } diff --git a/pinos/server/link.h b/pinos/server/link.h index 44ac1eb72..c9eb161f4 100644 --- a/pinos/server/link.h +++ b/pinos/server/link.h @@ -68,9 +68,6 @@ struct _PinosLink { PinosLink *link, PinosPort *port)); - uint32_t queue[64]; - SpaRingbuffer ringbuffer; - struct { unsigned int in_ready; PinosPort *input; diff --git a/pinos/server/node.c b/pinos/server/node.c index ff2d31f25..f9c7d1a57 100644 --- a/pinos/server/node.c +++ b/pinos/server/node.c @@ -246,37 +246,6 @@ send_clock_update (PinosNode *this) pinos_log_debug ("got error %d", res); } -static SpaResult -do_read_link (SpaLoop *loop, - bool async, - uint32_t seq, - size_t size, - void *data, - void *user_data) -{ - PinosNode *this = user_data; - PinosLink *link = ((PinosLink**)data)[0]; - size_t offset; - SpaResult res; - - if (link->rt.input == NULL) - return SPA_RESULT_OK; - - while (link->rt.in_ready > 0 && spa_ringbuffer_get_read_offset (&link->ringbuffer, &offset) > 0) { - SpaPortInput *input = &this->transport->inputs[link->rt.input->port_id]; - - input->buffer_id = link->queue[offset]; - - if ((res = spa_node_process_input (link->rt.input->node->node)) < 0) - pinos_log_warn ("node %p: error pushing buffer: %d, %d", this, res, input->status); - - spa_ringbuffer_read_advance (&link->ringbuffer, 1); - link->rt.in_ready--; - } - return SPA_RESULT_OK; -} - - static void on_node_event (SpaNode *node, SpaNodeEvent *event, void *user_data) { @@ -303,59 +272,84 @@ on_node_event (SpaNode *node, SpaNodeEvent *event, void *user_data) case SPA_NODE_EVENT_TYPE_NEED_INPUT: { - SpaNodeEventNeedInput *ni = (SpaNodeEventNeedInput *) event; - PinosPort *port = this->input_port_map[ni->port_id]; - PinosLink *link; + SpaResult res; + int i; + bool processed = false; - spa_list_for_each (link, &port->rt.links, rt.input_link) { - if (link->rt.input == NULL || link->rt.output == NULL) +// pinos_log_debug ("node %p: need input", this); + + for (i = 0; i < this->transport->area->n_inputs; i++) { + PinosLink *link; + PinosPort *inport, *outport; + SpaPortInput *pi; + SpaPortOutput *po; + + pi = &this->transport->inputs[i]; + if (pi->buffer_id != SPA_ID_INVALID) continue; - link->rt.in_ready++; - pinos_loop_invoke (link->rt.input->node->data_loop->loop, - do_read_link, - SPA_ID_INVALID, - sizeof (PinosLink *), - &link, - link->rt.input->node); + inport = this->input_port_map[i]; + spa_list_for_each (link, &inport->rt.links, rt.input_link) { + if (link->rt.input == NULL || link->rt.output == NULL) + continue; + + outport = link->rt.output; + po = &outport->node->transport->outputs[outport->port_id]; + + if (po->buffer_id != SPA_ID_INVALID) { + processed = true; + + pi->buffer_id = po->buffer_id; + po->buffer_id = SPA_ID_INVALID; + } + if ((res = spa_node_process_output (outport->node->node)) < 0) + pinos_log_warn ("node %p: got process output %d", outport->node, res); + } + } + if (processed) { + if ((res = spa_node_process_input (this->node)) < 0) + pinos_log_warn ("node %p: got process input %d", this, res); } break; } case SPA_NODE_EVENT_TYPE_HAVE_OUTPUT: { - SpaNodeEventHaveOutput *ho = (SpaNodeEventHaveOutput *) event; SpaResult res; - bool pushed = false; - SpaPortOutput *po = &this->transport->outputs[ho->port_id]; - PinosPort *port = this->output_port_map[ho->port_id]; - PinosLink *link; + int i; + bool processed = false; - spa_list_for_each (link, &port->rt.links, rt.output_link) { - size_t offset; +// pinos_log_debug ("node %p: have output", this); - if (link->rt.input == NULL || link->rt.output == NULL) + for (i = 0; i < this->transport->area->n_outputs; i++) { + PinosLink *link; + PinosPort *inport, *outport; + SpaPortInput *pi; + SpaPortOutput *po; + + po = &this->transport->outputs[i]; + if (po->buffer_id == SPA_ID_INVALID) continue; - if (spa_ringbuffer_get_write_offset (&link->ringbuffer, &offset) > 0) { - link->queue[offset] = po->buffer_id; - spa_ringbuffer_write_advance (&link->ringbuffer, 1); + outport = this->output_port_map[i]; + spa_list_for_each (link, &outport->rt.links, rt.output_link) { + if (link->rt.input == NULL || link->rt.output == NULL) + continue; - pinos_loop_invoke (link->rt.input->node->data_loop->loop, - do_read_link, - SPA_ID_INVALID, - sizeof (PinosLink *), - &link, - link->rt.input->node); - pushed = true; + inport = link->rt.input; + pi = &inport->node->transport->inputs[inport->port_id]; + + processed = true; + + pi->buffer_id = po->buffer_id; + + if ((res = spa_node_process_input (inport->node->node)) < 0) + pinos_log_warn ("node %p: got process input %d", inport->node, res); } + po->buffer_id = SPA_ID_INVALID; } - if (!pushed) { - if ((res = spa_node_port_reuse_buffer (node, ho->port_id, po->buffer_id)) < 0) - pinos_log_warn ("node %p: error reuse buffer: %d", node, res); - } - if ((res = spa_node_process_output (node)) < 0) { - pinos_log_warn ("node %p: got pull error %d, %d", this, res, po->status); - break; + if (processed) { + if ((res = spa_node_process_output (this->node)) < 0) + pinos_log_warn ("node %p: got process output %d", this, res); } break; } @@ -366,6 +360,8 @@ on_node_event (SpaNode *node, SpaNodeEvent *event, void *user_data) PinosPort *port = this->input_port_map[rb->port_id]; PinosLink *link; +// pinos_log_debug ("node %p: reuse buffer %u", this, rb->buffer_id); + spa_list_for_each (link, &port->rt.links, rt.input_link) { if (link->rt.input == NULL || link->rt.output == NULL) continue; diff --git a/spa/include/spa/loop.h b/spa/include/spa/loop.h index b0f2e22c2..4ebfa03b1 100644 --- a/spa/include/spa/loop.h +++ b/spa/include/spa/loop.h @@ -54,7 +54,6 @@ struct _SpaSource { int fd; SpaIO mask; SpaIO rmask; - void *loop_private; }; typedef SpaResult (*SpaInvokeFunc) (SpaLoop *loop, diff --git a/spa/include/spa/node-event.h b/spa/include/spa/node-event.h index 4833bc102..0b1823ae5 100644 --- a/spa/include/spa/node-event.h +++ b/spa/include/spa/node-event.h @@ -78,12 +78,10 @@ typedef struct { typedef struct { SpaNodeEvent event; - uint32_t port_id; } SpaNodeEventHaveOutput; typedef struct { SpaNodeEvent event; - uint32_t port_id; } SpaNodeEventNeedInput; typedef struct { diff --git a/spa/plugins/alsa/alsa-sink.c b/spa/plugins/alsa/alsa-sink.c index 6aa779138..6a5415f5d 100644 --- a/spa/plugins/alsa/alsa-sink.c +++ b/spa/plugins/alsa/alsa-sink.c @@ -32,7 +32,7 @@ typedef struct _SpaALSAState SpaALSASink; static const char default_device[] = "default"; -static const uint32_t default_period_size = 128; +static const uint32_t default_period_size = 32; static const uint32_t default_periods = 2; static const bool default_period_event = 0; @@ -691,6 +691,7 @@ spa_alsa_sink_node_process_input (SpaNode *node) b = &this->buffers[input->buffer_id]; if (!b->outstanding) { + input->buffer_id = SPA_ID_INVALID; input->status = SPA_RESULT_INVALID_BUFFER_ID; return SPA_RESULT_ERROR; } @@ -700,7 +701,9 @@ spa_alsa_sink_node_process_input (SpaNode *node) } else { spa_list_insert (this->ready.prev, &b->link); } + //spa_log_debug (this->log, "alsa-source: got buffer %u", input->buffer_id); b->outstanding = false; + input->buffer_id = SPA_ID_INVALID; input->status = SPA_RESULT_OK; return SPA_RESULT_OK; diff --git a/spa/plugins/alsa/alsa-source.c b/spa/plugins/alsa/alsa-source.c index 5cd5d7296..22a1e9a49 100644 --- a/spa/plugins/alsa/alsa-source.c +++ b/spa/plugins/alsa/alsa-source.c @@ -39,7 +39,7 @@ update_state (SpaALSASource *this, SpaNodeState state) } static const char default_device[] = "hw:0"; -static const uint32_t default_period_size = 128; +static const uint32_t default_period_size = 32; static const uint32_t default_periods = 2; static const bool default_period_event = 0; diff --git a/spa/plugins/alsa/alsa-utils.c b/spa/plugins/alsa/alsa-utils.c index f6019ecae..f419f528a 100644 --- a/spa/plugins/alsa/alsa-utils.c +++ b/spa/plugins/alsa/alsa-utils.c @@ -175,22 +175,6 @@ spa_alsa_set_format (SpaALSAState *state, SpaFormatAudio *fmt, SpaPortFormatFlag CHECK (snd_pcm_hw_params_set_buffer_size (hndl, params, state->buffer_frames), "set_buffer_size"); -#if 0 - /* set the buffer time */ - buffer_time = props->buffer_time; - CHECK (snd_pcm_hw_params_set_buffer_time_near (hndl, params, &buffer_time, &dir), "set_buffer_time_near"); - - CHECK (snd_pcm_hw_params_get_buffer_size (params, &size), "get_buffer_size"); - state->buffer_frames = size; - - /* set the period time */ - period_time = props->period_time; - CHECK (snd_pcm_hw_params_set_period_time_near (hndl, params, &period_time, &dir), "set_period_time_near"); - - CHECK (snd_pcm_hw_params_get_period_size (params, &size, &dir), "get_period_size"); - state->period_frames = size; -#endif - spa_log_info (state->log, "buffer frames %zd, period frames %zd", state->buffer_frames, state->period_frames); /* write the parameters to device */ @@ -216,22 +200,22 @@ set_swparams (SpaALSAState *state) /* start the transfer */ CHECK (snd_pcm_sw_params_set_start_threshold (hndl, params, 0U), "set_start_threshold"); +#if 1 CHECK (snd_pcm_sw_params_set_stop_threshold (hndl, params, (state->buffer_frames / state->period_frames) * state->period_frames), "set_stop_threshold"); -// CHECK (snd_pcm_sw_params_set_stop_threshold (hndl, params, -1), "set_stop_threshold"); +#else + CHECK (snd_pcm_sw_params_set_stop_threshold (hndl, params, -1), "set_stop_threshold"); +#endif CHECK (snd_pcm_sw_params_set_silence_threshold (hndl, params, 0U), "set_silence_threshold"); + /* enable period events when requested */ + CHECK (snd_pcm_sw_params_set_period_event (hndl, params, props->period_event ? 1 : 0), "set_period_event"); + #if 1 /* allow the transfer when at least period_size samples can be processed */ /* or disable this mechanism when period event is enabled (aka interrupt like style processing) */ - CHECK (snd_pcm_sw_params_set_avail_min (hndl, params, - props->period_event ? state->buffer_frames : state->period_frames), "set_avail_min"); - - /* enable period events when requested */ - if (props->period_event) { - CHECK (snd_pcm_sw_params_set_period_event (hndl, params, 1), "set_period_event"); - } + CHECK (snd_pcm_sw_params_set_avail_min (hndl, params, state->period_frames), "set_avail_min"); #else CHECK (snd_pcm_sw_params_set_avail_min (hndl, params, 0), "set_avail_min"); #endif @@ -286,7 +270,6 @@ pull_frames_queue (SpaALSAState *state, ni.event.type = SPA_NODE_EVENT_TYPE_NEED_INPUT; ni.event.size = sizeof (ni); - ni.port_id = 0; state->event_cb (&state->node, &ni.event, state->user_data); } if (!spa_list_is_empty (&state->ready)) { @@ -384,7 +367,6 @@ mmap_write (SpaALSAState *state) snd_pcm_sframes_t avail; snd_pcm_uframes_t offset, frames, size; const snd_pcm_channel_area_t *my_areas; - SpaNodeEventNeedInput ni; #if 0 snd_pcm_status_t *status; @@ -404,13 +386,6 @@ mmap_write (SpaALSAState *state) } #endif -#if 0 - ni.event.type = SPA_NODE_EVENT_TYPE_NEED_INPUT; - ni.event.size = sizeof (ni); - ni.port_id = 0; - state->event_cb (&state->node, &ni.event, state->user_data); -#endif - size = avail; while (size > 0) { frames = size; @@ -418,6 +393,10 @@ mmap_write (SpaALSAState *state) spa_log_error (state->log, "snd_pcm_mmap_begin error: %s", snd_strerror(err)); return -1; } + if (frames < state->period_frames) + break; + else + frames = state->period_frames; if (state->ringbuffer) frames = pull_frames_ringbuffer (state, my_areas, offset, frames); @@ -524,43 +503,75 @@ mmap_read (SpaALSAState *state) } ho.event.type = SPA_NODE_EVENT_TYPE_HAVE_OUTPUT; ho.event.size = sizeof (ho); - ho.port_id = 0; state->event_cb (&state->node, &ho.event, state->user_data); } return 0; } +static inline short +spa_io_to_poll (SpaIO mask) +{ + short events = 0; + + if (mask & SPA_IO_IN) + events |= POLLIN; + if (mask & SPA_IO_OUT) + events |= POLLOUT; + if (mask & SPA_IO_ERR) + events |= POLLERR; + if (mask & SPA_IO_HUP) + events |= POLLHUP; + + return events; +} + +static inline SpaIO +spa_poll_to_io (short events) +{ + SpaIO mask = 0; + + if (events & POLLIN) + mask |= SPA_IO_IN; + if (events & POLLOUT) + mask |= SPA_IO_OUT; + if (events & POLLERR) + mask |= SPA_IO_ERR; + if (events & POLLHUP) + mask |= SPA_IO_HUP; + + return mask; +} + static void alsa_on_fd_events (SpaSource *source) { SpaALSAState *state = source->data; snd_pcm_t *hndl = state->hndl; - int err; + int err, i; unsigned short revents = 0; -#if 0 + for (i = 0; i < state->n_fds; i++) { + state->fds[i].revents = spa_io_to_poll (state->sources[i].rmask); + state->sources[i].rmask = 0; + } + snd_pcm_poll_descriptors_revents (hndl, state->fds, state->n_fds, &revents); - - spa_log_debug (state->log, "revents: %d %d", revents, state->n_fds); -#endif - revents = source->rmask; - - if (revents & SPA_IO_ERR) { + if (revents & POLLERR) { if ((err = xrun_recovery (state, hndl, err)) < 0) { spa_log_error (state->log, "error: %s", snd_strerror (err)); } } if (state->stream == SND_PCM_STREAM_CAPTURE) { - if (!(revents & SPA_IO_IN)) + if (!(revents & POLLIN)) return; mmap_read (state); } else { - if (!(revents & SPA_IO_OUT)) + if (!(revents & POLLOUT)) return; mmap_write (state); @@ -601,15 +612,9 @@ spa_alsa_start (SpaALSAState *state, bool xrun_recover) state->sources[i].func = alsa_on_fd_events; state->sources[i].data = state; state->sources[i].fd = state->fds[i].fd; - state->sources[i].mask = 0; - if (state->fds[i].events & POLLIN) - state->sources[i].mask |= SPA_IO_IN; - if (state->fds[i].events & POLLOUT) - state->sources[i].mask |= SPA_IO_OUT; - if (state->fds[i].events & POLLERR) - state->sources[i].mask |= SPA_IO_ERR; - if (state->fds[i].events & POLLHUP) - state->sources[i].mask |= SPA_IO_HUP; + state->sources[i].mask = spa_poll_to_io (state->fds[i].events); + state->sources[i].rmask = 0; + state->fds[i].revents = 0; spa_loop_add_source (state->data_loop, &state->sources[i]); } diff --git a/spa/plugins/audiomixer/audiomixer.c b/spa/plugins/audiomixer/audiomixer.c index 8645d6ff6..322659438 100644 --- a/spa/plugins/audiomixer/audiomixer.c +++ b/spa/plugins/audiomixer/audiomixer.c @@ -581,7 +581,6 @@ pull_port (SpaAudioMixer *this, uint32_t port_id, SpaPortOutput *output, size_t ni.event.type = SPA_NODE_EVENT_TYPE_NEED_INPUT; ni.event.size = sizeof (ni); - ni.port_id = port_id; this->event_cb (&this->node, &ni.event, this->user_data); } diff --git a/spa/plugins/audiotestsrc/audiotestsrc.c b/spa/plugins/audiotestsrc/audiotestsrc.c index a4eeb8f45..4127e20e5 100644 --- a/spa/plugins/audiotestsrc/audiotestsrc.c +++ b/spa/plugins/audiotestsrc/audiotestsrc.c @@ -234,7 +234,6 @@ send_have_output (SpaAudioTestSrc *this) if (this->event_cb) { ho.event.type = SPA_NODE_EVENT_TYPE_HAVE_OUTPUT; ho.event.size = sizeof (ho); - ho.port_id = 0; this->event_cb (&this->node, &ho.event, this->user_data); } diff --git a/spa/plugins/v4l2/v4l2-utils.c b/spa/plugins/v4l2/v4l2-utils.c index 800f3bb00..ef76fe5e4 100644 --- a/spa/plugins/v4l2/v4l2-utils.c +++ b/spa/plugins/v4l2/v4l2-utils.c @@ -906,7 +906,6 @@ v4l2_on_fd_events (SpaSource *source) ho.event.type = SPA_NODE_EVENT_TYPE_HAVE_OUTPUT; ho.event.size = sizeof (ho); - ho.port_id = 0; this->event_cb (&this->node, &ho.event, this->user_data); } diff --git a/spa/plugins/videotestsrc/videotestsrc.c b/spa/plugins/videotestsrc/videotestsrc.c index 6076d7fd8..a7c0c1522 100644 --- a/spa/plugins/videotestsrc/videotestsrc.c +++ b/spa/plugins/videotestsrc/videotestsrc.c @@ -199,7 +199,6 @@ send_have_output (SpaVideoTestSrc *this) if (this->event_cb) { ho.event.type = SPA_NODE_EVENT_TYPE_HAVE_OUTPUT; ho.event.size = sizeof (ho); - ho.port_id = 0; this->event_cb (&this->node, &ho.event, this->user_data); }