optimize data transport

Remove queue and ringbuffer between nodes. transfer the buffer id
directly between the io areas when possible.
Let only pinos send push or pull requests for now.
Allow polling multiple fds, like how alsa wants it
Remove port_id from events.
This commit is contained in:
Wim Taymans 2017-01-18 18:29:15 +01:00
parent c8648eaf59
commit 7a9dc2c4fd
18 changed files with 265 additions and 214 deletions

View file

@ -83,31 +83,58 @@ typedef struct {
uint8_t buffer_data[DATAS_SIZE]; uint8_t buffer_data[DATAS_SIZE];
} PinosLoopImpl; } PinosLoopImpl;
static inline uint32_t
spa_io_to_epoll (SpaIO mask)
{
uint32_t events = 0;
if (mask & SPA_IO_IN)
events |= EPOLLIN;
if (mask & SPA_IO_OUT)
events |= EPOLLOUT;
if (mask & SPA_IO_ERR)
events |= EPOLLERR;
if (mask & SPA_IO_HUP)
events |= EPOLLHUP;
return events;
}
static inline SpaIO
spa_epoll_to_io (uint32_t events)
{
SpaIO mask = 0;
if (events & EPOLLIN)
mask |= SPA_IO_IN;
if (events & EPOLLOUT)
mask |= SPA_IO_OUT;
if (events & EPOLLHUP)
mask |= SPA_IO_HUP;
if (events & EPOLLERR)
mask |= SPA_IO_ERR;
return mask;
}
static SpaResult static SpaResult
loop_add_source (SpaLoop *loop, loop_add_source (SpaLoop *loop,
SpaSource *source) SpaSource *source)
{ {
PinosLoopImpl *impl = SPA_CONTAINER_OF (loop, PinosLoopImpl, loop); PinosLoopImpl *impl = SPA_CONTAINER_OF (loop, PinosLoopImpl, loop);
struct epoll_event ep;
source->loop = loop; source->loop = loop;
if (source->fd != -1) { if (source->fd != -1) {
struct epoll_event ep;
spa_zero (ep); spa_zero (ep);
if (source->mask & SPA_IO_IN) ep.events = spa_io_to_epoll (source->mask);
ep.events |= EPOLLIN;
if (source->mask & SPA_IO_OUT)
ep.events |= EPOLLOUT;
if (source->mask & SPA_IO_ERR)
ep.events |= EPOLLERR;
if (source->mask & SPA_IO_HUP)
ep.events |= EPOLLHUP;
ep.data.ptr = source; ep.data.ptr = source;
if (epoll_ctl (impl->epoll_fd, EPOLL_CTL_ADD, source->fd, &ep) < 0) if (epoll_ctl (impl->epoll_fd, EPOLL_CTL_ADD, source->fd, &ep) < 0)
return SPA_RESULT_ERRNO; return SPA_RESULT_ERRNO;
} }
return SPA_RESULT_OK; return SPA_RESULT_OK;
} }
@ -121,14 +148,7 @@ loop_update_source (SpaSource *source)
struct epoll_event ep; struct epoll_event ep;
spa_zero (ep); spa_zero (ep);
if (source->mask & SPA_IO_IN) ep.events = spa_io_to_epoll (source->mask);
ep.events |= EPOLLIN;
if (source->mask & SPA_IO_OUT)
ep.events |= EPOLLOUT;
if (source->mask & SPA_IO_ERR)
ep.events |= EPOLLERR;
if (source->mask & SPA_IO_HUP)
ep.events |= EPOLLHUP;
ep.data.ptr = source; ep.data.ptr = source;
if (epoll_ctl (impl->epoll_fd, EPOLL_CTL_MOD, source->fd, &ep) < 0) if (epoll_ctl (impl->epoll_fd, EPOLL_CTL_MOD, source->fd, &ep) < 0)
@ -240,10 +260,6 @@ loop_enter (SpaLoopControl *ctrl)
{ {
PinosLoopImpl *impl = SPA_CONTAINER_OF (ctrl, PinosLoopImpl, control); PinosLoopImpl *impl = SPA_CONTAINER_OF (ctrl, PinosLoopImpl, control);
impl->thread = pthread_self(); impl->thread = pthread_self();
if (impl->event == NULL)
impl->event = spa_loop_utils_add_event (&impl->utils,
event_func,
impl);
} }
static void static void
@ -275,20 +291,19 @@ loop_iterate (SpaLoopControl *ctrl,
return SPA_RESULT_ERRNO; return SPA_RESULT_ERRNO;
} }
/* first we set all the rmasks, then call the callbacks. The reason is that
* some callback might also want to look at other sources it manages and
* can then reset the rmask to suppress the callback */
for (i = 0; i < nfds; i++) { for (i = 0; i < nfds; i++) {
SpaSource *source = ep[i].data.ptr; SpaSource *source = ep[i].data.ptr;
source->rmask = spa_epoll_to_io (ep[i].events);
source->rmask = 0; }
if (ep[i].events & EPOLLIN) for (i = 0; i < nfds; i++) {
source->rmask |= SPA_IO_IN; SpaSource *source = ep[i].data.ptr;
if (ep[i].events & EPOLLOUT) if (source->rmask) {
source->rmask |= SPA_IO_OUT; source->func (source);
if (ep[i].events & EPOLLHUP) source->rmask = 0;
source->rmask |= SPA_IO_HUP; }
if (ep[i].events & EPOLLERR)
source->rmask |= SPA_IO_ERR;
source->func (source);
} }
return SPA_RESULT_OK; return SPA_RESULT_OK;
} }
@ -610,6 +625,10 @@ pinos_loop_new (void)
spa_ringbuffer_init (&impl->buffer, DATAS_SIZE); spa_ringbuffer_init (&impl->buffer, DATAS_SIZE);
impl->event = spa_loop_utils_add_event (&impl->utils,
event_func,
impl);
return this; return this;
no_epoll: no_epoll:
@ -628,6 +647,8 @@ pinos_loop_destroy (PinosLoop *loop)
spa_list_for_each_safe (source, tmp, &impl->source_list, link) spa_list_for_each_safe (source, tmp, &impl->source_list, link)
loop_destroy_source (&source->source); loop_destroy_source (&source->source);
pinos_loop_destroy_source (loop, impl->event);
close (impl->epoll_fd); close (impl->epoll_fd);
free (impl); free (impl);
} }

