Refactor the work queue

Make a separate work queue to track async operations. Keep separate
work queues for links and nodes. This avoids lockups when some async
operation take a long time and the work queue has SYNC_WAIT operations.
This commit is contained in:
Wim Taymans 2017-01-12 16:48:17 +01:00
parent 6d4db64767
commit 474981ddda
9 changed files with 344 additions and 238 deletions

View file

@ -25,6 +25,7 @@
#include "pinos/client/pinos.h"
#include "pinos/server/link.h"
#include "pinos/server/work-queue.h"
#define MAX_BUFFERS 16
@ -34,6 +35,7 @@ typedef struct
int refcount;
PinosWorkQueue *work;
uint32_t seq;
SpaFormat **format_filter;
@ -72,6 +74,7 @@ pinos_link_update_state (PinosLink *link,
static SpaResult
do_negotiate (PinosLink *this, SpaNodeState in_state, SpaNodeState out_state)
{
PinosLinkImpl *impl = SPA_CONTAINER_OF (this, PinosLinkImpl, this);
SpaResult res;
SpaFormat *filter = NULL, *format;
void *istate = NULL, *ostate = NULL;
@ -150,7 +153,7 @@ again:
asprintf (&error, "error set output format: %d", res);
goto error;
}
pinos_main_loop_defer (this->core->main_loop, this->output->node, res, NULL, NULL);
pinos_work_queue_add (impl->work, this->output->node, res, NULL, NULL);
} else if (in_state == SPA_NODE_STATE_CONFIGURE) {
pinos_log_debug ("link %p: doing set format on input", this);
if ((res = spa_node_port_set_format (this->input->node->node,
@ -161,7 +164,7 @@ again:
asprintf (&error, "error set input format: %d", res);
goto error;
}
pinos_main_loop_defer (this->core->main_loop, this->input->node, res, NULL, NULL);
pinos_work_queue_add (impl->work, this->input->node, res, NULL, NULL);
}
return res;
@ -479,7 +482,7 @@ do_allocation (PinosLink *this, SpaNodeState in_state, SpaNodeState out_state)
asprintf (&error, "error alloc output buffers: %d", res);
goto error;
}
pinos_main_loop_defer (this->core->main_loop, this->output->node, res, NULL, NULL);
pinos_work_queue_add (impl->work, this->output->node, res, NULL, NULL);
this->output->buffers = impl->buffers;
this->output->n_buffers = impl->n_buffers;
this->output->allocated = true;
@ -495,7 +498,7 @@ do_allocation (PinosLink *this, SpaNodeState in_state, SpaNodeState out_state)
asprintf (&error, "error alloc input buffers: %d", res);
goto error;
}
pinos_main_loop_defer (this->core->main_loop, this->input->node, res, NULL, NULL);
pinos_work_queue_add (impl->work, this->input->node, res, NULL, NULL);
this->input->buffers = impl->buffers;
this->input->n_buffers = impl->n_buffers;
this->input->allocated = true;
@ -515,7 +518,7 @@ do_allocation (PinosLink *this, SpaNodeState in_state, SpaNodeState out_state)
asprintf (&error, "error use input buffers: %d", res);
goto error;
}
pinos_main_loop_defer (this->core->main_loop, this->input->node, res, NULL, NULL);
pinos_work_queue_add (impl->work, this->input->node, res, NULL, NULL);
this->input->buffers = impl->buffers;
this->input->n_buffers = impl->n_buffers;
this->input->allocated = false;
@ -530,7 +533,7 @@ do_allocation (PinosLink *this, SpaNodeState in_state, SpaNodeState out_state)
asprintf (&error, "error use output buffers: %d", res);
goto error;
}
pinos_main_loop_defer (this->core->main_loop, this->output->node, res, NULL, NULL);
pinos_work_queue_add (impl->work, this->output->node, res, NULL, NULL);
this->output->buffers = impl->buffers;
this->output->n_buffers = impl->n_buffers;
this->output->allocated = false;
@ -581,6 +584,7 @@ check_states (PinosLink *this,
void *user_data,
SpaResult res)
{
PinosLinkImpl *impl = SPA_CONTAINER_OF (this, PinosLinkImpl, this);
SpaNodeState in_state, out_state;
again:
@ -616,11 +620,11 @@ again:
return SPA_RESULT_OK;
exit:
pinos_main_loop_defer (this->core->main_loop,
this,
SPA_RESULT_WAIT_SYNC,
(PinosDeferFunc) check_states,
this);
pinos_work_queue_add (impl->work,
this,
SPA_RESULT_WAIT_SYNC,
(PinosWorkFunc) check_states,
this);
return res;
}
@ -631,10 +635,9 @@ on_input_async_complete_notify (PinosListener *listener,
SpaResult res)
{
PinosLinkImpl *impl = SPA_CONTAINER_OF (listener, PinosLinkImpl, input_async_complete);
PinosLink *this = &impl->this;
pinos_log_debug ("link %p: node %p async complete %d %d", impl, node, seq, res);
pinos_main_loop_defer_complete (this->core->main_loop, impl, seq, res);
pinos_work_queue_complete (impl->work, node, seq, res);
}
static void
@ -644,10 +647,9 @@ on_output_async_complete_notify (PinosListener *listener,
SpaResult res)
{
PinosLinkImpl *impl = SPA_CONTAINER_OF (listener, PinosLinkImpl, output_async_complete);
PinosLink *this = &impl->this;
pinos_log_debug ("link %p: node %p async complete %d %d", impl, node, seq, res);
pinos_main_loop_defer_complete (this->core->main_loop, impl, seq, res);
pinos_work_queue_complete (impl->work, node, seq, res);
}
static void
@ -708,12 +710,15 @@ on_output_port_destroy (PinosListener *listener,
bool
pinos_link_activate (PinosLink *this)
{
PinosLinkImpl *impl = SPA_CONTAINER_OF (this, PinosLinkImpl, this);
spa_ringbuffer_init (&this->ringbuffer, SPA_N_ELEMENTS (this->queue));
pinos_main_loop_defer (this->core->main_loop,
this,
SPA_RESULT_WAIT_SYNC,
(PinosDeferFunc) check_states,
this);
pinos_work_queue_add (impl->work,
this,
SPA_RESULT_WAIT_SYNC,
(PinosWorkFunc) check_states,
this);
return true;
}
@ -830,6 +835,8 @@ pinos_link_new (PinosCore *core,
this = &impl->this;
pinos_log_debug ("link %p: new", this);
impl->work = pinos_work_queue_new (core->main_loop->loop);
this->core = core;
this->properties = properties;
this->state = PINOS_LINK_STATE_INIT;
@ -975,6 +982,10 @@ pinos_link_destroy (PinosLink * this)
pinos_global_destroy (this->global);
spa_list_remove (&this->link);
pinos_work_queue_cancel (impl->work,
this,
SPA_ID_INVALID);
spa_list_for_each_safe (resource, tmp, &this->resource_list, link)
pinos_resource_destroy (resource);