From 474981ddda624a4cc7e5f74571fdf2124018587d Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Thu, 12 Jan 2017 16:48:17 +0100 Subject: [PATCH] Refactor the work queue Make a separate work queue to track async operations. Keep separate work queues for links and nodes. This avoids lockups when some async operation take a long time and the work queue has SYNC_WAIT operations. --- pinos/modules/module-protocol-native.c | 21 +-- pinos/server/link.c | 51 +++--- pinos/server/main-loop.c | 158 ------------------ pinos/server/main-loop.h | 22 --- pinos/server/meson.build | 2 + pinos/server/node.c | 33 ++-- pinos/server/port.c | 9 - pinos/server/work-queue.c | 218 +++++++++++++++++++++++++ pinos/server/work-queue.h | 68 ++++++++ 9 files changed, 344 insertions(+), 238 deletions(-) create mode 100644 pinos/server/work-queue.c create mode 100644 pinos/server/work-queue.h diff --git a/pinos/modules/module-protocol-native.c b/pinos/modules/module-protocol-native.c index 48eceab81..5e2aebe93 100644 --- a/pinos/modules/module-protocol-native.c +++ b/pinos/modules/module-protocol-native.c @@ -80,19 +80,6 @@ typedef struct { PinosConnection *connection; } PinosProtocolNativeClient; -static void -sync_destroy (void *object, - void *data, - SpaResult res, - uint32_t id) -{ - PinosProtocolNativeClient *this = object; - - pinos_connection_destroy (this->connection); - close (this->fd); - free (this); -} - static void client_destroy (PinosProtocolNativeClient *this) { @@ -101,11 +88,9 @@ client_destroy (PinosProtocolNativeClient *this) pinos_client_destroy (this->client); spa_list_remove (&this->link); - pinos_main_loop_defer (this->impl->core->main_loop, - this, - SPA_RESULT_WAIT_SYNC, - sync_destroy, - this); + pinos_connection_destroy (this->connection); + close (this->fd); + free (this); } static SpaResult diff --git a/pinos/server/link.c b/pinos/server/link.c index f51fc366e..7857a6ca9 100644 --- a/pinos/server/link.c +++ b/pinos/server/link.c @@ -25,6 +25,7 @@ #include "pinos/client/pinos.h" #include "pinos/server/link.h" +#include "pinos/server/work-queue.h" #define MAX_BUFFERS 16 @@ -34,6 +35,7 @@ typedef struct int refcount; + PinosWorkQueue *work; uint32_t seq; SpaFormat **format_filter; @@ -72,6 +74,7 @@ pinos_link_update_state (PinosLink *link, static SpaResult do_negotiate (PinosLink *this, SpaNodeState in_state, SpaNodeState out_state) { + PinosLinkImpl *impl = SPA_CONTAINER_OF (this, PinosLinkImpl, this); SpaResult res; SpaFormat *filter = NULL, *format; void *istate = NULL, *ostate = NULL; @@ -150,7 +153,7 @@ again: asprintf (&error, "error set output format: %d", res); goto error; } - pinos_main_loop_defer (this->core->main_loop, this->output->node, res, NULL, NULL); + pinos_work_queue_add (impl->work, this->output->node, res, NULL, NULL); } else if (in_state == SPA_NODE_STATE_CONFIGURE) { pinos_log_debug ("link %p: doing set format on input", this); if ((res = spa_node_port_set_format (this->input->node->node, @@ -161,7 +164,7 @@ again: asprintf (&error, "error set input format: %d", res); goto error; } - pinos_main_loop_defer (this->core->main_loop, this->input->node, res, NULL, NULL); + pinos_work_queue_add (impl->work, this->input->node, res, NULL, NULL); } return res; @@ -479,7 +482,7 @@ do_allocation (PinosLink *this, SpaNodeState in_state, SpaNodeState out_state) asprintf (&error, "error alloc output buffers: %d", res); goto error; } - pinos_main_loop_defer (this->core->main_loop, this->output->node, res, NULL, NULL); + pinos_work_queue_add (impl->work, this->output->node, res, NULL, NULL); this->output->buffers = impl->buffers; this->output->n_buffers = impl->n_buffers; this->output->allocated = true; @@ -495,7 +498,7 @@ do_allocation (PinosLink *this, SpaNodeState in_state, SpaNodeState out_state) asprintf (&error, "error alloc input buffers: %d", res); goto error; } - pinos_main_loop_defer (this->core->main_loop, this->input->node, res, NULL, NULL); + pinos_work_queue_add (impl->work, this->input->node, res, NULL, NULL); this->input->buffers = impl->buffers; this->input->n_buffers = impl->n_buffers; this->input->allocated = true; @@ -515,7 +518,7 @@ do_allocation (PinosLink *this, SpaNodeState in_state, SpaNodeState out_state) asprintf (&error, "error use input buffers: %d", res); goto error; } - pinos_main_loop_defer (this->core->main_loop, this->input->node, res, NULL, NULL); + pinos_work_queue_add (impl->work, this->input->node, res, NULL, NULL); this->input->buffers = impl->buffers; this->input->n_buffers = impl->n_buffers; this->input->allocated = false; @@ -530,7 +533,7 @@ do_allocation (PinosLink *this, SpaNodeState in_state, SpaNodeState out_state) asprintf (&error, "error use output buffers: %d", res); goto error; } - pinos_main_loop_defer (this->core->main_loop, this->output->node, res, NULL, NULL); + pinos_work_queue_add (impl->work, this->output->node, res, NULL, NULL); this->output->buffers = impl->buffers; this->output->n_buffers = impl->n_buffers; this->output->allocated = false; @@ -581,6 +584,7 @@ check_states (PinosLink *this, void *user_data, SpaResult res) { + PinosLinkImpl *impl = SPA_CONTAINER_OF (this, PinosLinkImpl, this); SpaNodeState in_state, out_state; again: @@ -616,11 +620,11 @@ again: return SPA_RESULT_OK; exit: - pinos_main_loop_defer (this->core->main_loop, - this, - SPA_RESULT_WAIT_SYNC, - (PinosDeferFunc) check_states, - this); + pinos_work_queue_add (impl->work, + this, + SPA_RESULT_WAIT_SYNC, + (PinosWorkFunc) check_states, + this); return res; } @@ -631,10 +635,9 @@ on_input_async_complete_notify (PinosListener *listener, SpaResult res) { PinosLinkImpl *impl = SPA_CONTAINER_OF (listener, PinosLinkImpl, input_async_complete); - PinosLink *this = &impl->this; pinos_log_debug ("link %p: node %p async complete %d %d", impl, node, seq, res); - pinos_main_loop_defer_complete (this->core->main_loop, impl, seq, res); + pinos_work_queue_complete (impl->work, node, seq, res); } static void @@ -644,10 +647,9 @@ on_output_async_complete_notify (PinosListener *listener, SpaResult res) { PinosLinkImpl *impl = SPA_CONTAINER_OF (listener, PinosLinkImpl, output_async_complete); - PinosLink *this = &impl->this; pinos_log_debug ("link %p: node %p async complete %d %d", impl, node, seq, res); - pinos_main_loop_defer_complete (this->core->main_loop, impl, seq, res); + pinos_work_queue_complete (impl->work, node, seq, res); } static void @@ -708,12 +710,15 @@ on_output_port_destroy (PinosListener *listener, bool pinos_link_activate (PinosLink *this) { + PinosLinkImpl *impl = SPA_CONTAINER_OF (this, PinosLinkImpl, this); + spa_ringbuffer_init (&this->ringbuffer, SPA_N_ELEMENTS (this->queue)); - pinos_main_loop_defer (this->core->main_loop, - this, - SPA_RESULT_WAIT_SYNC, - (PinosDeferFunc) check_states, - this); + + pinos_work_queue_add (impl->work, + this, + SPA_RESULT_WAIT_SYNC, + (PinosWorkFunc) check_states, + this); return true; } @@ -830,6 +835,8 @@ pinos_link_new (PinosCore *core, this = &impl->this; pinos_log_debug ("link %p: new", this); + impl->work = pinos_work_queue_new (core->main_loop->loop); + this->core = core; this->properties = properties; this->state = PINOS_LINK_STATE_INIT; @@ -975,6 +982,10 @@ pinos_link_destroy (PinosLink * this) pinos_global_destroy (this->global); spa_list_remove (&this->link); + pinos_work_queue_cancel (impl->work, + this, + SPA_ID_INVALID); + spa_list_for_each_safe (resource, tmp, &this->resource_list, link) pinos_resource_destroy (resource); diff --git a/pinos/server/main-loop.c b/pinos/server/main-loop.c index b4920cbf2..aee4b7a1f 100644 --- a/pinos/server/main-loop.c +++ b/pinos/server/main-loop.c @@ -27,156 +27,13 @@ #include "pinos/client/log.h" #include "pinos/server/main-loop.h" -typedef struct _WorkItem WorkItem; - -struct _WorkItem { - uint32_t id; - void *obj; - uint32_t seq; - SpaResult res; - PinosDeferFunc func; - void *data; - SpaList link; -}; - typedef struct { PinosMainLoop this; bool running; - SpaSource *wakeup; - - uint32_t counter; - - SpaList work_list; - SpaList free_list; } PinosMainLoopImpl; -static void -process_work_queue (SpaSource *source, - void *data) -{ - PinosMainLoopImpl *impl = data; - PinosMainLoop *this = &impl->this; - WorkItem *item, *tmp; - - spa_list_for_each_safe (item, tmp, &impl->work_list, link) { - if (item->seq != SPA_ID_INVALID) { - pinos_log_debug ("main-loop %p: waiting for item %p %d", this, item->obj, item->seq); - continue; - } - - if (item->res == SPA_RESULT_WAIT_SYNC && item != spa_list_first (&impl->work_list, WorkItem, link)) { - pinos_log_debug ("main-loop %p: sync item %p not head", this, item->obj); - continue; - } - - spa_list_remove (&item->link); - - if (item->func) { - pinos_log_debug ("main-loop %p: process work item %p %d", this, item->obj, item->seq); - item->func (item->obj, item->data, item->res, item->id); - } - spa_list_insert (impl->free_list.prev, &item->link); - } -} - -static uint32_t -main_loop_defer (PinosMainLoop *loop, - void *obj, - SpaResult res, - PinosDeferFunc func, - void *data) -{ - PinosMainLoopImpl *impl = SPA_CONTAINER_OF (loop, PinosMainLoopImpl, this); - WorkItem *item; - bool have_work = false; - - if (!spa_list_is_empty (&impl->free_list)) { - item = spa_list_first (&impl->free_list, WorkItem, link); - spa_list_remove (&item->link); - } else { - item = malloc (sizeof (WorkItem)); - if (item == NULL) - return SPA_ID_INVALID; - } - item->id = ++impl->counter; - item->obj = obj; - item->func = func; - item->data = data; - - if (SPA_RESULT_IS_ASYNC (res)) { - item->seq = SPA_RESULT_ASYNC_SEQ (res); - item->res = res; - pinos_log_debug ("main-loop %p: defer async %d for object %p", loop, item->seq, obj); - } else if (res == SPA_RESULT_WAIT_SYNC) { - pinos_log_debug ("main-loop %p: wait sync object %p", loop, obj); - item->seq = SPA_ID_INVALID; - item->res = res; - have_work = true; - } else { - item->seq = SPA_ID_INVALID; - item->res = res; - have_work = true; - pinos_log_debug ("main-loop %p: defer object %p", loop, obj); - } - spa_list_insert (impl->work_list.prev, &item->link); - - if (have_work) - pinos_loop_signal_event (impl->this.loop, impl->wakeup); - - return item->id; -} - -static void -main_loop_defer_cancel (PinosMainLoop *loop, - void *obj, - uint32_t id) -{ - PinosMainLoopImpl *impl = SPA_CONTAINER_OF (loop, PinosMainLoopImpl, this); - bool have_work = false; - WorkItem *item; - - spa_list_for_each (item, &impl->work_list, link) { - if ((id == 0 || item->id == id) && (obj == NULL || item->obj == obj)) { - pinos_log_debug ("main-loop %p: cancel defer %d for object %p", loop, item->seq, item->obj); - item->seq = SPA_ID_INVALID; - item->func = NULL; - have_work = true; - } - } - if (have_work) - pinos_loop_signal_event (impl->this.loop, impl->wakeup); -} - -static bool -main_loop_defer_complete (PinosMainLoop *loop, - void *obj, - uint32_t seq, - SpaResult res) -{ - WorkItem *item; - PinosMainLoopImpl *impl = SPA_CONTAINER_OF (loop, PinosMainLoopImpl, this); - bool have_work = false; - - spa_list_for_each (item, &impl->work_list, link) { - if (item->obj == obj && item->seq == seq) { - pinos_log_debug ("main-loop %p: found defered %d for object %p", loop, seq, obj); - item->seq = SPA_ID_INVALID; - item->res = res; - have_work = true; - } - } - if (!have_work) { - pinos_log_debug ("main-loop %p: no defered %d found for object %p", loop, seq, obj); - } else { - pinos_loop_signal_event (impl->this.loop, impl->wakeup); - } - - return have_work; -} - - /** * pinos_main_loop_new: * @@ -203,17 +60,6 @@ pinos_main_loop_new (void) pinos_signal_init (&this->destroy_signal); - impl->wakeup = pinos_loop_add_event (this->loop, - process_work_queue, - impl); - - this->defer = main_loop_defer; - this->defer_cancel = main_loop_defer_cancel; - this->defer_complete = main_loop_defer_complete; - - spa_list_init (&impl->work_list); - spa_list_init (&impl->free_list); - return this; no_loop: @@ -225,16 +71,12 @@ void pinos_main_loop_destroy (PinosMainLoop *loop) { PinosMainLoopImpl *impl = SPA_CONTAINER_OF (loop, PinosMainLoopImpl, this); - WorkItem *item, *tmp; pinos_log_debug ("main-loop %p: destroy", impl); pinos_signal_emit (&loop->destroy_signal, loop); - pinos_loop_destroy_source (loop->loop, impl->wakeup); pinos_loop_destroy (loop->loop); - spa_list_for_each_safe (item, tmp, &impl->free_list, link) - free (item); free (impl); } diff --git a/pinos/server/main-loop.h b/pinos/server/main-loop.h index 60cf26049..0be90dc7b 100644 --- a/pinos/server/main-loop.h +++ b/pinos/server/main-loop.h @@ -29,11 +29,6 @@ extern "C" { typedef struct _PinosMainLoop PinosMainLoop; -typedef void (*PinosDeferFunc) (void *obj, - void *data, - SpaResult res, - uint32_t id); - /** * PinosMainLoop: * @@ -44,19 +39,6 @@ struct _PinosMainLoop { PINOS_SIGNAL (destroy_signal, (PinosListener *listener, PinosMainLoop *loop)); - - uint32_t (*defer) (PinosMainLoop *loop, - void *obj, - SpaResult res, - PinosDeferFunc func, - void *data); - void (*defer_cancel) (PinosMainLoop *loop, - void *obj, - uint32_t id); - bool (*defer_complete) (PinosMainLoop *loop, - void *obj, - uint32_t seq, - SpaResult res); }; PinosMainLoop * pinos_main_loop_new (void); @@ -65,10 +47,6 @@ void pinos_main_loop_destroy (PinosMainLoop *loop void pinos_main_loop_run (PinosMainLoop *loop); void pinos_main_loop_quit (PinosMainLoop *loop); -#define pinos_main_loop_defer(m,...) (m)->defer(m,__VA_ARGS__) -#define pinos_main_loop_defer_cancel(m,...) (m)->defer_cancel(m,__VA_ARGS__) -#define pinos_main_loop_defer_complete(m,...) (m)->defer_complete(m,__VA_ARGS__) - #ifdef __cplusplus } #endif diff --git a/pinos/server/meson.build b/pinos/server/meson.build index ff903dded..d1cd97c9e 100644 --- a/pinos/server/meson.build +++ b/pinos/server/meson.build @@ -12,6 +12,7 @@ pinoscore_headers = [ 'node-factory.h', 'port.h', 'resource.h', + 'work-queue.h', ] pinoscore_sources = [ @@ -28,6 +29,7 @@ pinoscore_sources = [ 'node-factory.c', 'port.c', 'resource.c', + 'work-queue.c', ] libpinoscore_c_args = [ diff --git a/pinos/server/node.c b/pinos/server/node.c index ccaf26f03..615c5ec0a 100644 --- a/pinos/server/node.c +++ b/pinos/server/node.c @@ -26,11 +26,14 @@ #include "pinos/server/node.h" #include "pinos/server/data-loop.h" #include "pinos/server/main-loop.h" +#include "pinos/server/work-queue.h" typedef struct { PinosNode this; + PinosWorkQueue *work; + uint32_t seq; bool async_init; @@ -280,6 +283,7 @@ static void on_node_event (SpaNode *node, SpaNodeEvent *event, void *user_data) { PinosNode *this = user_data; + PinosNodeImpl *impl = SPA_CONTAINER_OF (this, PinosNodeImpl, this); switch (event->type) { case SPA_NODE_EVENT_TYPE_INVALID: @@ -293,7 +297,7 @@ on_node_event (SpaNode *node, SpaNodeEvent *event, void *user_data) SpaNodeEventAsyncComplete *ac = (SpaNodeEventAsyncComplete *) event; pinos_log_debug ("node %p: async complete event %d %d", this, ac->seq, ac->res); - if (!pinos_main_loop_defer_complete (this->core->main_loop, this, ac->seq, ac->res)) { + if (!pinos_work_queue_complete (impl->work, this, ac->seq, ac->res)) { pinos_signal_emit (&this->async_complete, this, ac->seq, ac->res); } break; @@ -498,6 +502,8 @@ pinos_node_new (PinosCore *core, this->core = core; pinos_log_debug ("node %p: new", this); + impl->work = pinos_work_queue_new (this->core->main_loop->loop); + this->name = strdup (name); this->properties = properties; @@ -540,11 +546,11 @@ pinos_node_new (PinosCore *core, init_complete (this); } else { impl->async_init = true; - pinos_main_loop_defer (this->core->main_loop, - this, - SPA_RESULT_RETURN_ASYNC (0), - (PinosDeferFunc) init_complete, - NULL); + pinos_work_queue_add (impl->work, + this, + SPA_RESULT_RETURN_ASYNC (0), + (PinosWorkFunc) init_complete, + NULL); } return this; @@ -651,6 +657,10 @@ pinos_node_destroy (PinosNode * this) spa_list_remove (&this->link); pinos_global_destroy (this->global); + pinos_work_queue_cancel (impl->work, + this, + SPA_ID_INVALID); + spa_list_for_each_safe (resource, tmp, &this->resource_list, link) pinos_resource_destroy (resource); @@ -739,6 +749,7 @@ pinos_node_set_state (PinosNode *node, PinosNodeState state) { SpaResult res = SPA_RESULT_OK; + PinosNodeImpl *impl = SPA_CONTAINER_OF (node, PinosNodeImpl, this); pinos_signal_emit (&node->core->node_state_request, node, state); @@ -767,11 +778,11 @@ pinos_node_set_state (PinosNode *node, if (SPA_RESULT_IS_ERROR (res)) return res; - pinos_main_loop_defer (node->core->main_loop, - node, - res, - (PinosDeferFunc) on_state_complete, - SPA_INT_TO_PTR (state)); + pinos_work_queue_add (impl->work, + node, + res, + (PinosWorkFunc) on_state_complete, + SPA_INT_TO_PTR (state)); return res; } diff --git a/pinos/server/port.c b/pinos/server/port.c index 94a9581ed..6cf4c04cd 100644 --- a/pinos/server/port.c +++ b/pinos/server/port.c @@ -258,10 +258,6 @@ do_remove_link_done (SpaLoop *loop, port->n_buffers = 0; } - pinos_main_loop_defer_complete (node->core->main_loop, - port, - seq, - SPA_RESULT_OK); return SPA_RESULT_OK; } @@ -326,7 +322,6 @@ do_clear_buffers_done (SpaLoop *loop, void *user_data) { PinosPort *port = user_data; - PinosNode *node = port->node; SpaResult res; pinos_log_debug ("port %p: clear buffers finish", port); @@ -338,10 +333,6 @@ do_clear_buffers_done (SpaLoop *loop, port->buffers = NULL; port->n_buffers = 0; - pinos_main_loop_defer_complete (node->core->main_loop, - port, - seq, - res); return res; } diff --git a/pinos/server/work-queue.c b/pinos/server/work-queue.c new file mode 100644 index 000000000..1bac32b22 --- /dev/null +++ b/pinos/server/work-queue.c @@ -0,0 +1,218 @@ +/* Pinos + * Copyright (C) 2016 Wim Taymans + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ + +#include +#include + +#include "pinos/client/log.h" +#include "pinos/server/work-queue.h" + +typedef struct _WorkItem WorkItem; + +struct _WorkItem { + uint32_t id; + void *obj; + uint32_t seq; + SpaResult res; + PinosWorkFunc func; + void *data; + SpaList link; +}; + +typedef struct +{ + PinosWorkQueue this; + + SpaSource *wakeup; + uint32_t counter; + + SpaList work_list; + SpaList free_list; +} PinosWorkQueueImpl; + + +static void +process_work_queue (SpaSource *source, + void *data) +{ + PinosWorkQueueImpl *impl = data; + PinosWorkQueue *this = &impl->this; + WorkItem *item, *tmp; + + spa_list_for_each_safe (item, tmp, &impl->work_list, link) { + if (item->seq != SPA_ID_INVALID) { + pinos_log_debug ("work-queue %p: waiting for item %p %d", this, item->obj, item->seq); + continue; + } + + if (item->res == SPA_RESULT_WAIT_SYNC && item != spa_list_first (&impl->work_list, WorkItem, link)) { + pinos_log_debug ("work-queue %p: sync item %p not head", this, item->obj); + continue; + } + + spa_list_remove (&item->link); + + if (item->func) { + pinos_log_debug ("work-queue %p: process work item %p %d", this, item->obj, item->seq); + item->func (item->obj, item->data, item->res, item->id); + } + spa_list_insert (impl->free_list.prev, &item->link); + } +} + +/** + * pinos_data_loop_new: + * + * Create a new #PinosWorkQueue. + * + * Returns: a new #PinosWorkQueue + */ +PinosWorkQueue * +pinos_work_queue_new (PinosLoop *loop) +{ + PinosWorkQueueImpl *impl; + PinosWorkQueue *this; + + impl = calloc (1, sizeof (PinosWorkQueueImpl)); + pinos_log_debug ("work-queue %p: new", impl); + + this = &impl->this; + this->loop = loop; + pinos_signal_init (&this->destroy_signal); + + impl->wakeup = pinos_loop_add_event (this->loop, + process_work_queue, + impl); + + spa_list_init (&impl->work_list); + spa_list_init (&impl->free_list); + + return this; +} + +void +pinos_work_queue_destroy (PinosWorkQueue * queue) +{ + PinosWorkQueueImpl *impl = SPA_CONTAINER_OF (queue, PinosWorkQueueImpl, this); + WorkItem *item, *tmp; + + pinos_log_debug ("work-queue %p: destroy", impl); + pinos_signal_emit (&queue->destroy_signal, queue); + + pinos_loop_destroy_source (queue->loop, impl->wakeup); + + spa_list_for_each_safe (item, tmp, &impl->free_list, link) + free (item); + + free (impl); +} + +uint32_t +pinos_work_queue_add (PinosWorkQueue *queue, + void *obj, + SpaResult res, + PinosWorkFunc func, + void *data) +{ + PinosWorkQueueImpl *impl = SPA_CONTAINER_OF (queue, PinosWorkQueueImpl, this); + WorkItem *item; + bool have_work = false; + + if (!spa_list_is_empty (&impl->free_list)) { + item = spa_list_first (&impl->free_list, WorkItem, link); + spa_list_remove (&item->link); + } else { + item = malloc (sizeof (WorkItem)); + if (item == NULL) + return SPA_ID_INVALID; + } + item->id = ++impl->counter; + item->obj = obj; + item->func = func; + item->data = data; + + if (SPA_RESULT_IS_ASYNC (res)) { + item->seq = SPA_RESULT_ASYNC_SEQ (res); + item->res = res; + pinos_log_debug ("work-queue %p: defer async %d for object %p", queue, item->seq, obj); + } else if (res == SPA_RESULT_WAIT_SYNC) { + pinos_log_debug ("work-queue %p: wait sync object %p", queue, obj); + item->seq = SPA_ID_INVALID; + item->res = res; + have_work = true; + } else { + item->seq = SPA_ID_INVALID; + item->res = res; + have_work = true; + pinos_log_debug ("work-queue %p: defer object %p", queue, obj); + } + spa_list_insert (impl->work_list.prev, &item->link); + + if (have_work) + pinos_loop_signal_event (impl->this.loop, impl->wakeup); + + return item->id; +} + +void +pinos_work_queue_cancel (PinosWorkQueue *queue, + void *obj, + uint32_t id) +{ + PinosWorkQueueImpl *impl = SPA_CONTAINER_OF (queue, PinosWorkQueueImpl, this); + bool have_work = false; + WorkItem *item; + + spa_list_for_each (item, &impl->work_list, link) { + if ((id == SPA_ID_INVALID || item->id == id) && (obj == NULL || item->obj == obj)) { + pinos_log_debug ("work-queue %p: cancel defer %d for object %p", queue, item->seq, item->obj); + item->seq = SPA_ID_INVALID; + item->func = NULL; + have_work = true; + } + } + if (have_work) + pinos_loop_signal_event (impl->this.loop, impl->wakeup); +} + +bool +pinos_work_queue_complete (PinosWorkQueue *queue, + void *obj, + uint32_t seq, + SpaResult res) +{ + WorkItem *item; + PinosWorkQueueImpl *impl = SPA_CONTAINER_OF (queue, PinosWorkQueueImpl, this); + bool have_work = false; + + spa_list_for_each (item, &impl->work_list, link) { + if (item->obj == obj && item->seq == seq) { + pinos_log_debug ("work-queue %p: found defered %d for object %p", queue, seq, obj); + item->seq = SPA_ID_INVALID; + item->res = res; + have_work = true; + } + } + if (!have_work) { + pinos_log_debug ("work-queue %p: no defered %d found for object %p", queue, seq, obj); + } else { + pinos_loop_signal_event (impl->this.loop, impl->wakeup); + } + return have_work; +} diff --git a/pinos/server/work-queue.h b/pinos/server/work-queue.h new file mode 100644 index 000000000..3440e2d21 --- /dev/null +++ b/pinos/server/work-queue.h @@ -0,0 +1,68 @@ +/* Pinos + * Copyright (C) 2016 Wim Taymans + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ + +#ifndef __PINOS_WORK_QUEUE_H__ +#define __PINOS_WORK_QUEUE_H__ + +#ifdef __cplusplus +extern "C" { +#endif + +#include + +typedef struct _PinosWorkQueue PinosWorkQueue; + +typedef void (*PinosWorkFunc) (void *obj, + void *data, + SpaResult res, + uint32_t id); + +/** + * PinosWorkQueue: + * + * Pinos work queue object. + */ +struct _PinosWorkQueue { + PinosLoop *loop; + + PINOS_SIGNAL (destroy_signal, (PinosListener *listener, + PinosWorkQueue *queue)); +}; + +PinosWorkQueue * pinos_work_queue_new (PinosLoop *loop); +void pinos_work_queue_destroy (PinosWorkQueue *queue); + +uint32_t pinos_work_queue_add (PinosWorkQueue *queue, + void *obj, + SpaResult res, + PinosWorkFunc func, + void *data); +void pinos_work_queue_cancel (PinosWorkQueue *queue, + void *obj, + uint32_t id); +bool pinos_work_queue_complete (PinosWorkQueue *queue, + void *obj, + uint32_t seq, + SpaResult res); + +#ifdef __cplusplus +} +#endif + +#endif /* __PINOS_WORK_QUEUE_H__ */