View file

@ -53,7 +53,6 @@ typedef struct {
bool used; bool used;
void *buf_ptr; void *buf_ptr;
SpaBuffer *buf; SpaBuffer *buf;
SpaData *datas;
} BufferId; } BufferId;
typedef struct typedef struct
@ -310,29 +309,35 @@ add_port_update (PinosStream *stream, uint32_t change_mask, bool flush)
static inline void static inline void
send_need_input (PinosStream *stream) send_need_input (PinosStream *stream)
{ {
#if 0
PinosStreamImpl *impl = SPA_CONTAINER_OF (stream, PinosStreamImpl, this); PinosStreamImpl *impl = SPA_CONTAINER_OF (stream, PinosStreamImpl, this);
SpaNodeEventNeedInput ni; SpaNodeEventNeedInput ni;
uint8_t cmd = 0; uint64_t cmd = 1;
pinos_log_debug ("stream %p: need input", stream);
ni.event.type = SPA_NODE_EVENT_TYPE_NEED_INPUT; ni.event.type = SPA_NODE_EVENT_TYPE_NEED_INPUT;
ni.event.size = sizeof (ni); ni.event.size = sizeof (ni);
ni.port_id = 0;
pinos_transport_add_event (impl->trans, &ni.event); pinos_transport_add_event (impl->trans, &ni.event);
write (impl->rtfd, &cmd, 1); write (impl->rtfd, &cmd, 8);
#endif
} }
static inline void static inline void
send_have_output (PinosStream *stream) send_have_output (PinosStream *stream)
{ {
#if 0
PinosStreamImpl *impl = SPA_CONTAINER_OF (stream, PinosStreamImpl, this); PinosStreamImpl *impl = SPA_CONTAINER_OF (stream, PinosStreamImpl, this);
SpaNodeEventHaveOutput ho; SpaNodeEventHaveOutput ho;
uint8_t cmd = 0; uint64_t cmd = 1;
pinos_log_debug ("stream %p: have output", stream);
ho.event.type = SPA_NODE_EVENT_TYPE_HAVE_OUTPUT; ho.event.type = SPA_NODE_EVENT_TYPE_HAVE_OUTPUT;
ho.event.size = sizeof (ho); ho.event.size = sizeof (ho);
ho.port_id = 0;
pinos_transport_add_event (impl->trans, &ho.event); pinos_transport_add_event (impl->trans, &ho.event);
write (impl->rtfd, &cmd, 1); write (impl->rtfd, &cmd, 8);
#endif
} }
static void static void
@ -442,6 +447,8 @@ handle_rtnode_event (PinosStream *stream,
{ {
int i; int i;
//pinos_log_debug ("stream %p: have output", stream);
for (i = 0; i < impl->trans->area->n_inputs; i++) { for (i = 0; i < impl->trans->area->n_inputs; i++) {
SpaPortInput *input = &impl->trans->inputs[i]; SpaPortInput *input = &impl->trans->inputs[i];
@ -456,6 +463,7 @@ handle_rtnode_event (PinosStream *stream,
} }
case SPA_NODE_EVENT_TYPE_NEED_INPUT: case SPA_NODE_EVENT_TYPE_NEED_INPUT:
//pinos_log_debug ("stream %p: need input", stream);
pinos_signal_emit (&stream->need_buffer, stream); pinos_signal_emit (&stream->need_buffer, stream);
break; break;
@ -469,7 +477,7 @@ handle_rtnode_event (PinosStream *stream,
if (impl->direction != SPA_DIRECTION_OUTPUT) if (impl->direction != SPA_DIRECTION_OUTPUT)
break; break;
if ((bid = find_buffer (stream, p->buffer_id))) { if ((bid = find_buffer (stream, p->buffer_id)) && bid->used) {
bid->used = false; bid->used = false;
pinos_signal_emit (&stream->new_buffer, stream, p->buffer_id); pinos_signal_emit (&stream->new_buffer, stream, p->buffer_id);
} }
@ -481,6 +489,17 @@ handle_rtnode_event (PinosStream *stream,
} }
} }
static void
unhandle_socket (PinosStream *stream)
{
PinosStreamImpl *impl = SPA_CONTAINER_OF (stream, PinosStreamImpl, this);
if (impl->rtsocket_source) {
pinos_loop_destroy_source (stream->context->loop, impl->rtsocket_source);
impl->rtsocket_source = NULL;
}
}
static void static void
on_rtsocket_condition (SpaSource *source, on_rtsocket_condition (SpaSource *source,
int fd, int fd,
@ -492,13 +511,15 @@ on_rtsocket_condition (SpaSource *source,
if (mask & (SPA_IO_ERR | SPA_IO_HUP)) { if (mask & (SPA_IO_ERR | SPA_IO_HUP)) {
pinos_log_warn ("got error"); pinos_log_warn ("got error");
unhandle_socket (stream);
return;
} }
if (mask & SPA_IO_IN) { if (mask & SPA_IO_IN) {
SpaNodeEvent event; SpaNodeEvent event;
uint8_t cmd; uint64_t cmd;
read (impl->rtfd, &cmd, 1); read (impl->rtfd, &cmd, 8);
while (pinos_transport_next_event (impl->trans, &event) == SPA_RESULT_OK) { while (pinos_transport_next_event (impl->trans, &event) == SPA_RESULT_OK) {
SpaNodeEvent *ev = alloca (event.size); SpaNodeEvent *ev = alloca (event.size);
@ -535,17 +556,6 @@ handle_socket (PinosStream *stream, int rtfd)
return; return;
} }
static void
unhandle_socket (PinosStream *stream)
{
PinosStreamImpl *impl = SPA_CONTAINER_OF (stream, PinosStreamImpl, this);
if (impl->rtsocket_source) {
pinos_loop_destroy_source (stream->context->loop, impl->rtsocket_source);
impl->rtsocket_source = NULL;
}
}
static void static void
handle_node_event (PinosStream *stream, handle_node_event (PinosStream *stream,
SpaNodeEvent *event) SpaNodeEvent *event)
@ -724,6 +734,7 @@ stream_dispatch_func (void *object,
} }
len = pinos_array_get_len (&impl->buffer_ids, BufferId); len = pinos_array_get_len (&impl->buffer_ids, BufferId);
bid = pinos_array_add (&impl->buffer_ids, sizeof (BufferId)); bid = pinos_array_add (&impl->buffer_ids, sizeof (BufferId));
bid->used = false;
b = p->buffers[i].buffer; b = p->buffers[i].buffer;
@ -1052,14 +1063,14 @@ pinos_stream_recycle_buffer (PinosStream *stream,
{ {
PinosStreamImpl *impl = SPA_CONTAINER_OF (stream, PinosStreamImpl, this); PinosStreamImpl *impl = SPA_CONTAINER_OF (stream, PinosStreamImpl, this);
SpaNodeEventReuseBuffer rb; SpaNodeEventReuseBuffer rb;
uint8_t cmd = 0; uint64_t cmd = 1;
rb.event.type = SPA_NODE_EVENT_TYPE_REUSE_BUFFER; rb.event.type = SPA_NODE_EVENT_TYPE_REUSE_BUFFER;
rb.event.size = sizeof (rb); rb.event.size = sizeof (rb);
rb.port_id = impl->port_id; rb.port_id = impl->port_id;
rb.buffer_id = id; rb.buffer_id = id;
pinos_transport_add_event (impl->trans, &rb.event); pinos_transport_add_event (impl->trans, &rb.event);
write (impl->rtfd, &cmd, 1); write (impl->rtfd, &cmd, 8);
return true; return true;
} }
@ -1107,11 +1118,18 @@ pinos_stream_send_buffer (PinosStream *stream,
PinosStreamImpl *impl = SPA_CONTAINER_OF (stream, PinosStreamImpl, this); PinosStreamImpl *impl = SPA_CONTAINER_OF (stream, PinosStreamImpl, this);
BufferId *bid; BufferId *bid;
if ((bid = find_buffer (stream, id))) { if (impl->trans->outputs[0].buffer_id != SPA_ID_INVALID) {
pinos_log_debug ("can't send %u, pending buffer %u", id, impl->trans->outputs[0].buffer_id);
return false;
}
if ((bid = find_buffer (stream, id)) && !bid->used) {
bid->used = true; bid->used = true;
impl->trans->outputs[0].buffer_id = id; impl->trans->outputs[0].buffer_id = id;
impl->trans->outputs[0].status = SPA_RESULT_OK; impl->trans->outputs[0].status = SPA_RESULT_OK;
send_have_output (stream); send_have_output (stream);
} else {
pinos_log_debug ("stream %p: output %u was used", stream, id);
} }
return true; return true;

View file

@ -60,14 +60,27 @@ static void
transport_setup_area (void *p, PinosTransport *trans) transport_setup_area (void *p, PinosTransport *trans)
{ {
PinosTransportArea *a; PinosTransportArea *a;
int i;
trans->area = a = p; trans->area = a = p;
p = SPA_MEMBER (p, sizeof (PinosTransportArea), SpaPortInput); p = SPA_MEMBER (p, sizeof (PinosTransportArea), SpaPortInput);
trans->inputs = p; trans->inputs = p;
for (i = 0; i < a->max_inputs; i++) {
trans->inputs[i].state = SPA_PORT_STATE_FLAG_NONE;
trans->inputs[i].flags = SPA_PORT_INPUT_FLAG_NONE;
trans->inputs[i].buffer_id = SPA_ID_INVALID;
trans->inputs[i].status = SPA_RESULT_OK;
}
p = SPA_MEMBER (p, a->max_inputs * sizeof (SpaPortInput), void); p = SPA_MEMBER (p, a->max_inputs * sizeof (SpaPortInput), void);
trans->outputs = p; trans->outputs = p;
for (i = 0; i < a->max_outputs; i++) {
trans->outputs[i].state = SPA_PORT_STATE_FLAG_NONE;
trans->outputs[i].flags = SPA_PORT_OUTPUT_FLAG_NONE;
trans->outputs[i].buffer_id = SPA_ID_INVALID;
trans->outputs[i].status = SPA_RESULT_OK;
}
p = SPA_MEMBER (p, a->max_outputs * sizeof (SpaPortOutput), void); p = SPA_MEMBER (p, a->max_outputs * sizeof (SpaPortOutput), void);
trans->input_buffer = p; trans->input_buffer = p;

View file

@ -458,7 +458,7 @@ on_new_buffer (PinosListener *listener,
GstPinosSink *pinossink = SPA_CONTAINER_OF (listener, GstPinosSink, stream_new_buffer); GstPinosSink *pinossink = SPA_CONTAINER_OF (listener, GstPinosSink, stream_new_buffer);
GstBuffer *buf; GstBuffer *buf;
GST_LOG_OBJECT (pinossink, "got new buffer"); GST_LOG_OBJECT (pinossink, "got new buffer %u", id);
if (pinossink->stream == NULL) { if (pinossink->stream == NULL) {
GST_LOG_OBJECT (pinossink, "no stream"); GST_LOG_OBJECT (pinossink, "no stream");
return; return;
@ -498,10 +498,12 @@ do_send_buffer (GstPinosSink *pinossink)
d->chunk->size = mem->size; d->chunk->size = mem->size;
} }
if (!(res = pinos_stream_send_buffer (pinossink->stream, data->id))) if (!(res = pinos_stream_send_buffer (pinossink->stream, data->id))) {
g_warning ("can't send buffer"); g_warning ("can't send buffer");
gst_buffer_unref (buffer);
pinossink->need_ready--; pinos_thread_main_loop_signal (pinossink->main_loop, FALSE);
} else
pinossink->need_ready--;
} }
@ -683,8 +685,8 @@ gst_pinos_sink_render (GstBaseSink * bsink, GstBuffer * buffer)
gst_buffer_ref (buffer); gst_buffer_ref (buffer);
g_queue_push_tail (&pinossink->queue, buffer); g_queue_push_tail (&pinossink->queue, buffer);
if (pinossink->need_ready) // if (pinossink->need_ready)
do_send_buffer (pinossink); // do_send_buffer (pinossink);
done: done:
pinos_thread_main_loop_unlock (pinossink->main_loop); pinos_thread_main_loop_unlock (pinossink->main_loop);

View file

@ -171,13 +171,12 @@ send_need_input (SpaProxy *this)
{ {
PinosNode *pnode = this->pnode; PinosNode *pnode = this->pnode;
SpaNodeEventNeedInput ni; SpaNodeEventNeedInput ni;
uint8_t cmd = 0; uint64_t cmd = 1;
ni.event.type = SPA_NODE_EVENT_TYPE_NEED_INPUT; ni.event.type = SPA_NODE_EVENT_TYPE_NEED_INPUT;
ni.event.size = sizeof (ni); ni.event.size = sizeof (ni);
ni.port_id = 0;
pinos_transport_add_event (pnode->transport, &ni.event); pinos_transport_add_event (pnode->transport, &ni.event);
write (this->data_source.fd, &cmd, 1); write (this->data_source.fd, &cmd, 8);
} }
static void static void
@ -185,13 +184,12 @@ send_have_output (SpaProxy *this)
{ {
PinosNode *pnode = this->pnode; PinosNode *pnode = this->pnode;
SpaNodeEventHaveOutput ho; SpaNodeEventHaveOutput ho;
uint8_t cmd = 0; uint64_t cmd = 1;
ho.event.type = SPA_NODE_EVENT_TYPE_HAVE_OUTPUT; ho.event.type = SPA_NODE_EVENT_TYPE_HAVE_OUTPUT;
ho.event.size = sizeof (ho); ho.event.size = sizeof (ho);
ho.port_id = 0;
pinos_transport_add_event (pnode->transport, &ho.event); pinos_transport_add_event (pnode->transport, &ho.event);
write (this->data_source.fd, &cmd, 1); write (this->data_source.fd, &cmd, 8);
} }
static SpaResult static SpaResult
@ -857,7 +855,7 @@ spa_proxy_node_port_reuse_buffer (SpaNode *node,
SpaProxy *this; SpaProxy *this;
SpaNodeEventReuseBuffer rb; SpaNodeEventReuseBuffer rb;
PinosNode *pnode; PinosNode *pnode;
uint8_t cmd = 0; //uint64_t cmd = 1;
if (node == NULL) if (node == NULL)
return SPA_RESULT_INVALID_ARGUMENTS; return SPA_RESULT_INVALID_ARGUMENTS;
@ -873,7 +871,7 @@ spa_proxy_node_port_reuse_buffer (SpaNode *node,
rb.port_id = port_id; rb.port_id = port_id;
rb.buffer_id = buffer_id; rb.buffer_id = buffer_id;
pinos_transport_add_event (pnode->transport, &rb.event); pinos_transport_add_event (pnode->transport, &rb.event);
write (this->data_source.fd, &cmd, 1); //write (this->data_source.fd, &cmd, 8);
return SPA_RESULT_OK; return SPA_RESULT_OK;
} }
@ -1060,9 +1058,9 @@ proxy_on_data_fd_events (SpaSource *source)
if (source->rmask & SPA_IO_IN) { if (source->rmask & SPA_IO_IN) {
SpaNodeEvent event; SpaNodeEvent event;
uint8_t cmd; uint64_t cmd;
read (this->data_source.fd, &cmd, 1); read (this->data_source.fd, &cmd, 8);
while (pinos_transport_next_event (pnode->transport, &event) == SPA_RESULT_OK) { while (pinos_transport_next_event (pnode->transport, &event) == SPA_RESULT_OK) {
SpaNodeEvent *ev = alloca (event.size); SpaNodeEvent *ev = alloca (event.size);
@ -1338,15 +1336,22 @@ pinos_client_node_get_data_socket (PinosClientNode *this,
PinosClientNodeImpl *impl = SPA_CONTAINER_OF (this, PinosClientNodeImpl, this); PinosClientNodeImpl *impl = SPA_CONTAINER_OF (this, PinosClientNodeImpl, this);
if (impl->data_fd == -1) { if (impl->data_fd == -1) {
#if 1
int fd[2]; int fd[2];
if (socketpair (AF_UNIX, SOCK_STREAM | SOCK_CLOEXEC | SOCK_NONBLOCK, 0, fd) != 0) if (socketpair (AF_UNIX, SOCK_STREAM | SOCK_CLOEXEC, 0, fd) != 0)
return SPA_RESULT_ERRNO; return SPA_RESULT_ERRNO;
impl->proxy.data_source.fd = fd[0]; impl->proxy.data_source.fd = fd[0];
spa_loop_add_source (impl->proxy.data_loop, &impl->proxy.data_source);
pinos_log_debug ("client-node %p: add data fd %d", this, fd[0]);
impl->data_fd = fd[1]; impl->data_fd = fd[1];
#else
impl->proxy.data_source.fd = eventfd (0, EFD_CLOEXEC | EFD_NONBLOCK);
impl->data_fd = eventfd (0, EFD_CLOEXEC | EFD_NONBLOCK);
#endif
spa_loop_add_source (impl->proxy.data_loop, &impl->proxy.data_source);
pinos_log_debug ("client-node %p: add data fd %d", this, impl->proxy.data_source.fd);
} }
*fd = impl->data_fd; *fd = impl->data_fd;
return SPA_RESULT_OK; return SPA_RESULT_OK;

View file

@ -42,6 +42,7 @@ typedef struct
pthread_t thread; pthread_t thread;
} PinosDataLoopImpl; } PinosDataLoopImpl;
static void static void
make_realtime (PinosDataLoop *this) make_realtime (PinosDataLoop *this)
{ {
@ -57,7 +58,7 @@ make_realtime (PinosDataLoop *this)
spa_zero (sp); spa_zero (sp);
sp.sched_priority = rtprio; sp.sched_priority = rtprio;
if (pthread_setschedparam (pthread_self(), SCHED_RR|SCHED_RESET_ON_FORK, &sp) == 0) { if (pthread_setschedparam (pthread_self(), SCHED_OTHER|SCHED_RESET_ON_FORK, &sp) == 0) {
pinos_log_debug ("SCHED_OTHER|SCHED_RESET_ON_FORK worked."); pinos_log_debug ("SCHED_OTHER|SCHED_RESET_ON_FORK worked.");
return; return;
} }

View file

@ -716,8 +716,6 @@ pinos_link_activate (PinosLink *this)
{ {
PinosLinkImpl *impl = SPA_CONTAINER_OF (this, PinosLinkImpl, this); PinosLinkImpl *impl = SPA_CONTAINER_OF (this, PinosLinkImpl, this);
spa_ringbuffer_init (&this->ringbuffer, SPA_N_ELEMENTS (this->queue));
pinos_work_queue_add (impl->work, pinos_work_queue_add (impl->work,
this, this,
SPA_RESULT_WAIT_SYNC, SPA_RESULT_WAIT_SYNC,
@ -729,7 +727,6 @@ pinos_link_activate (PinosLink *this)
bool bool
pinos_pinos_link_deactivate (PinosLink *this) pinos_pinos_link_deactivate (PinosLink *this)
{ {
spa_ringbuffer_clear (&this->ringbuffer);
return true; return true;
} }

View file

@ -68,9 +68,6 @@ struct _PinosLink {
PinosLink *link, PinosLink *link,
PinosPort *port)); PinosPort *port));
uint32_t queue[64];
SpaRingbuffer ringbuffer;
struct { struct {
unsigned int in_ready; unsigned int in_ready;
PinosPort *input; PinosPort *input;

View file

@ -246,37 +246,6 @@ send_clock_update (PinosNode *this)
pinos_log_debug ("got error %d", res); pinos_log_debug ("got error %d", res);
} }
static SpaResult
do_read_link (SpaLoop *loop,
bool async,
uint32_t seq,
size_t size,
void *data,
void *user_data)
{
PinosNode *this = user_data;
PinosLink *link = ((PinosLink**)data)[0];
size_t offset;
SpaResult res;
if (link->rt.input == NULL)
return SPA_RESULT_OK;
while (link->rt.in_ready > 0 && spa_ringbuffer_get_read_offset (&link->ringbuffer, &offset) > 0) {
SpaPortInput *input = &this->transport->inputs[link->rt.input->port_id];
input->buffer_id = link->queue[offset];
if ((res = spa_node_process_input (link->rt.input->node->node)) < 0)
pinos_log_warn ("node %p: error pushing buffer: %d, %d", this, res, input->status);
spa_ringbuffer_read_advance (&link->ringbuffer, 1);
link->rt.in_ready--;
}
return SPA_RESULT_OK;
}
static void static void
on_node_event (SpaNode *node, SpaNodeEvent *event, void *user_data) on_node_event (SpaNode *node, SpaNodeEvent *event, void *user_data)
{ {
@ -303,59 +272,84 @@ on_node_event (SpaNode *node, SpaNodeEvent *event, void *user_data)
case SPA_NODE_EVENT_TYPE_NEED_INPUT: case SPA_NODE_EVENT_TYPE_NEED_INPUT:
{ {
SpaNodeEventNeedInput *ni = (SpaNodeEventNeedInput *) event; SpaResult res;
PinosPort *port = this->input_port_map[ni->port_id]; int i;
PinosLink *link; bool processed = false;
spa_list_for_each (link, &port->rt.links, rt.input_link) { // pinos_log_debug ("node %p: need input", this);
if (link->rt.input == NULL || link->rt.output == NULL)
for (i = 0; i < this->transport->area->n_inputs; i++) {
PinosLink *link;
PinosPort *inport, *outport;
SpaPortInput *pi;
SpaPortOutput *po;
pi = &this->transport->inputs[i];
if (pi->buffer_id != SPA_ID_INVALID)
continue; continue;
link->rt.in_ready++; inport = this->input_port_map[i];
pinos_loop_invoke (link->rt.input->node->data_loop->loop, spa_list_for_each (link, &inport->rt.links, rt.input_link) {
do_read_link, if (link->rt.input == NULL || link->rt.output == NULL)
SPA_ID_INVALID, continue;
sizeof (PinosLink *),
&link, outport = link->rt.output;
link->rt.input->node); po = &outport->node->transport->outputs[outport->port_id];
if (po->buffer_id != SPA_ID_INVALID) {
processed = true;
pi->buffer_id = po->buffer_id;
po->buffer_id = SPA_ID_INVALID;
}
if ((res = spa_node_process_output (outport->node->node)) < 0)
pinos_log_warn ("node %p: got process output %d", outport->node, res);
}
}
if (processed) {
if ((res = spa_node_process_input (this->node)) < 0)
pinos_log_warn ("node %p: got process input %d", this, res);
} }
break; break;
} }
case SPA_NODE_EVENT_TYPE_HAVE_OUTPUT: case SPA_NODE_EVENT_TYPE_HAVE_OUTPUT:
{ {
SpaNodeEventHaveOutput *ho = (SpaNodeEventHaveOutput *) event;
SpaResult res; SpaResult res;
bool pushed = false; int i;
SpaPortOutput *po = &this->transport->outputs[ho->port_id]; bool processed = false;
PinosPort *port = this->output_port_map[ho->port_id];
PinosLink *link;
spa_list_for_each (link, &port->rt.links, rt.output_link) { // pinos_log_debug ("node %p: have output", this);
size_t offset;
if (link->rt.input == NULL || link->rt.output == NULL) for (i = 0; i < this->transport->area->n_outputs; i++) {
PinosLink *link;
PinosPort *inport, *outport;
SpaPortInput *pi;
SpaPortOutput *po;
po = &this->transport->outputs[i];
if (po->buffer_id == SPA_ID_INVALID)
continue; continue;
if (spa_ringbuffer_get_write_offset (&link->ringbuffer, &offset) > 0) { outport = this->output_port_map[i];
link->queue[offset] = po->buffer_id; spa_list_for_each (link, &outport->rt.links, rt.output_link) {
spa_ringbuffer_write_advance (&link->ringbuffer, 1); if (link->rt.input == NULL || link->rt.output == NULL)
continue;
pinos_loop_invoke (link->rt.input->node->data_loop->loop, inport = link->rt.input;
do_read_link, pi = &inport->node->transport->inputs[inport->port_id];
SPA_ID_INVALID,
sizeof (PinosLink *), processed = true;
&link,
link->rt.input->node); pi->buffer_id = po->buffer_id;
pushed = true;
if ((res = spa_node_process_input (inport->node->node)) < 0)
pinos_log_warn ("node %p: got process input %d", inport->node, res);
} }
po->buffer_id = SPA_ID_INVALID;
} }
if (!pushed) { if (processed) {
if ((res = spa_node_port_reuse_buffer (node, ho->port_id, po->buffer_id)) < 0) if ((res = spa_node_process_output (this->node)) < 0)
pinos_log_warn ("node %p: error reuse buffer: %d", node, res); pinos_log_warn ("node %p: got process output %d", this, res);
}
if ((res = spa_node_process_output (node)) < 0) {
pinos_log_warn ("node %p: got pull error %d, %d", this, res, po->status);
break;
} }
break; break;
} }
@ -366,6 +360,8 @@ on_node_event (SpaNode *node, SpaNodeEvent *event, void *user_data)
PinosPort *port = this->input_port_map[rb->port_id]; PinosPort *port = this->input_port_map[rb->port_id];
PinosLink *link; PinosLink *link;
// pinos_log_debug ("node %p: reuse buffer %u", this, rb->buffer_id);
spa_list_for_each (link, &port->rt.links, rt.input_link) { spa_list_for_each (link, &port->rt.links, rt.input_link) {
if (link->rt.input == NULL || link->rt.output == NULL) if (link->rt.input == NULL || link->rt.output == NULL)
continue; continue;

View file

@ -54,7 +54,6 @@ struct _SpaSource {
int fd; int fd;
SpaIO mask; SpaIO mask;
SpaIO rmask; SpaIO rmask;
void *loop_private;
}; };
typedef SpaResult (*SpaInvokeFunc) (SpaLoop *loop, typedef SpaResult (*SpaInvokeFunc) (SpaLoop *loop,

View file

@ -78,12 +78,10 @@ typedef struct {
typedef struct { typedef struct {
SpaNodeEvent event; SpaNodeEvent event;
uint32_t port_id;
} SpaNodeEventHaveOutput; } SpaNodeEventHaveOutput;
typedef struct { typedef struct {
SpaNodeEvent event; SpaNodeEvent event;
uint32_t port_id;
} SpaNodeEventNeedInput; } SpaNodeEventNeedInput;
typedef struct { typedef struct {

View file

@ -32,7 +32,7 @@
typedef struct _SpaALSAState SpaALSASink; typedef struct _SpaALSAState SpaALSASink;
static const char default_device[] = "default"; static const char default_device[] = "default";
static const uint32_t default_period_size = 128; static const uint32_t default_period_size = 32;
static const uint32_t default_periods = 2; static const uint32_t default_periods = 2;
static const bool default_period_event = 0; static const bool default_period_event = 0;
@ -691,6 +691,7 @@ spa_alsa_sink_node_process_input (SpaNode *node)
b = &this->buffers[input->buffer_id]; b = &this->buffers[input->buffer_id];
if (!b->outstanding) { if (!b->outstanding) {
input->buffer_id = SPA_ID_INVALID;
input->status = SPA_RESULT_INVALID_BUFFER_ID; input->status = SPA_RESULT_INVALID_BUFFER_ID;
return SPA_RESULT_ERROR; return SPA_RESULT_ERROR;
} }
@ -700,7 +701,9 @@ spa_alsa_sink_node_process_input (SpaNode *node)
} else { } else {
spa_list_insert (this->ready.prev, &b->link); spa_list_insert (this->ready.prev, &b->link);
} }
//spa_log_debug (this->log, "alsa-source: got buffer %u", input->buffer_id);
b->outstanding = false; b->outstanding = false;
input->buffer_id = SPA_ID_INVALID;
input->status = SPA_RESULT_OK; input->status = SPA_RESULT_OK;
return SPA_RESULT_OK; return SPA_RESULT_OK;

View file

@ -39,7 +39,7 @@ update_state (SpaALSASource *this, SpaNodeState state)
} }
static const char default_device[] = "hw:0"; static const char default_device[] = "hw:0";
static const uint32_t default_period_size = 128; static const uint32_t default_period_size = 32;
static const uint32_t default_periods = 2; static const uint32_t default_periods = 2;
static const bool default_period_event = 0; static const bool default_period_event = 0;

View file

@ -175,22 +175,6 @@ spa_alsa_set_format (SpaALSAState *state, SpaFormatAudio *fmt, SpaPortFormatFlag
CHECK (snd_pcm_hw_params_set_buffer_size (hndl, params, state->buffer_frames), "set_buffer_size"); CHECK (snd_pcm_hw_params_set_buffer_size (hndl, params, state->buffer_frames), "set_buffer_size");
#if 0
/* set the buffer time */
buffer_time = props->buffer_time;
CHECK (snd_pcm_hw_params_set_buffer_time_near (hndl, params, &buffer_time, &dir), "set_buffer_time_near");
CHECK (snd_pcm_hw_params_get_buffer_size (params, &size), "get_buffer_size");
state->buffer_frames = size;
/* set the period time */
period_time = props->period_time;
CHECK (snd_pcm_hw_params_set_period_time_near (hndl, params, &period_time, &dir), "set_period_time_near");
CHECK (snd_pcm_hw_params_get_period_size (params, &size, &dir), "get_period_size");
state->period_frames = size;
#endif
spa_log_info (state->log, "buffer frames %zd, period frames %zd", state->buffer_frames, state->period_frames); spa_log_info (state->log, "buffer frames %zd, period frames %zd", state->buffer_frames, state->period_frames);
/* write the parameters to device */ /* write the parameters to device */
@ -216,22 +200,22 @@ set_swparams (SpaALSAState *state)
/* start the transfer */ /* start the transfer */
CHECK (snd_pcm_sw_params_set_start_threshold (hndl, params, 0U), "set_start_threshold"); CHECK (snd_pcm_sw_params_set_start_threshold (hndl, params, 0U), "set_start_threshold");
#if 1
CHECK (snd_pcm_sw_params_set_stop_threshold (hndl, params, CHECK (snd_pcm_sw_params_set_stop_threshold (hndl, params,
(state->buffer_frames / state->period_frames) * state->period_frames), "set_stop_threshold"); (state->buffer_frames / state->period_frames) * state->period_frames), "set_stop_threshold");
// CHECK (snd_pcm_sw_params_set_stop_threshold (hndl, params, -1), "set_stop_threshold"); #else
CHECK (snd_pcm_sw_params_set_stop_threshold (hndl, params, -1), "set_stop_threshold");
#endif
CHECK (snd_pcm_sw_params_set_silence_threshold (hndl, params, 0U), "set_silence_threshold"); CHECK (snd_pcm_sw_params_set_silence_threshold (hndl, params, 0U), "set_silence_threshold");
/* enable period events when requested */
CHECK (snd_pcm_sw_params_set_period_event (hndl, params, props->period_event ? 1 : 0), "set_period_event");
#if 1 #if 1
/* allow the transfer when at least period_size samples can be processed */ /* allow the transfer when at least period_size samples can be processed */
/* or disable this mechanism when period event is enabled (aka interrupt like style processing) */ /* or disable this mechanism when period event is enabled (aka interrupt like style processing) */
CHECK (snd_pcm_sw_params_set_avail_min (hndl, params, CHECK (snd_pcm_sw_params_set_avail_min (hndl, params, state->period_frames), "set_avail_min");
props->period_event ? state->buffer_frames : state->period_frames), "set_avail_min");
/* enable period events when requested */
if (props->period_event) {
CHECK (snd_pcm_sw_params_set_period_event (hndl, params, 1), "set_period_event");
}
#else #else
CHECK (snd_pcm_sw_params_set_avail_min (hndl, params, 0), "set_avail_min"); CHECK (snd_pcm_sw_params_set_avail_min (hndl, params, 0), "set_avail_min");
#endif #endif
@ -286,7 +270,6 @@ pull_frames_queue (SpaALSAState *state,
ni.event.type = SPA_NODE_EVENT_TYPE_NEED_INPUT; ni.event.type = SPA_NODE_EVENT_TYPE_NEED_INPUT;
ni.event.size = sizeof (ni); ni.event.size = sizeof (ni);
ni.port_id = 0;
state->event_cb (&state->node, &ni.event, state->user_data); state->event_cb (&state->node, &ni.event, state->user_data);
} }
if (!spa_list_is_empty (&state->ready)) { if (!spa_list_is_empty (&state->ready)) {
@ -384,7 +367,6 @@ mmap_write (SpaALSAState *state)
snd_pcm_sframes_t avail; snd_pcm_sframes_t avail;
snd_pcm_uframes_t offset, frames, size; snd_pcm_uframes_t offset, frames, size;
const snd_pcm_channel_area_t *my_areas; const snd_pcm_channel_area_t *my_areas;
SpaNodeEventNeedInput ni;
#if 0 #if 0
snd_pcm_status_t *status; snd_pcm_status_t *status;
@ -404,13 +386,6 @@ mmap_write (SpaALSAState *state)
} }
#endif #endif
#if 0
ni.event.type = SPA_NODE_EVENT_TYPE_NEED_INPUT;
ni.event.size = sizeof (ni);
ni.port_id = 0;
state->event_cb (&state->node, &ni.event, state->user_data);
#endif
size = avail; size = avail;
while (size > 0) { while (size > 0) {
frames = size; frames = size;
@ -418,6 +393,10 @@ mmap_write (SpaALSAState *state)
spa_log_error (state->log, "snd_pcm_mmap_begin error: %s", snd_strerror(err)); spa_log_error (state->log, "snd_pcm_mmap_begin error: %s", snd_strerror(err));
return -1; return -1;
} }
if (frames < state->period_frames)
break;
else
frames = state->period_frames;
if (state->ringbuffer) if (state->ringbuffer)
frames = pull_frames_ringbuffer (state, my_areas, offset, frames); frames = pull_frames_ringbuffer (state, my_areas, offset, frames);
@ -524,43 +503,75 @@ mmap_read (SpaALSAState *state)
} }
ho.event.type = SPA_NODE_EVENT_TYPE_HAVE_OUTPUT; ho.event.type = SPA_NODE_EVENT_TYPE_HAVE_OUTPUT;
ho.event.size = sizeof (ho); ho.event.size = sizeof (ho);
ho.port_id = 0;
state->event_cb (&state->node, &ho.event, state->user_data); state->event_cb (&state->node, &ho.event, state->user_data);
} }
return 0; return 0;
} }
static inline short
spa_io_to_poll (SpaIO mask)
{
short events = 0;
if (mask & SPA_IO_IN)
events |= POLLIN;
if (mask & SPA_IO_OUT)
events |= POLLOUT;
if (mask & SPA_IO_ERR)
events |= POLLERR;
if (mask & SPA_IO_HUP)
events |= POLLHUP;
return events;
}
static inline SpaIO
spa_poll_to_io (short events)
{
SpaIO mask = 0;
if (events & POLLIN)
mask |= SPA_IO_IN;
if (events & POLLOUT)
mask |= SPA_IO_OUT;
if (events & POLLERR)
mask |= SPA_IO_ERR;
if (events & POLLHUP)
mask |= SPA_IO_HUP;
return mask;
}
static void static void
alsa_on_fd_events (SpaSource *source) alsa_on_fd_events (SpaSource *source)
{ {
SpaALSAState *state = source->data; SpaALSAState *state = source->data;
snd_pcm_t *hndl = state->hndl; snd_pcm_t *hndl = state->hndl;
int err; int err, i;
unsigned short revents = 0; unsigned short revents = 0;
#if 0 for (i = 0; i < state->n_fds; i++) {
state->fds[i].revents = spa_io_to_poll (state->sources[i].rmask);
state->sources[i].rmask = 0;
}
snd_pcm_poll_descriptors_revents (hndl, snd_pcm_poll_descriptors_revents (hndl,
state->fds, state->fds,
state->n_fds, state->n_fds,
&revents); &revents);
if (revents & POLLERR) {
spa_log_debug (state->log, "revents: %d %d", revents, state->n_fds);
#endif
revents = source->rmask;
if (revents & SPA_IO_ERR) {
if ((err = xrun_recovery (state, hndl, err)) < 0) { if ((err = xrun_recovery (state, hndl, err)) < 0) {
spa_log_error (state->log, "error: %s", snd_strerror (err)); spa_log_error (state->log, "error: %s", snd_strerror (err));
} }
} }
if (state->stream == SND_PCM_STREAM_CAPTURE) { if (state->stream == SND_PCM_STREAM_CAPTURE) {
if (!(revents & SPA_IO_IN)) if (!(revents & POLLIN))
return; return;
mmap_read (state); mmap_read (state);
} else { } else {
if (!(revents & SPA_IO_OUT)) if (!(revents & POLLOUT))
return; return;
mmap_write (state); mmap_write (state);
@ -601,15 +612,9 @@ spa_alsa_start (SpaALSAState *state, bool xrun_recover)
state->sources[i].func = alsa_on_fd_events; state->sources[i].func = alsa_on_fd_events;
state->sources[i].data = state; state->sources[i].data = state;
state->sources[i].fd = state->fds[i].fd; state->sources[i].fd = state->fds[i].fd;
state->sources[i].mask = 0; state->sources[i].mask = spa_poll_to_io (state->fds[i].events);
if (state->fds[i].events & POLLIN) state->sources[i].rmask = 0;
state->sources[i].mask |= SPA_IO_IN; state->fds[i].revents = 0;
if (state->fds[i].events & POLLOUT)
state->sources[i].mask |= SPA_IO_OUT;
if (state->fds[i].events & POLLERR)
state->sources[i].mask |= SPA_IO_ERR;
if (state->fds[i].events & POLLHUP)
state->sources[i].mask |= SPA_IO_HUP;
spa_loop_add_source (state->data_loop, &state->sources[i]); spa_loop_add_source (state->data_loop, &state->sources[i]);
} }

View file

@ -581,7 +581,6 @@ pull_port (SpaAudioMixer *this, uint32_t port_id, SpaPortOutput *output, size_t
ni.event.type = SPA_NODE_EVENT_TYPE_NEED_INPUT; ni.event.type = SPA_NODE_EVENT_TYPE_NEED_INPUT;
ni.event.size = sizeof (ni); ni.event.size = sizeof (ni);
ni.port_id = port_id;
this->event_cb (&this->node, &ni.event, this->user_data); this->event_cb (&this->node, &ni.event, this->user_data);
} }

View file

@ -234,7 +234,6 @@ send_have_output (SpaAudioTestSrc *this)
if (this->event_cb) { if (this->event_cb) {
ho.event.type = SPA_NODE_EVENT_TYPE_HAVE_OUTPUT; ho.event.type = SPA_NODE_EVENT_TYPE_HAVE_OUTPUT;
ho.event.size = sizeof (ho); ho.event.size = sizeof (ho);
ho.port_id = 0;
this->event_cb (&this->node, &ho.event, this->user_data); this->event_cb (&this->node, &ho.event, this->user_data);
} }

View file

@ -906,7 +906,6 @@ v4l2_on_fd_events (SpaSource *source)
ho.event.type = SPA_NODE_EVENT_TYPE_HAVE_OUTPUT; ho.event.type = SPA_NODE_EVENT_TYPE_HAVE_OUTPUT;
ho.event.size = sizeof (ho); ho.event.size = sizeof (ho);
ho.port_id = 0;
this->event_cb (&this->node, &ho.event, this->user_data); this->event_cb (&this->node, &ho.event, this->user_data);
} }

View file

@ -199,7 +199,6 @@ send_have_output (SpaVideoTestSrc *this)
if (this->event_cb) { if (this->event_cb) {
ho.event.type = SPA_NODE_EVENT_TYPE_HAVE_OUTPUT; ho.event.type = SPA_NODE_EVENT_TYPE_HAVE_OUTPUT;
ho.event.size = sizeof (ho); ho.event.size = sizeof (ho);
ho.port_id = 0;
this->event_cb (&this->node, &ho.event, this->user_data); this->event_cb (&this->node, &ho.event, this->user_data);
} }