diff --git a/pinos/client/loop.c b/pinos/client/loop.c index ba62eaeef..db7684773 100644 --- a/pinos/client/loop.c +++ b/pinos/client/loop.c @@ -83,31 +83,58 @@ typedef struct { uint8_t buffer_data[DATAS_SIZE]; } PinosLoopImpl; +static inline uint32_t +spa_io_to_epoll (SpaIO mask) +{ + uint32_t events = 0; + + if (mask & SPA_IO_IN) + events |= EPOLLIN; + if (mask & SPA_IO_OUT) + events |= EPOLLOUT; + if (mask & SPA_IO_ERR) + events |= EPOLLERR; + if (mask & SPA_IO_HUP) + events |= EPOLLHUP; + + return events; +} + +static inline SpaIO +spa_epoll_to_io (uint32_t events) +{ + SpaIO mask = 0; + + if (events & EPOLLIN) + mask |= SPA_IO_IN; + if (events & EPOLLOUT) + mask |= SPA_IO_OUT; + if (events & EPOLLHUP) + mask |= SPA_IO_HUP; + if (events & EPOLLERR) + mask |= SPA_IO_ERR; + + return mask; +} + static SpaResult loop_add_source (SpaLoop *loop, SpaSource *source) { PinosLoopImpl *impl = SPA_CONTAINER_OF (loop, PinosLoopImpl, loop); - struct epoll_event ep; source->loop = loop; if (source->fd != -1) { + struct epoll_event ep; + spa_zero (ep); - if (source->mask & SPA_IO_IN) - ep.events |= EPOLLIN; - if (source->mask & SPA_IO_OUT) - ep.events |= EPOLLOUT; - if (source->mask & SPA_IO_ERR) - ep.events |= EPOLLERR; - if (source->mask & SPA_IO_HUP) - ep.events |= EPOLLHUP; + ep.events = spa_io_to_epoll (source->mask); ep.data.ptr = source; if (epoll_ctl (impl->epoll_fd, EPOLL_CTL_ADD, source->fd, &ep) < 0) return SPA_RESULT_ERRNO; } - return SPA_RESULT_OK; } @@ -121,14 +148,7 @@ loop_update_source (SpaSource *source) struct epoll_event ep; spa_zero (ep); - if (source->mask & SPA_IO_IN) - ep.events |= EPOLLIN; - if (source->mask & SPA_IO_OUT) - ep.events |= EPOLLOUT; - if (source->mask & SPA_IO_ERR) - ep.events |= EPOLLERR; - if (source->mask & SPA_IO_HUP) - ep.events |= EPOLLHUP; + ep.events = spa_io_to_epoll (source->mask); ep.data.ptr = source; if (epoll_ctl (impl->epoll_fd, EPOLL_CTL_MOD, source->fd, &ep) < 0) @@ -240,10 +260,6 @@ loop_enter (SpaLoopControl *ctrl) { PinosLoopImpl *impl = SPA_CONTAINER_OF (ctrl, PinosLoopImpl, control); impl->thread = pthread_self(); - if (impl->event == NULL) - impl->event = spa_loop_utils_add_event (&impl->utils, - event_func, - impl); } static void @@ -275,20 +291,19 @@ loop_iterate (SpaLoopControl *ctrl, return SPA_RESULT_ERRNO; } + /* first we set all the rmasks, then call the callbacks. The reason is that + * some callback might also want to look at other sources it manages and + * can then reset the rmask to suppress the callback */ for (i = 0; i < nfds; i++) { SpaSource *source = ep[i].data.ptr; - - source->rmask = 0; - if (ep[i].events & EPOLLIN) - source->rmask |= SPA_IO_IN; - if (ep[i].events & EPOLLOUT) - source->rmask |= SPA_IO_OUT; - if (ep[i].events & EPOLLHUP) - source->rmask |= SPA_IO_HUP; - if (ep[i].events & EPOLLERR) - source->rmask |= SPA_IO_ERR; - - source->func (source); + source->rmask = spa_epoll_to_io (ep[i].events); + } + for (i = 0; i < nfds; i++) { + SpaSource *source = ep[i].data.ptr; + if (source->rmask) { + source->func (source); + source->rmask = 0; + } } return SPA_RESULT_OK; } @@ -610,6 +625,10 @@ pinos_loop_new (void) spa_ringbuffer_init (&impl->buffer, DATAS_SIZE); + impl->event = spa_loop_utils_add_event (&impl->utils, + event_func, + impl); + return this; no_epoll: @@ -628,6 +647,8 @@ pinos_loop_destroy (PinosLoop *loop) spa_list_for_each_safe (source, tmp, &impl->source_list, link) loop_destroy_source (&source->source); + pinos_loop_destroy_source (loop, impl->event); + close (impl->epoll_fd); free (impl); } diff --git a/pinos/client/stream.c b/pinos/client/stream.c index 6a2e5a81c..0a45a7545 100644 --- a/pinos/client/stream.c +++ b/pinos/client/stream.c @@ -53,7 +53,6 @@ typedef struct { bool used; void *buf_ptr; SpaBuffer *buf; - SpaData *datas; } BufferId; typedef struct @@ -310,29 +309,35 @@ add_port_update (PinosStream *stream, uint32_t change_mask, bool flush) static inline void send_need_input (PinosStream *stream) { +#if 0 PinosStreamImpl *impl = SPA_CONTAINER_OF (stream, PinosStreamImpl, this); SpaNodeEventNeedInput ni; - uint8_t cmd = 0; + uint64_t cmd = 1; + + pinos_log_debug ("stream %p: need input", stream); ni.event.type = SPA_NODE_EVENT_TYPE_NEED_INPUT; ni.event.size = sizeof (ni); - ni.port_id = 0; pinos_transport_add_event (impl->trans, &ni.event); - write (impl->rtfd, &cmd, 1); + write (impl->rtfd, &cmd, 8); +#endif } static inline void send_have_output (PinosStream *stream) { +#if 0 PinosStreamImpl *impl = SPA_CONTAINER_OF (stream, PinosStreamImpl, this); SpaNodeEventHaveOutput ho; - uint8_t cmd = 0; + uint64_t cmd = 1; + + pinos_log_debug ("stream %p: have output", stream); ho.event.type = SPA_NODE_EVENT_TYPE_HAVE_OUTPUT; ho.event.size = sizeof (ho); - ho.port_id = 0; pinos_transport_add_event (impl->trans, &ho.event); - write (impl->rtfd, &cmd, 1); + write (impl->rtfd, &cmd, 8); +#endif } static void @@ -442,6 +447,8 @@ handle_rtnode_event (PinosStream *stream, { int i; + //pinos_log_debug ("stream %p: have output", stream); + for (i = 0; i < impl->trans->area->n_inputs; i++) { SpaPortInput *input = &impl->trans->inputs[i]; @@ -456,6 +463,7 @@ handle_rtnode_event (PinosStream *stream, } case SPA_NODE_EVENT_TYPE_NEED_INPUT: + //pinos_log_debug ("stream %p: need input", stream); pinos_signal_emit (&stream->need_buffer, stream); break; @@ -469,7 +477,7 @@ handle_rtnode_event (PinosStream *stream, if (impl->direction != SPA_DIRECTION_OUTPUT) break; - if ((bid = find_buffer (stream, p->buffer_id))) { + if ((bid = find_buffer (stream, p->buffer_id)) && bid->used) { bid->used = false; pinos_signal_emit (&stream->new_buffer, stream, p->buffer_id); } @@ -481,6 +489,17 @@ handle_rtnode_event (PinosStream *stream, } } +static void +unhandle_socket (PinosStream *stream) +{ + PinosStreamImpl *impl = SPA_CONTAINER_OF (stream, PinosStreamImpl, this); + + if (impl->rtsocket_source) { + pinos_loop_destroy_source (stream->context->loop, impl->rtsocket_source); + impl->rtsocket_source = NULL; + } +} + static void on_rtsocket_condition (SpaSource *source, int fd, @@ -492,13 +511,15 @@ on_rtsocket_condition (SpaSource *source, if (mask & (SPA_IO_ERR | SPA_IO_HUP)) { pinos_log_warn ("got error"); + unhandle_socket (stream); + return; } if (mask & SPA_IO_IN) { SpaNodeEvent event; - uint8_t cmd; + uint64_t cmd; - read (impl->rtfd, &cmd, 1); + read (impl->rtfd, &cmd, 8); while (pinos_transport_next_event (impl->trans, &event) == SPA_RESULT_OK) { SpaNodeEvent *ev = alloca (event.size); @@ -535,17 +556,6 @@ handle_socket (PinosStream *stream, int rtfd) return; } -static void -unhandle_socket (PinosStream *stream) -{ - PinosStreamImpl *impl = SPA_CONTAINER_OF (stream, PinosStreamImpl, this); - - if (impl->rtsocket_source) { - pinos_loop_destroy_source (stream->context->loop, impl->rtsocket_source); - impl->rtsocket_source = NULL; - } -} - static void handle_node_event (PinosStream *stream, SpaNodeEvent *event) @@ -724,6 +734,7 @@ stream_dispatch_func (void *object, } len = pinos_array_get_len (&impl->buffer_ids, BufferId); bid = pinos_array_add (&impl->buffer_ids, sizeof (BufferId)); + bid->used = false; b = p->buffers[i].buffer; @@ -1052,14 +1063,14 @@ pinos_stream_recycle_buffer (PinosStream *stream, { PinosStreamImpl *impl = SPA_CONTAINER_OF (stream, PinosStreamImpl, this); SpaNodeEventReuseBuffer rb; - uint8_t cmd = 0; + uint64_t cmd = 1; rb.event.type = SPA_NODE_EVENT_TYPE_REUSE_BUFFER; rb.event.size = sizeof (rb); rb.port_id = impl->port_id; rb.buffer_id = id; pinos_transport_add_event (impl->trans, &rb.event); - write (impl->rtfd, &cmd, 1); + write (impl->rtfd, &cmd, 8); return true; } @@ -1107,11 +1118,18 @@ pinos_stream_send_buffer (PinosStream *stream, PinosStreamImpl *impl = SPA_CONTAINER_OF (stream, PinosStreamImpl, this); BufferId *bid; - if ((bid = find_buffer (stream, id))) { + if (impl->trans->outputs[0].buffer_id != SPA_ID_INVALID) { + pinos_log_debug ("can't send %u, pending buffer %u", id, impl->trans->outputs[0].buffer_id); + return false; + } + + if ((bid = find_buffer (stream, id)) && !bid->used) { bid->used = true; impl->trans->outputs[0].buffer_id = id; impl->trans->outputs[0].status = SPA_RESULT_OK; send_have_output (stream); + } else { + pinos_log_debug ("stream %p: output %u was used", stream, id); } return true; diff --git a/pinos/client/transport.c b/pinos/client/transport.c index e8c9eb442..a73a7cdb3 100644 --- a/pinos/client/transport.c +++ b/pinos/client/transport.c @@ -60,14 +60,27 @@ static void transport_setup_area (void *p, PinosTransport *trans) { PinosTransportArea *a; + int i; trans->area = a = p; p = SPA_MEMBER (p, sizeof (PinosTransportArea), SpaPortInput); trans->inputs = p; + for (i = 0; i < a->max_inputs; i++) { + trans->inputs[i].state = SPA_PORT_STATE_FLAG_NONE; + trans->inputs[i].flags = SPA_PORT_INPUT_FLAG_NONE; + trans->inputs[i].buffer_id = SPA_ID_INVALID; + trans->inputs[i].status = SPA_RESULT_OK; + } p = SPA_MEMBER (p, a->max_inputs * sizeof (SpaPortInput), void); trans->outputs = p; + for (i = 0; i < a->max_outputs; i++) { + trans->outputs[i].state = SPA_PORT_STATE_FLAG_NONE; + trans->outputs[i].flags = SPA_PORT_OUTPUT_FLAG_NONE; + trans->outputs[i].buffer_id = SPA_ID_INVALID; + trans->outputs[i].status = SPA_RESULT_OK; + } p = SPA_MEMBER (p, a->max_outputs * sizeof (SpaPortOutput), void); trans->input_buffer = p; diff --git a/pinos/gst/gstpinossink.c b/pinos/gst/gstpinossink.c index 86cd2c81d..ce5f02b2c 100644 --- a/pinos/gst/gstpinossink.c +++ b/pinos/gst/gstpinossink.c @@ -458,7 +458,7 @@ on_new_buffer (PinosListener *listener, GstPinosSink *pinossink = SPA_CONTAINER_OF (listener, GstPinosSink, stream_new_buffer); GstBuffer *buf; - GST_LOG_OBJECT (pinossink, "got new buffer"); + GST_LOG_OBJECT (pinossink, "got new buffer %u", id); if (pinossink->stream == NULL) { GST_LOG_OBJECT (pinossink, "no stream"); return; @@ -498,10 +498,12 @@ do_send_buffer (GstPinosSink *pinossink) d->chunk->size = mem->size; } - if (!(res = pinos_stream_send_buffer (pinossink->stream, data->id))) + if (!(res = pinos_stream_send_buffer (pinossink->stream, data->id))) { g_warning ("can't send buffer"); - - pinossink->need_ready--; + gst_buffer_unref (buffer); + pinos_thread_main_loop_signal (pinossink->main_loop, FALSE); + } else + pinossink->need_ready--; } @@ -683,8 +685,8 @@ gst_pinos_sink_render (GstBaseSink * bsink, GstBuffer * buffer) gst_buffer_ref (buffer); g_queue_push_tail (&pinossink->queue, buffer); - if (pinossink->need_ready) - do_send_buffer (pinossink); +// if (pinossink->need_ready) +// do_send_buffer (pinossink); done: pinos_thread_main_loop_unlock (pinossink->main_loop); diff --git a/pinos/server/client-node.c b/pinos/server/client-node.c index 7748f2875..c81fac8ca 100644 --- a/pinos/server/client-node.c +++ b/pinos/server/client-node.c @@ -171,13 +171,12 @@ send_need_input (SpaProxy *this) { PinosNode *pnode = this->pnode; SpaNodeEventNeedInput ni; - uint8_t cmd = 0; + uint64_t cmd = 1; ni.event.type = SPA_NODE_EVENT_TYPE_NEED_INPUT; ni.event.size = sizeof (ni); - ni.port_id = 0; pinos_transport_add_event (pnode->transport, &ni.event); - write (this->data_source.fd, &cmd, 1); + write (this->data_source.fd, &cmd, 8); } static void @@ -185,13 +184,12 @@ send_have_output (SpaProxy *this) { PinosNode *pnode = this->pnode; SpaNodeEventHaveOutput ho; - uint8_t cmd = 0; + uint64_t cmd = 1; ho.event.type = SPA_NODE_EVENT_TYPE_HAVE_OUTPUT; ho.event.size = sizeof (ho); - ho.port_id = 0; pinos_transport_add_event (pnode->transport, &ho.event); - write (this->data_source.fd, &cmd, 1); + write (this->data_source.fd, &cmd, 8); } static SpaResult @@ -857,7 +855,7 @@ spa_proxy_node_port_reuse_buffer (SpaNode *node, SpaProxy *this; SpaNodeEventReuseBuffer rb; PinosNode *pnode; - uint8_t cmd = 0; + //uint64_t cmd = 1; if (node == NULL) return SPA_RESULT_INVALID_ARGUMENTS; @@ -873,7 +871,7 @@ spa_proxy_node_port_reuse_buffer (SpaNode *node, rb.port_id = port_id; rb.buffer_id = buffer_id; pinos_transport_add_event (pnode->transport, &rb.event); - write (this->data_source.fd, &cmd, 1); + //write (this->data_source.fd, &cmd, 8); return SPA_RESULT_OK; } @@ -1060,9 +1058,9 @@ proxy_on_data_fd_events (SpaSource *source) if (source->rmask & SPA_IO_IN) { SpaNodeEvent event; - uint8_t cmd; + uint64_t cmd; - read (this->data_source.fd, &cmd, 1); + read (this->data_source.fd, &cmd, 8); while (pinos_transport_next_event (pnode->transport, &event) == SPA_RESULT_OK) { SpaNodeEvent *ev = alloca (event.size); @@ -1338,15 +1336,22 @@ pinos_client_node_get_data_socket (PinosClientNode *this, PinosClientNodeImpl *impl = SPA_CONTAINER_OF (this, PinosClientNodeImpl, this); if (impl->data_fd == -1) { +#if 1 int fd[2]; - if (socketpair (AF_UNIX, SOCK_STREAM | SOCK_CLOEXEC | SOCK_NONBLOCK, 0, fd) != 0) + if (socketpair (AF_UNIX, SOCK_STREAM | SOCK_CLOEXEC, 0, fd) != 0) return SPA_RESULT_ERRNO; impl->proxy.data_source.fd = fd[0]; - spa_loop_add_source (impl->proxy.data_loop, &impl->proxy.data_source); - pinos_log_debug ("client-node %p: add data fd %d", this, fd[0]); impl->data_fd = fd[1]; +#else + + impl->proxy.data_source.fd = eventfd (0, EFD_CLOEXEC | EFD_NONBLOCK); + impl->data_fd = eventfd (0, EFD_CLOEXEC | EFD_NONBLOCK); +#endif + + spa_loop_add_source (impl->proxy.data_loop, &impl->proxy.data_source); + pinos_log_debug ("client-node %p: add data fd %d", this, impl->proxy.data_source.fd); } *fd = impl->data_fd; return SPA_RESULT_OK; diff --git a/pinos/server/data-loop.c b/pinos/server/data-loop.c index 1a343bf8e..c43843271 100644 --- a/pinos/server/data-loop.c +++ b/pinos/server/data-loop.c @@ -42,6 +42,7 @@ typedef struct pthread_t thread; } PinosDataLoopImpl; + static void make_realtime (PinosDataLoop *this) { @@ -57,7 +58,7 @@ make_realtime (PinosDataLoop *this) spa_zero (sp); sp.sched_priority = rtprio; - if (pthread_setschedparam (pthread_self(), SCHED_RR|SCHED_RESET_ON_FORK, &sp) == 0) { + if (pthread_setschedparam (pthread_self(), SCHED_OTHER|SCHED_RESET_ON_FORK, &sp) == 0) { pinos_log_debug ("SCHED_OTHER|SCHED_RESET_ON_FORK worked."); return; } diff --git a/pinos/server/link.c b/pinos/server/link.c index b3d947cba..52e77205f 100644 --- a/pinos/server/link.c +++ b/pinos/server/link.c @@ -716,8 +716,6 @@ pinos_link_activate (PinosLink *this) { PinosLinkImpl *impl = SPA_CONTAINER_OF (this, PinosLinkImpl, this); - spa_ringbuffer_init (&this->ringbuffer, SPA_N_ELEMENTS (this->queue)); - pinos_work_queue_add (impl->work, this, SPA_RESULT_WAIT_SYNC, @@ -729,7 +727,6 @@ pinos_link_activate (PinosLink *this) bool pinos_pinos_link_deactivate (PinosLink *this) { - spa_ringbuffer_clear (&this->ringbuffer); return true; } diff --git a/pinos/server/link.h b/pinos/server/link.h index 44ac1eb72..c9eb161f4 100644 --- a/pinos/server/link.h +++ b/pinos/server/link.h @@ -68,9 +68,6 @@ struct _PinosLink { PinosLink *link, PinosPort *port)); - uint32_t queue[64]; - SpaRingbuffer ringbuffer; - struct { unsigned int in_ready; PinosPort *input; diff --git a/pinos/server/node.c b/pinos/server/node.c index ff2d31f25..f9c7d1a57 100644 --- a/pinos/server/node.c +++ b/pinos/server/node.c @@ -246,37 +246,6 @@ send_clock_update (PinosNode *this) pinos_log_debug ("got error %d", res); } -static SpaResult -do_read_link (SpaLoop *loop, - bool async, - uint32_t seq, - size_t size, - void *data, - void *user_data) -{ - PinosNode *this = user_data; - PinosLink *link = ((PinosLink**)data)[0]; - size_t offset; - SpaResult res; - - if (link->rt.input == NULL) - return SPA_RESULT_OK; - - while (link->rt.in_ready > 0 && spa_ringbuffer_get_read_offset (&link->ringbuffer, &offset) > 0) { - SpaPortInput *input = &this->transport->inputs[link->rt.input->port_id]; - - input->buffer_id = link->queue[offset]; - - if ((res = spa_node_process_input (link->rt.input->node->node)) < 0) - pinos_log_warn ("node %p: error pushing buffer: %d, %d", this, res, input->status); - - spa_ringbuffer_read_advance (&link->ringbuffer, 1); - link->rt.in_ready--; - } - return SPA_RESULT_OK; -} - - static void on_node_event (SpaNode *node, SpaNodeEvent *event, void *user_data) { @@ -303,59 +272,84 @@ on_node_event (SpaNode *node, SpaNodeEvent *event, void *user_data) case SPA_NODE_EVENT_TYPE_NEED_INPUT: { - SpaNodeEventNeedInput *ni = (SpaNodeEventNeedInput *) event; - PinosPort *port = this->input_port_map[ni->port_id]; - PinosLink *link; + SpaResult res; + int i; + bool processed = false; - spa_list_for_each (link, &port->rt.links, rt.input_link) { - if (link->rt.input == NULL || link->rt.output == NULL) +// pinos_log_debug ("node %p: need input", this); + + for (i = 0; i < this->transport->area->n_inputs; i++) { + PinosLink *link; + PinosPort *inport, *outport; + SpaPortInput *pi; + SpaPortOutput *po; + + pi = &this->transport->inputs[i]; + if (pi->buffer_id != SPA_ID_INVALID) continue; - link->rt.in_ready++; - pinos_loop_invoke (link->rt.input->node->data_loop->loop, - do_read_link, - SPA_ID_INVALID, - sizeof (PinosLink *), - &link, - link->rt.input->node); + inport = this->input_port_map[i]; + spa_list_for_each (link, &inport->rt.links, rt.input_link) { + if (link->rt.input == NULL || link->rt.output == NULL) + continue; + + outport = link->rt.output; + po = &outport->node->transport->outputs[outport->port_id]; + + if (po->buffer_id != SPA_ID_INVALID) { + processed = true; + + pi->buffer_id = po->buffer_id; + po->buffer_id = SPA_ID_INVALID; + } + if ((res = spa_node_process_output (outport->node->node)) < 0) + pinos_log_warn ("node %p: got process output %d", outport->node, res); + } + } + if (processed) { + if ((res = spa_node_process_input (this->node)) < 0) + pinos_log_warn ("node %p: got process input %d", this, res); } break; } case SPA_NODE_EVENT_TYPE_HAVE_OUTPUT: { - SpaNodeEventHaveOutput *ho = (SpaNodeEventHaveOutput *) event; SpaResult res; - bool pushed = false; - SpaPortOutput *po = &this->transport->outputs[ho->port_id]; - PinosPort *port = this->output_port_map[ho->port_id]; - PinosLink *link; + int i; + bool processed = false; - spa_list_for_each (link, &port->rt.links, rt.output_link) { - size_t offset; +// pinos_log_debug ("node %p: have output", this); - if (link->rt.input == NULL || link->rt.output == NULL) + for (i = 0; i < this->transport->area->n_outputs; i++) { + PinosLink *link; + PinosPort *inport, *outport; + SpaPortInput *pi; + SpaPortOutput *po; + + po = &this->transport->outputs[i]; + if (po->buffer_id == SPA_ID_INVALID) continue; - if (spa_ringbuffer_get_write_offset (&link->ringbuffer, &offset) > 0) { - link->queue[offset] = po->buffer_id; - spa_ringbuffer_write_advance (&link->ringbuffer, 1); + outport = this->output_port_map[i]; + spa_list_for_each (link, &outport->rt.links, rt.output_link) { + if (link->rt.input == NULL || link->rt.output == NULL) + continue; - pinos_loop_invoke (link->rt.input->node->data_loop->loop, - do_read_link, - SPA_ID_INVALID, - sizeof (PinosLink *), - &link, - link->rt.input->node); - pushed = true; + inport = link->rt.input; + pi = &inport->node->transport->inputs[inport->port_id]; + + processed = true; + + pi->buffer_id = po->buffer_id; + + if ((res = spa_node_process_input (inport->node->node)) < 0) + pinos_log_warn ("node %p: got process input %d", inport->node, res); } + po->buffer_id = SPA_ID_INVALID; } - if (!pushed) { - if ((res = spa_node_port_reuse_buffer (node, ho->port_id, po->buffer_id)) < 0) - pinos_log_warn ("node %p: error reuse buffer: %d", node, res); - } - if ((res = spa_node_process_output (node)) < 0) { - pinos_log_warn ("node %p: got pull error %d, %d", this, res, po->status); - break; + if (processed) { + if ((res = spa_node_process_output (this->node)) < 0) + pinos_log_warn ("node %p: got process output %d", this, res); } break; } @@ -366,6 +360,8 @@ on_node_event (SpaNode *node, SpaNodeEvent *event, void *user_data) PinosPort *port = this->input_port_map[rb->port_id]; PinosLink *link; +// pinos_log_debug ("node %p: reuse buffer %u", this, rb->buffer_id); + spa_list_for_each (link, &port->rt.links, rt.input_link) { if (link->rt.input == NULL || link->rt.output == NULL) continue; diff --git a/spa/include/spa/loop.h b/spa/include/spa/loop.h index b0f2e22c2..4ebfa03b1 100644 --- a/spa/include/spa/loop.h +++ b/spa/include/spa/loop.h @@ -54,7 +54,6 @@ struct _SpaSource { int fd; SpaIO mask; SpaIO rmask; - void *loop_private; }; typedef SpaResult (*SpaInvokeFunc) (SpaLoop *loop, diff --git a/spa/include/spa/node-event.h b/spa/include/spa/node-event.h index 4833bc102..0b1823ae5 100644 --- a/spa/include/spa/node-event.h +++ b/spa/include/spa/node-event.h @@ -78,12 +78,10 @@ typedef struct { typedef struct { SpaNodeEvent event; - uint32_t port_id; } SpaNodeEventHaveOutput; typedef struct { SpaNodeEvent event; - uint32_t port_id; } SpaNodeEventNeedInput; typedef struct { diff --git a/spa/plugins/alsa/alsa-sink.c b/spa/plugins/alsa/alsa-sink.c index 6aa779138..6a5415f5d 100644 --- a/spa/plugins/alsa/alsa-sink.c +++ b/spa/plugins/alsa/alsa-sink.c @@ -32,7 +32,7 @@ typedef struct _SpaALSAState SpaALSASink; static const char default_device[] = "default"; -static const uint32_t default_period_size = 128; +static const uint32_t default_period_size = 32; static const uint32_t default_periods = 2; static const bool default_period_event = 0; @@ -691,6 +691,7 @@ spa_alsa_sink_node_process_input (SpaNode *node) b = &this->buffers[input->buffer_id]; if (!b->outstanding) { + input->buffer_id = SPA_ID_INVALID; input->status = SPA_RESULT_INVALID_BUFFER_ID; return SPA_RESULT_ERROR; } @@ -700,7 +701,9 @@ spa_alsa_sink_node_process_input (SpaNode *node) } else { spa_list_insert (this->ready.prev, &b->link); } + //spa_log_debug (this->log, "alsa-source: got buffer %u", input->buffer_id); b->outstanding = false; + input->buffer_id = SPA_ID_INVALID; input->status = SPA_RESULT_OK; return SPA_RESULT_OK; diff --git a/spa/plugins/alsa/alsa-source.c b/spa/plugins/alsa/alsa-source.c index 5cd5d7296..22a1e9a49 100644 --- a/spa/plugins/alsa/alsa-source.c +++ b/spa/plugins/alsa/alsa-source.c @@ -39,7 +39,7 @@ update_state (SpaALSASource *this, SpaNodeState state) } static const char default_device[] = "hw:0"; -static const uint32_t default_period_size = 128; +static const uint32_t default_period_size = 32; static const uint32_t default_periods = 2; static const bool default_period_event = 0; diff --git a/spa/plugins/alsa/alsa-utils.c b/spa/plugins/alsa/alsa-utils.c index f6019ecae..f419f528a 100644 --- a/spa/plugins/alsa/alsa-utils.c +++ b/spa/plugins/alsa/alsa-utils.c @@ -175,22 +175,6 @@ spa_alsa_set_format (SpaALSAState *state, SpaFormatAudio *fmt, SpaPortFormatFlag CHECK (snd_pcm_hw_params_set_buffer_size (hndl, params, state->buffer_frames), "set_buffer_size"); -#if 0 - /* set the buffer time */ - buffer_time = props->buffer_time; - CHECK (snd_pcm_hw_params_set_buffer_time_near (hndl, params, &buffer_time, &dir), "set_buffer_time_near"); - - CHECK (snd_pcm_hw_params_get_buffer_size (params, &size), "get_buffer_size"); - state->buffer_frames = size; - - /* set the period time */ - period_time = props->period_time; - CHECK (snd_pcm_hw_params_set_period_time_near (hndl, params, &period_time, &dir), "set_period_time_near"); - - CHECK (snd_pcm_hw_params_get_period_size (params, &size, &dir), "get_period_size"); - state->period_frames = size; -#endif - spa_log_info (state->log, "buffer frames %zd, period frames %zd", state->buffer_frames, state->period_frames); /* write the parameters to device */ @@ -216,22 +200,22 @@ set_swparams (SpaALSAState *state) /* start the transfer */ CHECK (snd_pcm_sw_params_set_start_threshold (hndl, params, 0U), "set_start_threshold"); +#if 1 CHECK (snd_pcm_sw_params_set_stop_threshold (hndl, params, (state->buffer_frames / state->period_frames) * state->period_frames), "set_stop_threshold"); -// CHECK (snd_pcm_sw_params_set_stop_threshold (hndl, params, -1), "set_stop_threshold"); +#else + CHECK (snd_pcm_sw_params_set_stop_threshold (hndl, params, -1), "set_stop_threshold"); +#endif CHECK (snd_pcm_sw_params_set_silence_threshold (hndl, params, 0U), "set_silence_threshold"); + /* enable period events when requested */ + CHECK (snd_pcm_sw_params_set_period_event (hndl, params, props->period_event ? 1 : 0), "set_period_event"); + #if 1 /* allow the transfer when at least period_size samples can be processed */ /* or disable this mechanism when period event is enabled (aka interrupt like style processing) */ - CHECK (snd_pcm_sw_params_set_avail_min (hndl, params, - props->period_event ? state->buffer_frames : state->period_frames), "set_avail_min"); - - /* enable period events when requested */ - if (props->period_event) { - CHECK (snd_pcm_sw_params_set_period_event (hndl, params, 1), "set_period_event"); - } + CHECK (snd_pcm_sw_params_set_avail_min (hndl, params, state->period_frames), "set_avail_min"); #else CHECK (snd_pcm_sw_params_set_avail_min (hndl, params, 0), "set_avail_min"); #endif @@ -286,7 +270,6 @@ pull_frames_queue (SpaALSAState *state, ni.event.type = SPA_NODE_EVENT_TYPE_NEED_INPUT; ni.event.size = sizeof (ni); - ni.port_id = 0; state->event_cb (&state->node, &ni.event, state->user_data); } if (!spa_list_is_empty (&state->ready)) { @@ -384,7 +367,6 @@ mmap_write (SpaALSAState *state) snd_pcm_sframes_t avail; snd_pcm_uframes_t offset, frames, size; const snd_pcm_channel_area_t *my_areas; - SpaNodeEventNeedInput ni; #if 0 snd_pcm_status_t *status; @@ -404,13 +386,6 @@ mmap_write (SpaALSAState *state) } #endif -#if 0 - ni.event.type = SPA_NODE_EVENT_TYPE_NEED_INPUT; - ni.event.size = sizeof (ni); - ni.port_id = 0; - state->event_cb (&state->node, &ni.event, state->user_data); -#endif - size = avail; while (size > 0) { frames = size; @@ -418,6 +393,10 @@ mmap_write (SpaALSAState *state) spa_log_error (state->log, "snd_pcm_mmap_begin error: %s", snd_strerror(err)); return -1; } + if (frames < state->period_frames) + break; + else + frames = state->period_frames; if (state->ringbuffer) frames = pull_frames_ringbuffer (state, my_areas, offset, frames); @@ -524,43 +503,75 @@ mmap_read (SpaALSAState *state) } ho.event.type = SPA_NODE_EVENT_TYPE_HAVE_OUTPUT; ho.event.size = sizeof (ho); - ho.port_id = 0; state->event_cb (&state->node, &ho.event, state->user_data); } return 0; } +static inline short +spa_io_to_poll (SpaIO mask) +{ + short events = 0; + + if (mask & SPA_IO_IN) + events |= POLLIN; + if (mask & SPA_IO_OUT) + events |= POLLOUT; + if (mask & SPA_IO_ERR) + events |= POLLERR; + if (mask & SPA_IO_HUP) + events |= POLLHUP; + + return events; +} + +static inline SpaIO +spa_poll_to_io (short events) +{ + SpaIO mask = 0; + + if (events & POLLIN) + mask |= SPA_IO_IN; + if (events & POLLOUT) + mask |= SPA_IO_OUT; + if (events & POLLERR) + mask |= SPA_IO_ERR; + if (events & POLLHUP) + mask |= SPA_IO_HUP; + + return mask; +} + static void alsa_on_fd_events (SpaSource *source) { SpaALSAState *state = source->data; snd_pcm_t *hndl = state->hndl; - int err; + int err, i; unsigned short revents = 0; -#if 0 + for (i = 0; i < state->n_fds; i++) { + state->fds[i].revents = spa_io_to_poll (state->sources[i].rmask); + state->sources[i].rmask = 0; + } + snd_pcm_poll_descriptors_revents (hndl, state->fds, state->n_fds, &revents); - - spa_log_debug (state->log, "revents: %d %d", revents, state->n_fds); -#endif - revents = source->rmask; - - if (revents & SPA_IO_ERR) { + if (revents & POLLERR) { if ((err = xrun_recovery (state, hndl, err)) < 0) { spa_log_error (state->log, "error: %s", snd_strerror (err)); } } if (state->stream == SND_PCM_STREAM_CAPTURE) { - if (!(revents & SPA_IO_IN)) + if (!(revents & POLLIN)) return; mmap_read (state); } else { - if (!(revents & SPA_IO_OUT)) + if (!(revents & POLLOUT)) return; mmap_write (state); @@ -601,15 +612,9 @@ spa_alsa_start (SpaALSAState *state, bool xrun_recover) state->sources[i].func = alsa_on_fd_events; state->sources[i].data = state; state->sources[i].fd = state->fds[i].fd; - state->sources[i].mask = 0; - if (state->fds[i].events & POLLIN) - state->sources[i].mask |= SPA_IO_IN; - if (state->fds[i].events & POLLOUT) - state->sources[i].mask |= SPA_IO_OUT; - if (state->fds[i].events & POLLERR) - state->sources[i].mask |= SPA_IO_ERR; - if (state->fds[i].events & POLLHUP) - state->sources[i].mask |= SPA_IO_HUP; + state->sources[i].mask = spa_poll_to_io (state->fds[i].events); + state->sources[i].rmask = 0; + state->fds[i].revents = 0; spa_loop_add_source (state->data_loop, &state->sources[i]); } diff --git a/spa/plugins/audiomixer/audiomixer.c b/spa/plugins/audiomixer/audiomixer.c index 8645d6ff6..322659438 100644 --- a/spa/plugins/audiomixer/audiomixer.c +++ b/spa/plugins/audiomixer/audiomixer.c @@ -581,7 +581,6 @@ pull_port (SpaAudioMixer *this, uint32_t port_id, SpaPortOutput *output, size_t ni.event.type = SPA_NODE_EVENT_TYPE_NEED_INPUT; ni.event.size = sizeof (ni); - ni.port_id = port_id; this->event_cb (&this->node, &ni.event, this->user_data); } diff --git a/spa/plugins/audiotestsrc/audiotestsrc.c b/spa/plugins/audiotestsrc/audiotestsrc.c index a4eeb8f45..4127e20e5 100644 --- a/spa/plugins/audiotestsrc/audiotestsrc.c +++ b/spa/plugins/audiotestsrc/audiotestsrc.c @@ -234,7 +234,6 @@ send_have_output (SpaAudioTestSrc *this) if (this->event_cb) { ho.event.type = SPA_NODE_EVENT_TYPE_HAVE_OUTPUT; ho.event.size = sizeof (ho); - ho.port_id = 0; this->event_cb (&this->node, &ho.event, this->user_data); } diff --git a/spa/plugins/v4l2/v4l2-utils.c b/spa/plugins/v4l2/v4l2-utils.c index 800f3bb00..ef76fe5e4 100644 --- a/spa/plugins/v4l2/v4l2-utils.c +++ b/spa/plugins/v4l2/v4l2-utils.c @@ -906,7 +906,6 @@ v4l2_on_fd_events (SpaSource *source) ho.event.type = SPA_NODE_EVENT_TYPE_HAVE_OUTPUT; ho.event.size = sizeof (ho); - ho.port_id = 0; this->event_cb (&this->node, &ho.event, this->user_data); } diff --git a/spa/plugins/videotestsrc/videotestsrc.c b/spa/plugins/videotestsrc/videotestsrc.c index 6076d7fd8..a7c0c1522 100644 --- a/spa/plugins/videotestsrc/videotestsrc.c +++ b/spa/plugins/videotestsrc/videotestsrc.c @@ -199,7 +199,6 @@ send_have_output (SpaVideoTestSrc *this) if (this->event_cb) { ho.event.type = SPA_NODE_EVENT_TYPE_HAVE_OUTPUT; ho.event.size = sizeof (ho); - ho.port_id = 0; this->event_cb (&this->node, &ho.event, this->user_data); }