work-queue: share one work queue for links and nodes

This commit is contained in:
Wim Taymans 2021-10-04 19:36:45 +02:00
parent ff8c3d208e
commit 28744fc5ed
3 changed files with 48 additions and 27 deletions

View file

@ -145,21 +145,26 @@ static void link_update_state(struct pw_impl_link *link, enum pw_link_state stat
impl->output_busy_id = SPA_ID_INVALID;
link->output->busy_count--;
}
pw_work_queue_cancel(impl->work, link->output, SPA_ID_INVALID);
pw_work_queue_cancel(impl->work, &link->output_link, SPA_ID_INVALID);
if (impl->input_busy_id != SPA_ID_INVALID) {
impl->input_busy_id = SPA_ID_INVALID;
link->input->busy_count--;
}
pw_work_queue_cancel(impl->work, link->input, SPA_ID_INVALID);
pw_work_queue_cancel(impl->work, &link->input_link, SPA_ID_INVALID);
}
}
static void complete_ready(void *obj, void *data, int res, uint32_t id)
{
struct pw_impl_port *port = obj;
struct pw_impl_port *port;
struct pw_impl_link *this = data;
struct impl *impl = SPA_CONTAINER_OF(this, struct impl, this);
if (obj == &this->input_link)
port = this->input;
else
port = this->output;
if (id == impl->input_busy_id) {
impl->input_busy_id = SPA_ID_INVALID;
port->busy_count--;
@ -187,10 +192,18 @@ static void complete_ready(void *obj, void *data, int res, uint32_t id)
static void complete_paused(void *obj, void *data, int res, uint32_t id)
{
struct pw_impl_port *port = obj;
struct pw_impl_port *port;
struct pw_impl_link *this = data;
struct impl *impl = SPA_CONTAINER_OF(this, struct impl, this);
struct pw_impl_port_mix *mix = port == this->input ? &this->rt.in_mix : &this->rt.out_mix;
struct pw_impl_port_mix *mix;
if (obj == &this->input_link) {
port = this->input;
mix = &this->rt.in_mix;
} else {
port = this->output;
mix = &this->rt.out_mix;
}
if (id == impl->input_busy_id) {
impl->input_busy_id = SPA_ID_INVALID;
@ -220,8 +233,14 @@ static void complete_paused(void *obj, void *data, int res, uint32_t id)
static void complete_sync(void *obj, void *data, int res, uint32_t id)
{
struct pw_impl_port *port = obj;
struct pw_impl_port *port;
struct pw_impl_link *this = data;
if (obj == &this->input_link)
port = this->input;
else
port = this->output;
pw_log_debug("%p: obj:%p port %p complete state:%d: %s", this, obj, port,
port->state, spa_strerror(res));
}
@ -357,10 +376,10 @@ static int do_negotiate(struct pw_impl_link *this)
if (SPA_RESULT_IS_ASYNC(res)) {
output->busy_count++;
res = spa_node_sync(output->node->node, res);
impl->output_busy_id = pw_work_queue_add(impl->work, output, res,
impl->output_busy_id = pw_work_queue_add(impl->work, &this->output_link, res,
complete_ready, this);
} else {
complete_ready(output, this, res, SPA_ID_INVALID);
complete_ready(&this->output_link, this, res, SPA_ID_INVALID);
}
}
if (in_state == PW_IMPL_PORT_STATE_CONFIGURE) {
@ -376,12 +395,12 @@ static int do_negotiate(struct pw_impl_link *this)
if (SPA_RESULT_IS_ASYNC(res2)) {
input->busy_count++;
res2 = spa_node_sync(input->node->node, res2);
impl->input_busy_id = pw_work_queue_add(impl->work, input, res2,
impl->input_busy_id = pw_work_queue_add(impl->work, &this->input_link, res2,
complete_ready, this);
if (res == 0)
res = res2;
} else {
complete_ready(input, this, res2, SPA_ID_INVALID);
complete_ready(&this->input_link, this, res2, SPA_ID_INVALID);
}
}
@ -520,12 +539,12 @@ static int do_allocation(struct pw_impl_link *this)
if (SPA_RESULT_IS_ASYNC(res)) {
output->busy_count++;
res = spa_node_sync(output->node->node, res);
impl->output_busy_id = pw_work_queue_add(impl->work, output, res,
impl->output_busy_id = pw_work_queue_add(impl->work, &this->output_link, res,
complete_paused, this);
if (flags & SPA_NODE_BUFFERS_FLAG_ALLOC)
return 0;
} else {
complete_paused(output, this, res, SPA_ID_INVALID);
complete_paused(&this->output_link, this, res, SPA_ID_INVALID);
}
}
@ -543,10 +562,10 @@ static int do_allocation(struct pw_impl_link *this)
if (SPA_RESULT_IS_ASYNC(res)) {
input->busy_count++;
res = spa_node_sync(input->node->node, res);
impl->input_busy_id = pw_work_queue_add(impl->work, input, res,
impl->input_busy_id = pw_work_queue_add(impl->work, &this->input_link, res,
complete_paused, this);
} else {
complete_paused(input, this, res, SPA_ID_INVALID);
complete_paused(&this->input_link, this, res, SPA_ID_INVALID);
}
return 0;
@ -666,13 +685,13 @@ static void check_states(void *obj, void *user_data, int res, uint32_t id)
if (output->busy_count > 0) {
pw_log_debug("%p: output port %p was busy", this, output);
res = spa_node_sync(output->node->node, 0);
pw_work_queue_add(impl->work, output, res, complete_sync, this);
pw_work_queue_add(impl->work, &this->output_link, res, complete_sync, this);
goto exit;
}
else if (input->busy_count > 0) {
pw_log_debug("%p: input port %p was busy", this, input);
res = spa_node_sync(input->node->node, 0);
pw_work_queue_add(impl->work, input, res, complete_sync, this);
pw_work_queue_add(impl->work, &this->input_link, res, complete_sync, this);
goto exit;
}
@ -717,6 +736,8 @@ static void input_remove(struct pw_impl_link *this, struct pw_impl_port *port)
pw_log_warn("%p: port %p clear error %s", this, port, spa_strerror(res));
}
pw_impl_port_release_mix(port, mix);
pw_work_queue_cancel(impl->work, &this->input_link, SPA_ID_INVALID);
this->input = NULL;
}
@ -743,6 +764,8 @@ static void output_remove(struct pw_impl_link *this, struct pw_impl_port *port)
/* we don't clear output buffers when the link goes away. They will get
* cleared when the node goes to suspend */
pw_impl_port_release_mix(port, mix);
pw_work_queue_cancel(impl->work, &this->output_link, SPA_ID_INVALID);
this->output = NULL;
}
@ -962,11 +985,11 @@ static const struct pw_impl_port_events output_port_events = {
.latency_changed = output_port_latency_changed,
};
static void node_result(struct impl *impl, struct pw_impl_port *port,
static void node_result(struct impl *impl, void *obj,
int seq, int res, uint32_t type, const void *result)
{
if (SPA_RESULT_IS_ASYNC(seq))
pw_work_queue_complete(impl->work, port, SPA_RESULT_ASYNC_SEQ(seq), res);
pw_work_queue_complete(impl->work, obj, SPA_RESULT_ASYNC_SEQ(seq), res);
}
static void input_node_result(void *data, int seq, int res, uint32_t type, const void *result)
@ -975,7 +998,7 @@ static void input_node_result(void *data, int seq, int res, uint32_t type, const
struct pw_impl_port *port = impl->this.input;
pw_log_trace("%p: input port %p result seq:%d res:%d type:%u",
impl, port, seq, res, type);
node_result(impl, port, seq, res, type, result);
node_result(impl, &impl->this.input_link, seq, res, type, result);
}
static void output_node_result(void *data, int seq, int res, uint32_t type, const void *result)
@ -984,7 +1007,7 @@ static void output_node_result(void *data, int seq, int res, uint32_t type, cons
struct pw_impl_port *port = impl->this.output;
pw_log_trace("%p: output port %p result seq:%d res:%d type:%u",
impl, port, seq, res, type);
node_result(impl, port, seq, res, type, result);
node_result(impl, &impl->this.output_link, seq, res, type, result);
}
static void node_active_changed(void *data, bool active)
@ -1183,7 +1206,7 @@ struct pw_impl_link *pw_context_create_link(struct pw_context *context,
if (user_data_size > 0)
this->user_data = SPA_PTROFF(impl, sizeof(struct impl), void);
impl->work = pw_work_queue_new(context->main_loop);
impl->work = pw_context_get_work_queue(context);
if (impl->work == NULL)
goto error_work_queue;
@ -1411,8 +1434,6 @@ void pw_impl_link_destroy(struct pw_impl_link *link)
spa_hook_list_clean(&link->listener_list);
pw_work_queue_destroy(impl->work);
pw_properties_free(link->properties);
free(link->name);

View file

@ -1186,7 +1186,7 @@ struct pw_impl_node *pw_context_create_node(struct pw_context *context,
goto error_clean;
}
impl->work = pw_work_queue_new(this->context->main_loop);
impl->work = pw_context_get_work_queue(this->context);
if (impl->work == NULL) {
res = -errno;
goto error_clean;
@ -1785,14 +1785,14 @@ void pw_impl_node_destroy(struct pw_impl_node *node)
pw_memblock_unref(node->activation);
pw_work_queue_destroy(impl->work);
pw_param_clear(&impl->param_list, SPA_ID_INVALID);
pw_param_clear(&impl->pending_list, SPA_ID_INVALID);
pw_map_clear(&node->input_port_map);
pw_map_clear(&node->output_port_map);
pw_work_queue_cancel(impl->work, node, SPA_ID_INVALID);
pw_properties_free(node->properties);
clear_info(node);

View file

@ -65,7 +65,7 @@ static void process_work_queue(void *data, uint64_t count)
spa_list_for_each_safe(item, tmp, &this->work_list, link) {
if (item->seq != SPA_ID_INVALID) {
pw_log_debug("%p: %d waiting for item %p seq:%d id:%u", this,
pw_log_debug("%p: n_queued:%d waiting for item %p seq:%d id:%u", this,
this->n_queued, item->obj, item->seq, item->id);
continue;
}