impl-link: refactor function to update busy_count

This also makes sure the busy_count of a previous busy_id is undone when
setting a new id.

Also logs an error when the busy count goes negative.
This commit is contained in:
Wim Taymans 2023-10-20 15:09:13 +02:00
parent 5a93d77acf
commit 81437f7a77

View file

@ -140,6 +140,30 @@ static void info_changed(struct pw_impl_link *link)
link->info.change_mask = 0;
}
static inline void input_set_busy_id(struct pw_impl_link *link, uint32_t id)
{
struct impl *impl = SPA_CONTAINER_OF(link, struct impl, this);
if (impl->input_busy_id != SPA_ID_INVALID)
link->input->busy_count--;
if (id != SPA_ID_INVALID)
link->input->busy_count++;
impl->input_busy_id = id;
if (link->input->busy_count < 0)
pw_log_error("%p: invalid busy count:%d", link->input->busy_count);
}
static inline void output_set_busy_id(struct pw_impl_link *link, uint32_t id)
{
struct impl *impl = SPA_CONTAINER_OF(link, struct impl, this);
if (impl->output_busy_id != SPA_ID_INVALID)
link->output->busy_count--;
if (id != SPA_ID_INVALID)
link->output->busy_count++;
impl->output_busy_id = id;
if (link->output->busy_count < 0)
pw_log_error("%p: invalid busy count:%d", link->output->busy_count);
}
static void link_update_state(struct pw_impl_link *link, enum pw_link_state state, int res, char *error)
{
struct impl *impl = SPA_CONTAINER_OF(link, struct impl, this);
@ -194,15 +218,11 @@ static void link_update_state(struct pw_impl_link *link, enum pw_link_state stat
} else if (state == PW_LINK_STATE_INIT) {
link->prepared = false;
link->preparing = false;
if (impl->output_busy_id != SPA_ID_INVALID) {
impl->output_busy_id = SPA_ID_INVALID;
link->output->busy_count--;
}
output_set_busy_id(link, SPA_ID_INVALID);
pw_work_queue_cancel(impl->work, &link->output_link, SPA_ID_INVALID);
if (impl->input_busy_id != SPA_ID_INVALID) {
impl->input_busy_id = SPA_ID_INVALID;
link->input->busy_count--;
}
input_set_busy_id(link, SPA_ID_INVALID);
pw_work_queue_cancel(impl->work, &link->input_link, SPA_ID_INVALID);
}
}
@ -220,11 +240,9 @@ static void complete_ready(void *obj, void *data, int res, uint32_t id)
if (id != SPA_ID_INVALID) {
if (id == impl->input_busy_id) {
impl->input_busy_id = SPA_ID_INVALID;
port->busy_count--;
input_set_busy_id(this, SPA_ID_INVALID);
} else if (id == impl->output_busy_id) {
impl->output_busy_id = SPA_ID_INVALID;
port->busy_count--;
output_set_busy_id(this, SPA_ID_INVALID);
} else {
return;
}
@ -263,11 +281,9 @@ static void complete_paused(void *obj, void *data, int res, uint32_t id)
if (id != SPA_ID_INVALID) {
if (id == impl->input_busy_id) {
impl->input_busy_id = SPA_ID_INVALID;
port->busy_count--;
input_set_busy_id(this, SPA_ID_INVALID);
} else if (id == impl->output_busy_id) {
impl->output_busy_id = SPA_ID_INVALID;
port->busy_count--;
output_set_busy_id(this, SPA_ID_INVALID);
} else {
return;
}
@ -315,7 +331,7 @@ static int do_negotiate(struct pw_impl_link *this)
struct pw_impl_port *input, *output;
uint8_t buffer[4096];
struct spa_pod_builder b = SPA_POD_BUILDER_INIT(buffer, sizeof(buffer));
uint32_t index;
uint32_t index, busy_id;
uint32_t in_state, out_state;
if (this->info.state >= PW_LINK_STATE_NEGOTIATING)
@ -437,10 +453,9 @@ static int do_negotiate(struct pw_impl_link *this)
}
if (SPA_RESULT_IS_ASYNC(res)) {
res = spa_node_sync(output->node->node, res);
impl->output_busy_id = pw_work_queue_add(impl->work, &this->output_link, res,
busy_id = pw_work_queue_add(impl->work, &this->output_link, res,
complete_ready, this);
if (impl->output_busy_id != SPA_ID_INVALID)
output->busy_count++;
output_set_busy_id(this, busy_id);
} else {
complete_ready(&this->output_link, this, res, SPA_ID_INVALID);
}
@ -457,10 +472,9 @@ static int do_negotiate(struct pw_impl_link *this)
}
if (SPA_RESULT_IS_ASYNC(res2)) {
res2 = spa_node_sync(input->node->node, res2);
impl->input_busy_id = pw_work_queue_add(impl->work, &this->input_link, res2,
busy_id = pw_work_queue_add(impl->work, &this->input_link, res2,
complete_ready, this);
if (impl->input_busy_id != SPA_ID_INVALID)
input->busy_count++;
input_set_busy_id(this, busy_id);
if (res == 0)
res = res2;
} else {
@ -532,7 +546,7 @@ static int do_allocation(struct pw_impl_link *this)
{
struct impl *impl = SPA_CONTAINER_OF(this, struct impl, this);
int res;
uint32_t in_flags, out_flags;
uint32_t in_flags, out_flags, busy_id;
char *error = NULL;
struct pw_impl_port *input, *output;
@ -606,10 +620,9 @@ static int do_allocation(struct pw_impl_link *this)
}
if (SPA_RESULT_IS_ASYNC(res)) {
res = spa_node_sync(output->node->node, res);
impl->output_busy_id = pw_work_queue_add(impl->work, &this->output_link, res,
busy_id = pw_work_queue_add(impl->work, &this->output_link, res,
complete_paused, this);
if (impl->output_busy_id != SPA_ID_INVALID)
output->busy_count++;
output_set_busy_id(this, busy_id);
if (flags & SPA_NODE_BUFFERS_FLAG_ALLOC)
return 0;
} else {
@ -630,10 +643,9 @@ static int do_allocation(struct pw_impl_link *this)
if (SPA_RESULT_IS_ASYNC(res)) {
res = spa_node_sync(input->node->node, res);
impl->input_busy_id = pw_work_queue_add(impl->work, &this->input_link, res,
busy_id = pw_work_queue_add(impl->work, &this->input_link, res,
complete_paused, this);
if (impl->input_busy_id != SPA_ID_INVALID)
input->busy_count++;
input_set_busy_id(this, busy_id);
} else {
complete_paused(&this->input_link, this, res, SPA_ID_INVALID);
}
@ -736,13 +748,13 @@ static void check_states(void *obj, void *user_data, int res, uint32_t id)
}
if (output->busy_count > 0) {
pw_log_debug("%p: output port %p was busy", this, output);
pw_log_debug("%p: output port %p was busy %d", this, output, output->busy_count);
res = spa_node_sync(output->node->node, 0);
pw_work_queue_add(impl->work, &this->output_link, res, complete_sync, this);
goto exit;
}
else if (input->busy_count > 0) {
pw_log_debug("%p: input port %p was busy", this, input);
pw_log_debug("%p: input port %p was busy %d", this, input, input->busy_count);
res = spa_node_sync(input->node->node, 0);
pw_work_queue_add(impl->work, &this->input_link, res, complete_sync, this);
goto exit;
@ -772,10 +784,8 @@ static void input_remove(struct pw_impl_link *this, struct pw_impl_port *port)
pw_log_debug("%p: remove input port %p", this, port);
if (impl->input_busy_id != SPA_ID_INVALID) {
impl->input_busy_id = SPA_ID_INVALID;
port->busy_count--;
}
input_set_busy_id(this, SPA_ID_INVALID);
spa_hook_remove(&impl->input_port_listener);
spa_hook_remove(&impl->input_node_listener);
spa_hook_remove(&impl->input_global_listener);
@ -802,10 +812,8 @@ static void output_remove(struct pw_impl_link *this, struct pw_impl_port *port)
pw_log_debug("%p: remove output port %p", this, port);
if (impl->output_busy_id != SPA_ID_INVALID) {
impl->output_busy_id = SPA_ID_INVALID;
port->busy_count--;
}
output_set_busy_id(this, SPA_ID_INVALID);
spa_hook_remove(&impl->output_port_listener);
spa_hook_remove(&impl->output_node_listener);
spa_hook_remove(&impl->output_global_listener);