impl-link: handle errors better

Keep the original sequence number of the port_set_param(Format) and
port_use_buffer() calls around. When they produce an error, we will
get an error with the same sequence number.

When the sync completes, check if we got an error for the pending
operation and return that as the result.

We then also mark the link in error. We don't mark the port in error
because that is not really the case.

This should make the link error when negotiation of format or buffers
fails instead of silently continuing.
This commit is contained in:
Wim Taymans 2024-09-02 09:59:08 +02:00
parent d59158529b
commit c84cf9a9d4

View file

@ -33,6 +33,10 @@ struct impl {
uint32_t output_busy_id;
uint32_t input_busy_id;
int output_pending_seq;
int input_pending_seq;
int output_result;
int input_result;
struct spa_pod *format_filter;
struct pw_properties *properties;
@ -68,28 +72,36 @@ static void info_changed(struct pw_impl_link *link)
link->info.change_mask = 0;
}
static inline void input_set_busy_id(struct pw_impl_link *link, uint32_t id)
static inline int input_set_busy_id(struct pw_impl_link *link, uint32_t id, int pending_seq)
{
struct impl *impl = SPA_CONTAINER_OF(link, struct impl, this);
int res = impl->input_result;
if (impl->input_busy_id != SPA_ID_INVALID)
link->input->busy_count--;
if (id != SPA_ID_INVALID)
link->input->busy_count++;
impl->input_busy_id = id;
impl->input_pending_seq = SPA_RESULT_ASYNC_SEQ(pending_seq);
impl->input_result = 0;
if (link->input->busy_count < 0)
pw_log_error("%s: invalid busy count:%d", link->name, link->input->busy_count);
return res;
}
static inline void output_set_busy_id(struct pw_impl_link *link, uint32_t id)
static inline int output_set_busy_id(struct pw_impl_link *link, uint32_t id, int pending_seq)
{
struct impl *impl = SPA_CONTAINER_OF(link, struct impl, this);
int res = impl->output_result;
if (impl->output_busy_id != SPA_ID_INVALID)
link->output->busy_count--;
if (id != SPA_ID_INVALID)
link->output->busy_count++;
impl->output_busy_id = id;
impl->output_pending_seq = SPA_RESULT_ASYNC_SEQ(pending_seq);
impl->output_result = 0;
if (link->output->busy_count < 0)
pw_log_error("%s: invalid busy count:%d", link->name, link->output->busy_count);
return res;
}
static void link_update_state(struct pw_impl_link *link, enum pw_link_state state, int res, char *error)
@ -147,10 +159,10 @@ static void link_update_state(struct pw_impl_link *link, enum pw_link_state stat
link->prepared = false;
link->preparing = false;
output_set_busy_id(link, SPA_ID_INVALID);
output_set_busy_id(link, SPA_ID_INVALID, SPA_ID_INVALID);
pw_work_queue_cancel(impl->work, &link->output_link, SPA_ID_INVALID);
input_set_busy_id(link, SPA_ID_INVALID);
input_set_busy_id(link, SPA_ID_INVALID, SPA_ID_INVALID);
pw_work_queue_cancel(impl->work, &link->input_link, SPA_ID_INVALID);
}
}
@ -167,13 +179,12 @@ static void complete_ready(void *obj, void *data, int res, uint32_t id)
port = this->output;
if (id != SPA_ID_INVALID) {
if (id == impl->input_busy_id) {
input_set_busy_id(this, SPA_ID_INVALID);
} else if (id == impl->output_busy_id) {
output_set_busy_id(this, SPA_ID_INVALID);
} else {
if (id == impl->input_busy_id)
res = input_set_busy_id(this, SPA_ID_INVALID, SPA_ID_INVALID);
else if (id == impl->output_busy_id)
res = output_set_busy_id(this, SPA_ID_INVALID, SPA_ID_INVALID);
else
return;
}
}
pw_log_debug("%p: obj:%p port %p complete state:%d: %s", this, obj, port,
@ -183,13 +194,12 @@ static void complete_ready(void *obj, void *data, int res, uint32_t id)
if (port->state < PW_IMPL_PORT_STATE_READY)
pw_impl_port_update_state(port, PW_IMPL_PORT_STATE_READY,
0, NULL);
if (this->input->state >= PW_IMPL_PORT_STATE_READY &&
this->output->state >= PW_IMPL_PORT_STATE_READY)
link_update_state(this, PW_LINK_STATE_ALLOCATING, 0, NULL);
} else {
pw_impl_port_update_state(port, PW_IMPL_PORT_STATE_ERROR,
res, spa_aprintf("port error going to READY: %s", spa_strerror(res)));
link_update_state(this, PW_LINK_STATE_ERROR, -EIO, strdup("Format negotiation failed"));
}
if (this->input->state >= PW_IMPL_PORT_STATE_READY &&
this->output->state >= PW_IMPL_PORT_STATE_READY)
link_update_state(this, PW_LINK_STATE_ALLOCATING, 0, NULL);
}
static void complete_paused(void *obj, void *data, int res, uint32_t id)
@ -208,13 +218,12 @@ static void complete_paused(void *obj, void *data, int res, uint32_t id)
}
if (id != SPA_ID_INVALID) {
if (id == impl->input_busy_id) {
input_set_busy_id(this, SPA_ID_INVALID);
} else if (id == impl->output_busy_id) {
output_set_busy_id(this, SPA_ID_INVALID);
} else {
if (id == impl->input_busy_id)
res = input_set_busy_id(this, SPA_ID_INVALID, SPA_ID_INVALID);
else if (id == impl->output_busy_id)
res = output_set_busy_id(this, SPA_ID_INVALID, SPA_ID_INVALID);
else
return;
}
}
pw_log_debug("%p: obj:%p port %p complete state:%d: %s", this, obj, port,
@ -225,13 +234,13 @@ static void complete_paused(void *obj, void *data, int res, uint32_t id)
pw_impl_port_update_state(port, PW_IMPL_PORT_STATE_PAUSED,
0, NULL);
mix->have_buffers = true;
if (this->rt.in_mix.have_buffers && this->rt.out_mix.have_buffers)
link_update_state(this, PW_LINK_STATE_PAUSED, 0, NULL);
} else {
pw_impl_port_update_state(port, PW_IMPL_PORT_STATE_ERROR,
res, spa_aprintf("port error going to PAUSED: %s", spa_strerror(res)));
mix->have_buffers = false;
link_update_state(this, PW_LINK_STATE_ERROR, -EIO, strdup("Buffer allocation failed"));
}
if (this->rt.in_mix.have_buffers && this->rt.out_mix.have_buffers)
link_update_state(this, PW_LINK_STATE_PAUSED, 0, NULL);
}
static void complete_sync(void *obj, void *data, int res, uint32_t id)
@ -395,10 +404,11 @@ static int do_negotiate(struct pw_impl_link *this)
goto error;
}
if (SPA_RESULT_IS_ASYNC(res)) {
res = spa_node_sync(output->node->node, res);
busy_id = pw_work_queue_add(impl->work, &this->output_link, res,
pw_log_info("output set format %d", res);
busy_id = pw_work_queue_add(impl->work, &this->output_link,
spa_node_sync(output->node->node, res),
complete_ready, this);
output_set_busy_id(this, busy_id);
output_set_busy_id(this, busy_id, res);
} else {
complete_ready(&this->output_link, this, res, SPA_ID_INVALID);
}
@ -414,10 +424,11 @@ static int do_negotiate(struct pw_impl_link *this)
goto error;
}
if (SPA_RESULT_IS_ASYNC(res2)) {
res2 = spa_node_sync(input->node->node, res2);
busy_id = pw_work_queue_add(impl->work, &this->input_link, res2,
pw_log_info("input set format %d", res2);
busy_id = pw_work_queue_add(impl->work, &this->input_link,
spa_node_sync(input->node->node, res2),
complete_ready, this);
input_set_busy_id(this, busy_id);
input_set_busy_id(this, busy_id, res2);
if (res == 0)
res = res2;
} else {
@ -579,10 +590,10 @@ static int do_allocation(struct pw_impl_link *this)
goto error_clear;
}
if (SPA_RESULT_IS_ASYNC(res)) {
res = spa_node_sync(output->node->node, res);
busy_id = pw_work_queue_add(impl->work, &this->output_link, res,
busy_id = pw_work_queue_add(impl->work, &this->output_link,
spa_node_sync(output->node->node, res),
complete_paused, this);
output_set_busy_id(this, busy_id);
output_set_busy_id(this, busy_id, res);
if (flags & SPA_NODE_BUFFERS_FLAG_ALLOC)
return 0;
} else {
@ -602,10 +613,10 @@ static int do_allocation(struct pw_impl_link *this)
}
if (SPA_RESULT_IS_ASYNC(res)) {
res = spa_node_sync(input->node->node, res);
busy_id = pw_work_queue_add(impl->work, &this->input_link, res,
busy_id = pw_work_queue_add(impl->work, &this->input_link,
spa_node_sync(input->node->node, res),
complete_paused, this);
input_set_busy_id(this, busy_id);
input_set_busy_id(this, busy_id, res);
} else {
complete_paused(&this->input_link, this, res, SPA_ID_INVALID);
}
@ -742,7 +753,7 @@ static void input_remove(struct pw_impl_link *this, struct pw_impl_port *port)
pw_log_debug("%p: remove input port %p", this, port);
input_set_busy_id(this, SPA_ID_INVALID);
input_set_busy_id(this, SPA_ID_INVALID, SPA_ID_INVALID);
spa_hook_remove(&impl->input_port_listener);
spa_hook_remove(&impl->input_node_listener);
@ -772,7 +783,7 @@ static void output_remove(struct pw_impl_link *this, struct pw_impl_port *port)
pw_log_debug("%p: remove output port %p", this, port);
output_set_busy_id(this, SPA_ID_INVALID);
output_set_busy_id(this, SPA_ID_INVALID, SPA_ID_INVALID);
spa_hook_remove(&impl->output_port_listener);
spa_hook_remove(&impl->output_node_listener);
@ -1007,8 +1018,12 @@ static void input_node_result(void *data, int seq, int res, uint32_t type, const
{
struct impl *impl = data;
struct pw_impl_port *port = impl->this.input;
pw_log_trace("%p: input port %p result seq:%d res:%d type:%u",
impl, port, seq, res, type);
pw_log_info("%p: input port %p result seq:%d %d res:%d type:%u",
impl, port, seq, SPA_RESULT_ASYNC_SEQ(seq), res, type);
if (type == SPA_RESULT_TYPE_NODE_ERROR && impl->input_pending_seq == seq)
impl->input_result = res;
node_result(impl, &impl->this.input_link, seq, res, type, result);
}
@ -1016,8 +1031,12 @@ static void output_node_result(void *data, int seq, int res, uint32_t type, cons
{
struct impl *impl = data;
struct pw_impl_port *port = impl->this.output;
pw_log_trace("%p: output port %p result seq:%d res:%d type:%u",
impl, port, seq, res, type);
pw_log_info("%p: output port %p result seq:%d %d res:%d type:%u",
impl, port, seq, SPA_RESULT_ASYNC_SEQ(seq), res, type);
if (type == SPA_RESULT_TYPE_NODE_ERROR && impl->output_pending_seq == seq)
impl->output_result = res;
node_result(impl, &impl->this.output_link, seq, res, type, result);
}