impl-link: refactor format negotiation

Make a new port_info structure that holds the link port information for
input and output. We can use this in some places and remove some
redundant code.

We can also pass a reference to this port_info as the work-queue
object, which makes it more natural to find the associated port info in
the various work queue callbacks.

Move the private pw_context_find_format to the link implementation,
where is it actually used.

Rework the format negotiation code to use an array of 2 port_info
structures with the two ports to negotiate. The negiotiation will
always use the first port_info as higher priority.

Make sure a driver port has a lower priority than the other port. We want
to negotiate the source/sink to something close that what is
provided/requested by the client. The client can always adapt to the
driver port format fields by giving a "don't care" value for the format
property (either unspecified or with an out of range default value).
This commit is contained in:
Wim Taymans 2025-04-16 10:21:08 +02:00
parent abfad78fb3
commit 7d6e2a6417
3 changed files with 365 additions and 421 deletions

View file

@ -886,194 +886,6 @@ SPA_PRINTF_FUNC(7, 8) int pw_context_debug_port_params(struct pw_context *this,
return 0;
}
/** Find a common format between two ports
*
* \param context a context object
* \param output an output port
* \param input an input port
* \param props extra properties
* \param n_format_filters number of format filters
* \param format_filters array of format filters
* \param[out] format the common format between the ports
* \param builder builder to use for processing
* \param[out] error an error when something is wrong
* \return a common format of NULL on error
*
* Find a common format between the given ports. The format will
* be restricted to a subset given with the format filters.
*/
int pw_context_find_format(struct pw_context *context,
struct pw_impl_port *output,
uint32_t output_mix,
struct pw_impl_port *input,
uint32_t input_mix,
struct pw_properties *props,
uint32_t n_format_filters,
struct spa_pod **format_filters,
struct spa_pod **format,
struct spa_pod_builder *builder,
char **error)
{
uint32_t out_state, in_state;
int res;
uint32_t iidx = 0, oidx = 0;
struct spa_pod_builder fb = { 0 };
uint8_t fbuf[4096];
struct spa_pod *filter;
struct spa_node *in_node, *out_node;
uint32_t in_port, out_port;
out_state = output->state;
in_state = input->state;
if (output_mix == SPA_ID_INVALID) {
out_node = output->node->node;
out_port = output->port_id;
} else {
out_node = output->mix;
out_port = output_mix;
}
if (input_mix == SPA_ID_INVALID) {
in_node = input->node->node;
in_port = input->port_id;
} else {
in_node = input->mix;
in_port = input_mix;
}
pw_log_debug("%p: finding best format %d %d", context, out_state, in_state);
/* when a port is configured but the node is idle, we can reconfigure with a different format */
if (out_state > PW_IMPL_PORT_STATE_CONFIGURE && output->node->info.state == PW_NODE_STATE_IDLE)
out_state = PW_IMPL_PORT_STATE_CONFIGURE;
if (in_state > PW_IMPL_PORT_STATE_CONFIGURE && input->node->info.state == PW_NODE_STATE_IDLE)
in_state = PW_IMPL_PORT_STATE_CONFIGURE;
pw_log_debug("%p: states %d %d", context, out_state, in_state);
if (in_state == PW_IMPL_PORT_STATE_CONFIGURE && out_state > PW_IMPL_PORT_STATE_CONFIGURE) {
/* only input needs format */
spa_pod_builder_init(&fb, fbuf, sizeof(fbuf));
if ((res = spa_node_port_enum_params_sync(out_node,
output->direction, out_port,
SPA_PARAM_Format, &oidx,
NULL, &filter, &fb)) != 1) {
if (res < 0)
*error = spa_aprintf("error get output format: %s", spa_strerror(res));
else
*error = spa_aprintf("no output formats");
goto error;
}
pw_log_debug("%p: Got output format:", context);
pw_log_format(SPA_LOG_LEVEL_DEBUG, filter);
if ((res = spa_node_port_enum_params_sync(in_node,
input->direction, in_port,
SPA_PARAM_EnumFormat, &iidx,
filter, format, builder)) <= 0) {
if (res == -ENOENT || res == 0) {
pw_log_debug("%p: no input format filter, using output format: %s",
context, spa_strerror(res));
uint32_t offset = builder->state.offset;
res = spa_pod_builder_raw_padded(builder, filter, SPA_POD_SIZE(filter));
if (res < 0) {
*error = spa_aprintf("failed to add pod");
goto error;
}
*format = spa_pod_builder_deref(builder, offset);
} else {
*error = spa_aprintf("error input enum formats: %s", spa_strerror(res));
goto error;
}
}
} else if (out_state >= PW_IMPL_PORT_STATE_CONFIGURE && in_state > PW_IMPL_PORT_STATE_CONFIGURE) {
/* only output needs format */
spa_pod_builder_init(&fb, fbuf, sizeof(fbuf));
if ((res = spa_node_port_enum_params_sync(in_node,
input->direction, in_port,
SPA_PARAM_Format, &iidx,
NULL, &filter, &fb)) != 1) {
if (res < 0)
*error = spa_aprintf("error get input format: %s", spa_strerror(res));
else
*error = spa_aprintf("no input format");
goto error;
}
pw_log_debug("%p: Got input format:", context);
pw_log_format(SPA_LOG_LEVEL_DEBUG, filter);
if ((res = spa_node_port_enum_params_sync(out_node,
output->direction, out_port,
SPA_PARAM_EnumFormat, &oidx,
filter, format, builder)) <= 0) {
if (res == -ENOENT || res == 0) {
pw_log_debug("%p: no output format filter, using input format: %s",
context, spa_strerror(res));
uint32_t offset = builder->state.offset;
res = spa_pod_builder_raw_padded(builder, filter, SPA_POD_SIZE(filter));
if (res < 0) {
*error = spa_aprintf("failed to add pod");
goto error;
}
*format = spa_pod_builder_deref(builder, offset);
} else {
*error = spa_aprintf("error output enum formats: %s", spa_strerror(res));
goto error;
}
}
} else if (in_state == PW_IMPL_PORT_STATE_CONFIGURE && out_state == PW_IMPL_PORT_STATE_CONFIGURE) {
again:
/* both ports need a format */
pw_log_debug("%p: do enum input %d", context, iidx);
spa_pod_builder_init(&fb, fbuf, sizeof(fbuf));
if ((res = spa_node_port_enum_params_sync(in_node,
input->direction, in_port,
SPA_PARAM_EnumFormat, &iidx,
NULL, &filter, &fb)) != 1) {
if (res == -ENOENT) {
pw_log_debug("%p: no input filter", context);
filter = NULL;
} else {
if (res < 0)
*error = spa_aprintf("error input enum formats: %s", spa_strerror(res));
else
*error = spa_aprintf("no more input formats");
goto error;
}
}
pw_log_debug("%p: enum output %d with filter: %p", context, oidx, filter);
pw_log_format(SPA_LOG_LEVEL_DEBUG, filter);
if ((res = spa_node_port_enum_params_sync(out_node,
output->direction, out_port,
SPA_PARAM_EnumFormat, &oidx,
filter, format, builder)) != 1) {
if (res == 0 && filter != NULL) {
oidx = 0;
goto again;
}
*error = spa_aprintf("error output enum formats: %s", spa_strerror(res));
goto error;
}
pw_log_debug("%p: Got filtered:", context);
pw_log_format(SPA_LOG_LEVEL_DEBUG, *format);
} else {
res = -EBADF;
*error = spa_aprintf("error bad node state");
goto error;
}
return res;
error:
if (res == 0)
res = -EINVAL;
return res;
}
static int ensure_state(struct pw_impl_node *node, bool running)
{
enum pw_node_state state = node->info.state;

View file

@ -24,6 +24,18 @@ PW_LOG_TOPIC_EXTERN(log_link);
#define pw_link_resource_info(r,...) pw_resource_call(r,struct pw_link_events,info,0,__VA_ARGS__)
struct port_info {
uint32_t busy_id;
int pending_seq;
int result;
struct spa_hook port_listener;
struct spa_hook node_listener;
struct spa_hook global_listener;
struct pw_impl_port *port;
struct pw_impl_node *node;
struct pw_impl_port_mix *mix;
};
/** \cond */
struct impl {
struct pw_impl_link this;
@ -32,26 +44,14 @@ struct impl {
struct pw_work_queue *work;
uint32_t output_busy_id;
uint32_t input_busy_id;
int output_pending_seq;
int input_pending_seq;
int output_result;
int input_result;
struct port_info input;
struct port_info output;
struct spa_pod *format_filter;
struct pw_properties *properties;
struct spa_hook input_port_listener;
struct spa_hook input_node_listener;
struct spa_hook input_global_listener;
struct spa_hook output_port_listener;
struct spa_hook output_node_listener;
struct spa_hook output_global_listener;
struct spa_io_buffers io[2];
struct pw_impl_node *inode, *onode;
bool async;
};
@ -73,35 +73,18 @@ static void info_changed(struct pw_impl_link *link)
link->info.change_mask = 0;
}
static inline int input_set_busy_id(struct pw_impl_link *link, uint32_t id, int pending_seq)
static inline int port_set_busy_id(struct pw_impl_link *link, struct port_info *info, uint32_t id, int pending_seq)
{
struct impl *impl = SPA_CONTAINER_OF(link, struct impl, this);
int res = impl->input_result;
if (impl->input_busy_id != SPA_ID_INVALID)
link->input->busy_count--;
int res = info->result;
if (info->busy_id != SPA_ID_INVALID)
info->port->busy_count--;
if (id != SPA_ID_INVALID)
link->input->busy_count++;
impl->input_busy_id = id;
impl->input_pending_seq = SPA_RESULT_ASYNC_SEQ(pending_seq);
impl->input_result = 0;
if (link->input->busy_count < 0)
pw_log_error("%s: invalid busy count:%d", link->name, link->input->busy_count);
return res;
}
static inline int output_set_busy_id(struct pw_impl_link *link, uint32_t id, int pending_seq)
{
struct impl *impl = SPA_CONTAINER_OF(link, struct impl, this);
int res = impl->output_result;
if (impl->output_busy_id != SPA_ID_INVALID)
link->output->busy_count--;
if (id != SPA_ID_INVALID)
link->output->busy_count++;
impl->output_busy_id = id;
impl->output_pending_seq = SPA_RESULT_ASYNC_SEQ(pending_seq);
impl->output_result = 0;
if (link->output->busy_count < 0)
pw_log_error("%s: invalid busy count:%d", link->name, link->output->busy_count);
info->port->busy_count++;
info->busy_id = id;
info->pending_seq = SPA_RESULT_ASYNC_SEQ(pending_seq);
info->result = 0;
if (info->port->busy_count < 0)
pw_log_error("%s: invalid busy count:%d", link->name, info->port->busy_count);
return res;
}
@ -160,30 +143,23 @@ static void link_update_state(struct pw_impl_link *link, enum pw_link_state stat
link->prepared = false;
link->preparing = false;
output_set_busy_id(link, SPA_ID_INVALID, SPA_ID_INVALID);
pw_work_queue_cancel(impl->work, &link->output_link, SPA_ID_INVALID);
port_set_busy_id(link, &impl->output, SPA_ID_INVALID, SPA_ID_INVALID);
pw_work_queue_cancel(impl->work, &impl->output, SPA_ID_INVALID);
input_set_busy_id(link, SPA_ID_INVALID, SPA_ID_INVALID);
pw_work_queue_cancel(impl->work, &link->input_link, SPA_ID_INVALID);
port_set_busy_id(link, &impl->input, SPA_ID_INVALID, SPA_ID_INVALID);
pw_work_queue_cancel(impl->work, &impl->input, SPA_ID_INVALID);
}
}
static void complete_ready(void *obj, void *data, int res, uint32_t id)
{
struct pw_impl_port *port;
struct port_info *info = obj;
struct pw_impl_port *port = info->port;
struct pw_impl_link *this = data;
struct impl *impl = SPA_CONTAINER_OF(this, struct impl, this);
if (obj == &this->input_link)
port = this->input;
else
port = this->output;
if (id != SPA_ID_INVALID) {
if (id == impl->input_busy_id)
res = input_set_busy_id(this, SPA_ID_INVALID, SPA_ID_INVALID);
else if (id == impl->output_busy_id)
res = output_set_busy_id(this, SPA_ID_INVALID, SPA_ID_INVALID);
if (id == info->busy_id)
res = port_set_busy_id(this, info, SPA_ID_INVALID, SPA_ID_INVALID);
else
return;
}
@ -205,24 +181,14 @@ static void complete_ready(void *obj, void *data, int res, uint32_t id)
static void complete_paused(void *obj, void *data, int res, uint32_t id)
{
struct pw_impl_port *port;
struct port_info *info = obj;
struct pw_impl_port *port = info->port;
struct pw_impl_port_mix *mix = info->mix;
struct pw_impl_link *this = data;
struct impl *impl = SPA_CONTAINER_OF(this, struct impl, this);
struct pw_impl_port_mix *mix;
if (obj == &this->input_link) {
port = this->input;
mix = &this->rt.in_mix;
} else {
port = this->output;
mix = &this->rt.out_mix;
}
if (id != SPA_ID_INVALID) {
if (id == impl->input_busy_id)
res = input_set_busy_id(this, SPA_ID_INVALID, SPA_ID_INVALID);
else if (id == impl->output_busy_id)
res = output_set_busy_id(this, SPA_ID_INVALID, SPA_ID_INVALID);
if (id == info->busy_id)
res = port_set_busy_id(this, info, SPA_ID_INVALID, SPA_ID_INVALID);
else
return;
}
@ -246,18 +212,181 @@ static void complete_paused(void *obj, void *data, int res, uint32_t id)
static void complete_sync(void *obj, void *data, int res, uint32_t id)
{
struct pw_impl_port *port;
struct port_info *info = obj;
struct pw_impl_port *port = info->port;
struct pw_impl_link *this = data;
if (obj == &this->input_link)
port = this->input;
else
port = this->output;
pw_log_debug("%p: obj:%p port %p complete state:%d: %s", this, obj, port,
port->state, spa_strerror(res));
}
/* find a common format. info[0] has the higher priority.
* Either the format contains a valid common format or error is set. */
static int link_find_format(struct pw_impl_link *this,
struct port_info *info[2],
uint32_t port_id[2],
struct spa_pod **format,
struct spa_pod_builder *builder,
char **error)
{
int res;
uint32_t state[2];
uint32_t idx[2] = { 0, 0 };
struct spa_pod_builder fb = { 0 };
uint8_t fbuf[4096];
struct spa_pod *filter;
struct spa_node *node[2];
const char *dir[2];
state[0] = info[0]->port->state;
state[1] = info[1]->port->state;
node[0] = info[0]->node->node;
node[1] = info[1]->node->node;
port_id[0] = info[0]->port->port_id;
port_id[1] = info[1]->port->port_id;
dir[0] = pw_direction_as_string(info[0]->port->direction);
dir[1] = pw_direction_as_string(info[1]->port->direction);
pw_log_debug("%p: finding best format %d %d", this, state[0], state[1]);
/* when a port is configured but the node is idle, we can reconfigure with a different format */
if (state[1] > PW_IMPL_PORT_STATE_CONFIGURE && info[1]->node->info.state == PW_NODE_STATE_IDLE)
state[1] = PW_IMPL_PORT_STATE_CONFIGURE;
if (state[0] > PW_IMPL_PORT_STATE_CONFIGURE && info[0]->node->info.state == PW_NODE_STATE_IDLE)
state[0] = PW_IMPL_PORT_STATE_CONFIGURE;
pw_log_debug("%p: states %d %d", this, state[0], state[1]);
if (state[0] == PW_IMPL_PORT_STATE_CONFIGURE && state[1] > PW_IMPL_PORT_STATE_CONFIGURE) {
/* only port 0 needs format, take format from port 1 and filter */
spa_pod_builder_init(&fb, fbuf, sizeof(fbuf));
if ((res = spa_node_port_enum_params_sync(node[1],
info[1]->port->direction, port_id[1],
SPA_PARAM_Format, &idx[1],
NULL, &filter, &fb)) != 1) {
if (res < 0)
*error = spa_aprintf("error get %s format: %s", dir[1],
spa_strerror(res));
else
*error = spa_aprintf("no %s formats", dir[1]);
goto error;
}
pw_log_debug("%p: Got %s format:", this, dir[1]);
pw_log_pod(SPA_LOG_LEVEL_DEBUG, filter);
if ((res = spa_node_port_enum_params_sync(node[0],
info[0]->port->direction, port_id[0],
SPA_PARAM_EnumFormat, &idx[0],
filter, format, builder)) <= 0) {
if (res == -ENOENT || res == 0) {
pw_log_debug("%p: no %s format filter, using %s format: %s",
this, dir[0], dir[1], spa_strerror(res));
uint32_t offset = builder->state.offset;
res = spa_pod_builder_raw_padded(builder, filter, SPA_POD_SIZE(filter));
if (res < 0) {
*error = spa_aprintf("failed to add pod");
goto error;
}
*format = spa_pod_builder_deref(builder, offset);
} else {
*error = spa_aprintf("error %s enum formats: %s", dir[0],
spa_strerror(res));
goto error;
}
}
} else if (state[1] >= PW_IMPL_PORT_STATE_CONFIGURE && state[0] > PW_IMPL_PORT_STATE_CONFIGURE) {
/* only port 1 needs format, take and filter format from port 0 */
spa_pod_builder_init(&fb, fbuf, sizeof(fbuf));
if ((res = spa_node_port_enum_params_sync(node[0],
info[0]->port->direction, port_id[0],
SPA_PARAM_Format, &idx[0],
NULL, &filter, &fb)) != 1) {
if (res < 0)
*error = spa_aprintf("error get %s format: %s", dir[0],
spa_strerror(res));
else
*error = spa_aprintf("no %s format", dir[0]);
goto error;
}
pw_log_debug("%p: Got %s format:", this, dir[0]);
pw_log_pod(SPA_LOG_LEVEL_DEBUG, filter);
if ((res = spa_node_port_enum_params_sync(node[1],
info[1]->port->direction, port_id[1],
SPA_PARAM_EnumFormat, &idx[1],
filter, format, builder)) <= 0) {
if (res == -ENOENT || res == 0) {
pw_log_debug("%p: no %s format filter, using %s format: %s",
this, dir[1], dir[0], spa_strerror(res));
uint32_t offset = builder->state.offset;
res = spa_pod_builder_raw_padded(builder, filter, SPA_POD_SIZE(filter));
if (res < 0) {
*error = spa_aprintf("failed to add pod");
goto error;
}
*format = spa_pod_builder_deref(builder, offset);
} else {
*error = spa_aprintf("error %s enum formats: %s", dir[1],
spa_strerror(res));
goto error;
}
}
} else if (state[0] == PW_IMPL_PORT_STATE_CONFIGURE && state[1] == PW_IMPL_PORT_STATE_CONFIGURE) {
again:
/* both ports need a format, we start with a format from port 0 and use that
* as a filter for port 1. Because the filter has higher priority, its
* defaults will be prefered. */
pw_log_debug("%p: do enum %s %d", this, dir[0], idx[0]);
spa_pod_builder_init(&fb, fbuf, sizeof(fbuf));
if ((res = spa_node_port_enum_params_sync(node[0],
info[0]->port->direction, port_id[0],
SPA_PARAM_EnumFormat, &idx[0],
NULL, &filter, &fb)) != 1) {
if (res == -ENOENT) {
pw_log_debug("%p: no %s filter", this, dir[0]);
filter = NULL;
} else {
if (res < 0)
*error = spa_aprintf("error %s enum formats: %s", dir[0],
spa_strerror(res));
else
*error = spa_aprintf("no more %s formats", dir[0]);
goto error;
}
}
pw_log_debug("%p: enum %s %d with filter: %p", this, dir[1], idx[1], filter);
pw_log_pod(SPA_LOG_LEVEL_DEBUG, filter);
if ((res = spa_node_port_enum_params_sync(node[1],
info[1]->port->direction, port_id[1],
SPA_PARAM_EnumFormat, &idx[1],
filter, format, builder)) != 1) {
if (res == 0 && filter != NULL) {
idx[1] = 0;
goto again;
}
*error = spa_aprintf("error %s enum formats: %s", dir[1], spa_strerror(res));
goto error;
}
pw_log_debug("%p: Got filtered:", this);
pw_log_pod(SPA_LOG_LEVEL_DEBUG, *format);
} else {
res = -EBADF;
*error = spa_aprintf("error bad node state");
goto error;
}
return res;
error:
if (res == 0)
res = -EINVAL;
return res;
}
static int do_negotiate(struct pw_impl_link *this)
{
struct pw_context *context = this->context;
@ -265,65 +394,71 @@ static int do_negotiate(struct pw_impl_link *this)
int res = -EIO, res2;
struct spa_pod *format = NULL, *current;
char *error = NULL;
bool changed = true;
struct pw_impl_port *input, *output;
bool changed = false;
struct port_info *info[2];
uint8_t buffer[4096];
struct spa_pod_builder b = SPA_POD_BUILDER_INIT(buffer, sizeof(buffer));
uint32_t index, busy_id;
uint32_t in_state, out_state;
struct spa_node *in_node, *out_node;
uint32_t in_port, out_port;
uint32_t state[2];
struct spa_node *node[2];
uint32_t port_id[2];
const char *dir[2];
if (this->info.state >= PW_LINK_STATE_NEGOTIATING)
return 0;
input = this->input;
output = this->output;
/* driver nodes have lower priority for selecting the format.
* Higher priority nodes go into info[0] */
if (this->output->node->driver) {
info[0] = &impl->input;
info[1] = &impl->output;
} else {
info[0] = &impl->output;
info[1] = &impl->input;
}
state[0] = info[0]->port->state;
state[1] = info[1]->port->state;
in_state = input->state;
out_state = output->state;
dir[0] = pw_direction_as_string(info[0]->port->direction),
dir[1] = pw_direction_as_string(info[1]->port->direction),
pw_log_debug("%p: in_state:%d out_state:%d", this, in_state, out_state);
pw_log_info("%p: %s:%d -> %s:%d", this, dir[0], state[0], dir[1], state[1]);
if (in_state != PW_IMPL_PORT_STATE_CONFIGURE && out_state != PW_IMPL_PORT_STATE_CONFIGURE)
if (state[0] != PW_IMPL_PORT_STATE_CONFIGURE && state[1] != PW_IMPL_PORT_STATE_CONFIGURE)
return 0;
link_update_state(this, PW_LINK_STATE_NEGOTIATING, 0, NULL);
input = this->input;
output = this->output;
#if 0
in_node = input->mix;
in_port = this->rt.in_mix.port.port_id;
out_node = output->mix;
out_port = this->rt.out_mix.port.port_id;
node[0] = info[0]->port->mix;
port_id[0] = this->rt.in_mix.port.port_id;
node[1] = info[1]->port->mix;
port_id[1] = this->rt.out_mix.port.port_id;
#else
in_node = input->node->node;
in_port = input->port_id;
out_node = output->node->node;
out_port = output->port_id;
node[0] = info[0]->node->node;
port_id[0] = info[0]->port->port_id;
node[1] = info[1]->node->node;
port_id[1] = info[1]->port->port_id;
#endif
/* find a common format for the ports */
if ((res = pw_context_find_format(context,
output, SPA_ID_INVALID,
input, SPA_ID_INVALID,
NULL, 0, NULL,
&format, &b, &error)) < 0) {
if ((res = link_find_format(this, info, port_id, &format, &b, &error)) < 0) {
format = NULL;
goto error;
}
format = spa_pod_copy(format);
pw_log_pod(SPA_LOG_LEVEL_DEBUG, format);
spa_pod_fixate(format);
pw_log_pod(SPA_LOG_LEVEL_DEBUG, format);
spa_pod_builder_init(&b, buffer, sizeof(buffer));
/* if output port had format and is idle, check if it changed. If so, renegotiate */
if (out_state > PW_IMPL_PORT_STATE_CONFIGURE && output->node->info.state == PW_NODE_STATE_IDLE) {
/* if port 1 had format and is idle, check if it changed. If so, renegotiate */
if (state[1] > PW_IMPL_PORT_STATE_CONFIGURE && info[1]->node->info.state == PW_NODE_STATE_IDLE) {
index = 0;
res = spa_node_port_enum_params_sync(out_node,
output->direction, out_port,
res = spa_node_port_enum_params_sync(node[1],
info[1]->port->direction, port_id[1],
SPA_PARAM_Format, &index,
NULL, &current, &b);
switch (res) {
@ -337,27 +472,29 @@ static int do_negotiate(struct pw_impl_link *this)
res = -EBADF;
SPA_FALLTHROUGH
default:
error = spa_aprintf("error get output format: %s", spa_strerror(res));
error = spa_aprintf("error get %s format: %s",
pw_direction_as_string(info[1]->port->direction),
spa_strerror(res));
goto error;
}
if (current == NULL || spa_pod_compare(current, format) != 0) {
pw_log_debug("%p: output format change, renegotiate", this);
pw_log_debug("%p: %s format change, renegotiate", this,
pw_direction_as_string(info[1]->port->direction));
if (current)
pw_log_pod(SPA_LOG_LEVEL_DEBUG, current);
pw_log_pod(SPA_LOG_LEVEL_DEBUG, format);
pw_impl_node_set_state(output->node, PW_NODE_STATE_SUSPENDED);
out_state = PW_IMPL_PORT_STATE_CONFIGURE;
pw_impl_node_set_state(info[1]->node, PW_NODE_STATE_SUSPENDED);
state[1] = PW_IMPL_PORT_STATE_CONFIGURE;
}
else {
pw_log_debug("%p: format was already set", this);
changed = false;
}
}
/* if input port had format and is idle, check if it changed. If so, renegotiate */
if (in_state > PW_IMPL_PORT_STATE_CONFIGURE && input->node->info.state == PW_NODE_STATE_IDLE) {
/* if port 0 had format and is idle, check if it changed. If so, renegotiate */
if (state[0] > PW_IMPL_PORT_STATE_CONFIGURE && info[0]->node->info.state == PW_NODE_STATE_IDLE) {
index = 0;
res = spa_node_port_enum_params_sync(in_node,
input->direction, in_port,
res = spa_node_port_enum_params_sync(node[0],
info[0]->port->direction, port_id[0],
SPA_PARAM_Format, &index,
NULL, &current, &b);
switch (res) {
@ -371,20 +508,22 @@ static int do_negotiate(struct pw_impl_link *this)
res = -EBADF;
SPA_FALLTHROUGH
default:
error = spa_aprintf("error get input format: %s", spa_strerror(res));
error = spa_aprintf("error get %s format: %s",
pw_direction_as_string(info[0]->port->direction),
spa_strerror(res));
goto error;
}
if (current == NULL || spa_pod_compare(current, format) != 0) {
pw_log_debug("%p: input format change, renegotiate", this);
pw_log_debug("%p: %s format change, renegotiate", this,
pw_direction_as_string(info[0]->port->direction));
if (current)
pw_log_pod(SPA_LOG_LEVEL_DEBUG, current);
pw_log_pod(SPA_LOG_LEVEL_DEBUG, format);
pw_impl_node_set_state(input->node, PW_NODE_STATE_SUSPENDED);
in_state = PW_IMPL_PORT_STATE_CONFIGURE;
pw_impl_node_set_state(info[0]->node, PW_NODE_STATE_SUSPENDED);
state[0] = PW_IMPL_PORT_STATE_CONFIGURE;
}
else {
pw_log_debug("%p: format was already set", this);
changed = false;
}
}
@ -393,47 +532,51 @@ static int do_negotiate(struct pw_impl_link *this)
format, spa_pod_is_fixated(format));
pw_log_pod(SPA_LOG_LEVEL_INFO, format);
if (out_state == PW_IMPL_PORT_STATE_CONFIGURE) {
pw_log_debug("%p: doing set format on output", this);
if ((res = pw_impl_port_set_param(output,
if (state[1] == PW_IMPL_PORT_STATE_CONFIGURE) {
pw_log_debug("%p: doing set format on %s", this, dir[1]);
if ((res = pw_impl_port_set_param(info[1]->port,
SPA_PARAM_Format, 0,
format)) < 0) {
error = spa_aprintf("error set output format: %d (%s)", res, spa_strerror(res));
pw_log_error("tried to set output format:");
error = spa_aprintf("error set %s format: %d (%s)", dir[1],
res, spa_strerror(res));
pw_log_error("tried to set %s format:", dir[1]);
pw_log_pod(SPA_LOG_LEVEL_ERROR, format);
goto error;
}
pw_log_debug("%s set format: %d", dir[1], res);
if (SPA_RESULT_IS_ASYNC(res)) {
pw_log_debug("output set format %d", res);
busy_id = pw_work_queue_add(impl->work, &this->output_link,
spa_node_sync(output->node->node, res),
busy_id = pw_work_queue_add(impl->work, info[1],
spa_node_sync(info[1]->node->node, res),
complete_ready, this);
output_set_busy_id(this, busy_id, res);
port_set_busy_id(this, info[1], busy_id, res);
} else {
complete_ready(&this->output_link, this, res, SPA_ID_INVALID);
complete_ready(info[1], this, res, SPA_ID_INVALID);
}
changed = true;
}
if (in_state == PW_IMPL_PORT_STATE_CONFIGURE) {
pw_log_debug("%p: doing set format on input", this);
if ((res2 = pw_impl_port_set_param(input,
if (state[0] == PW_IMPL_PORT_STATE_CONFIGURE) {
pw_log_debug("%p: doing set format on %s", this, dir[0]);
if ((res2 = pw_impl_port_set_param(info[0]->port,
SPA_PARAM_Format, 0,
format)) < 0) {
error = spa_aprintf("error set input format: %d (%s)", res2, spa_strerror(res2));
pw_log_error("tried to set input format:");
error = spa_aprintf("error set %s format: %d (%s)", dir[0],
res2, spa_strerror(res2));
pw_log_error("tried to set %s format:", dir[0]);
pw_log_pod(SPA_LOG_LEVEL_ERROR, format);
goto error;
}
pw_log_debug("%s set format: %d", dir[0], res2);
if (SPA_RESULT_IS_ASYNC(res2)) {
pw_log_debug("input set format %d", res2);
busy_id = pw_work_queue_add(impl->work, &this->input_link,
spa_node_sync(input->node->node, res2),
busy_id = pw_work_queue_add(impl->work, info[0],
spa_node_sync(info[0]->node->node, res2),
complete_ready, this);
input_set_busy_id(this, busy_id, res2);
port_set_busy_id(this, info[0], busy_id, res2);
if (res == 0)
res = res2;
} else {
complete_ready(&this->input_link, this, res2, SPA_ID_INVALID);
complete_ready(info[0], this, res2, SPA_ID_INVALID);
}
changed = true;
}
free(this->info.format);
@ -446,21 +589,23 @@ static int do_negotiate(struct pw_impl_link *this)
return res;
error:
pw_context_debug_port_params(context, in_node,
input->direction, in_port,
pw_context_debug_port_params(context, node[0],
info[0]->port->direction, port_id[0],
SPA_PARAM_EnumFormat, res, "input format (%s)", error);
pw_context_debug_port_params(context, out_node,
output->direction, out_port,
pw_context_debug_port_params(context, node[1],
info[1]->port->direction, port_id[1],
SPA_PARAM_EnumFormat, res, "output format (%s)", error);
link_update_state(this, PW_LINK_STATE_ERROR, res, error);
free(format);
return res;
}
static int port_set_io(struct pw_impl_link *this, struct pw_impl_port *port, uint32_t id,
void *data, size_t size, struct pw_impl_port_mix *mix)
static int port_set_io(struct pw_impl_link *this, struct port_info *info, uint32_t id,
void *data, size_t size)
{
int res = 0;
struct pw_impl_port *port = info->port;
struct pw_impl_port_mix *mix = info->mix;
pw_log_debug("%p: %s port %p %d.%d set io: %d %p %zd", this,
pw_direction_as_string(port->direction),
@ -590,14 +735,14 @@ static int do_allocation(struct pw_impl_link *this)
goto error_clear;
}
if (SPA_RESULT_IS_ASYNC(res)) {
busy_id = pw_work_queue_add(impl->work, &this->output_link,
busy_id = pw_work_queue_add(impl->work, &impl->output,
spa_node_sync(output->node->node, res),
complete_paused, this);
output_set_busy_id(this, busy_id, res);
port_set_busy_id(this, &impl->output, busy_id, res);
if (flags & SPA_NODE_BUFFERS_FLAG_ALLOC)
return 0;
} else {
complete_paused(&this->output_link, this, res, SPA_ID_INVALID);
complete_paused(&impl->output, this, res, SPA_ID_INVALID);
}
}
@ -613,12 +758,12 @@ static int do_allocation(struct pw_impl_link *this)
}
if (SPA_RESULT_IS_ASYNC(res)) {
busy_id = pw_work_queue_add(impl->work, &this->input_link,
busy_id = pw_work_queue_add(impl->work, &impl->input,
spa_node_sync(input->node->node, res),
complete_paused, this);
input_set_busy_id(this, busy_id, res);
port_set_busy_id(this, &impl->input, busy_id, res);
} else {
complete_paused(&this->input_link, this, res, SPA_ID_INVALID);
complete_paused(&impl->input, this, res, SPA_ID_INVALID);
}
return 0;
@ -639,7 +784,7 @@ int pw_impl_link_activate(struct pw_impl_link *this)
pw_link_state_as_string(this->info.state));
if (this->destroyed || impl->activated || !this->prepared ||
!impl->inode->runnable || !impl->onode->runnable)
!impl->input.node->runnable || !impl->output.node->runnable)
return 0;
if (impl->async) {
@ -650,11 +795,9 @@ int pw_impl_link_activate(struct pw_impl_link *this)
io_size = sizeof(struct spa_io_buffers);
}
if ((res = port_set_io(this, this->input, io_type, this->io,
io_size, &this->rt.in_mix)) < 0)
if ((res = port_set_io(this, &impl->input, io_type, this->io, io_size)) < 0)
goto error;
if ((res = port_set_io(this, this->output, io_type, this->io,
io_size, &this->rt.out_mix)) < 0)
if ((res = port_set_io(this, &impl->output, io_type, this->io, io_size)) < 0)
goto error_clean;
impl->activated = true;
@ -664,7 +807,7 @@ int pw_impl_link_activate(struct pw_impl_link *this)
return 0;
error_clean:
port_set_io(this, this->input, io_type, NULL, 0, &this->rt.in_mix);
port_set_io(this, &impl->input, io_type, NULL, 0);
error:
pw_log_error("%p: can't activate link: %s", this, spa_strerror(res));
return res;
@ -719,13 +862,13 @@ static void check_states(void *obj, void *user_data, int res, uint32_t id)
if (output->busy_count > 0) {
pw_log_debug("%p: output port %p was busy %d", this, output, output->busy_count);
res = spa_node_sync(output->node->node, 0);
pw_work_queue_add(impl->work, &this->output_link, res, complete_sync, this);
pw_work_queue_add(impl->work, &impl->output, res, complete_sync, this);
goto exit;
}
else if (input->busy_count > 0) {
pw_log_debug("%p: input port %p was busy %d", this, input, input->busy_count);
res = spa_node_sync(input->node->node, 0);
pw_work_queue_add(impl->work, &this->input_link, res, complete_sync, this);
pw_work_queue_add(impl->work, &impl->input, res, complete_sync, this);
goto exit;
}
@ -745,61 +888,65 @@ exit:
this, -EBUSY, (pw_work_func_t) check_states, this);
}
static void input_remove(struct pw_impl_link *this, struct pw_impl_port *port)
static void input_remove(struct pw_impl_link *this)
{
struct impl *impl = (struct impl *) this;
struct pw_impl_port_mix *mix = &this->rt.in_mix;
struct port_info *info = &impl->input;
struct pw_impl_port_mix *mix = info->mix;
struct pw_impl_port *port = info->port;
int res;
pw_log_debug("%p: remove input port %p", this, port);
input_set_busy_id(this, SPA_ID_INVALID, SPA_ID_INVALID);
port_set_busy_id(this, info, SPA_ID_INVALID, SPA_ID_INVALID);
spa_hook_remove(&impl->input_port_listener);
spa_hook_remove(&impl->input_node_listener);
spa_hook_remove(&impl->input_global_listener);
spa_hook_remove(&info->port_listener);
spa_hook_remove(&info->node_listener);
spa_hook_remove(&info->global_listener);
spa_list_remove(&this->input_link);
pw_impl_port_emit_link_removed(this->input, this);
pw_impl_port_emit_link_removed(port, this);
pw_impl_port_recalc_latency(this->input);
pw_impl_port_recalc_tag(this->input);
pw_impl_port_recalc_latency(port);
pw_impl_port_recalc_tag(port);
if ((res = port_set_io(this, this->input, SPA_IO_Buffers, NULL, 0, mix)) < 0)
if ((res = port_set_io(this, info, SPA_IO_Buffers, NULL, 0)) < 0)
pw_log_warn("%p: port %p set_io error %s", this, port, spa_strerror(res));
if ((res = pw_impl_port_use_buffers(port, mix, 0, NULL, 0)) < 0)
pw_log_warn("%p: port %p clear error %s", this, port, spa_strerror(res));
pw_impl_port_release_mix(port, mix);
pw_work_queue_cancel(impl->work, &this->input_link, SPA_ID_INVALID);
pw_work_queue_cancel(impl->work, info, SPA_ID_INVALID);
this->input = NULL;
}
static void output_remove(struct pw_impl_link *this, struct pw_impl_port *port)
static void output_remove(struct pw_impl_link *this)
{
struct impl *impl = (struct impl *) this;
struct pw_impl_port_mix *mix = &this->rt.out_mix;
struct port_info *info = &impl->output;
struct pw_impl_port_mix *mix = info->mix;
struct pw_impl_port *port = info->port;
pw_log_debug("%p: remove output port %p", this, port);
output_set_busy_id(this, SPA_ID_INVALID, SPA_ID_INVALID);
port_set_busy_id(this, info, SPA_ID_INVALID, SPA_ID_INVALID);
spa_hook_remove(&impl->output_port_listener);
spa_hook_remove(&impl->output_node_listener);
spa_hook_remove(&impl->output_global_listener);
spa_hook_remove(&info->port_listener);
spa_hook_remove(&info->node_listener);
spa_hook_remove(&info->global_listener);
spa_list_remove(&this->output_link);
pw_impl_port_emit_link_removed(this->output, this);
pw_impl_port_emit_link_removed(port, this);
pw_impl_port_recalc_latency(this->output);
pw_impl_port_recalc_tag(this->output);
pw_impl_port_recalc_latency(port);
pw_impl_port_recalc_tag(port);
/* we don't clear output buffers when the link goes away. They will get
* cleared when the node goes to suspend */
pw_impl_port_release_mix(port, mix);
pw_work_queue_cancel(impl->work, &this->output_link, SPA_ID_INVALID);
pw_work_queue_cancel(impl->work, info, SPA_ID_INVALID);
this->output = NULL;
}
@ -809,9 +956,9 @@ int pw_impl_link_prepare(struct pw_impl_link *this)
pw_log_debug("%p: prepared:%d preparing:%d in_active:%d out_active:%d passive:%u",
this, this->prepared, this->preparing,
impl->inode->active, impl->onode->active, this->passive);
impl->input.node->active, impl->output.node->active, this->passive);
if (!impl->inode->active || !impl->onode->active)
if (!impl->input.node->active || !impl->output.node->active)
return 0;
if (this->destroyed || this->preparing || this->prepared)
@ -834,10 +981,8 @@ int pw_impl_link_deactivate(struct pw_impl_link *this)
if (!impl->activated)
return 0;
port_set_io(this, this->output, SPA_IO_Buffers, NULL, 0,
&this->rt.out_mix);
port_set_io(this, this->input, SPA_IO_Buffers, NULL, 0,
&this->rt.in_mix);
port_set_io(this, &impl->output, SPA_IO_Buffers, NULL, 0);
port_set_io(this, &impl->input, SPA_IO_Buffers, NULL, 0);
impl->activated = false;
pw_log_info("(%s) deactivated", this->name);
@ -1010,6 +1155,16 @@ static const struct pw_impl_port_events output_port_events = {
static void node_result(struct impl *impl, void *obj,
int seq, int res, uint32_t type, const void *result)
{
struct port_info *info = obj;
struct pw_impl_port *port = info->port;
pw_log_trace("%p: %s port %p result seq:%d %d res:%d type:%u",
impl, pw_direction_as_string(port->direction),
port, seq, SPA_RESULT_ASYNC_SEQ(seq), res, type);
if (type == SPA_RESULT_TYPE_NODE_ERROR && info->pending_seq == seq)
info->result = res;
if (SPA_RESULT_IS_ASYNC(seq))
pw_work_queue_complete(impl->work, obj, SPA_RESULT_ASYNC_SEQ(seq), res);
}
@ -1017,27 +1172,13 @@ static void node_result(struct impl *impl, void *obj,
static void input_node_result(void *data, int seq, int res, uint32_t type, const void *result)
{
struct impl *impl = data;
struct pw_impl_port *port = impl->this.input;
pw_log_trace("%p: input port %p result seq:%d %d res:%d type:%u",
impl, port, seq, SPA_RESULT_ASYNC_SEQ(seq), res, type);
if (type == SPA_RESULT_TYPE_NODE_ERROR && impl->input_pending_seq == seq)
impl->input_result = res;
node_result(impl, &impl->this.input_link, seq, res, type, result);
node_result(impl, &impl->input, seq, res, type, result);
}
static void output_node_result(void *data, int seq, int res, uint32_t type, const void *result)
{
struct impl *impl = data;
struct pw_impl_port *port = impl->this.output;
pw_log_trace("%p: output port %p result seq:%d %d res:%d type:%u",
impl, port, seq, SPA_RESULT_ASYNC_SEQ(seq), res, type);
if (type == SPA_RESULT_TYPE_NODE_ERROR && impl->output_pending_seq == seq)
impl->output_result = res;
node_result(impl, &impl->this.output_link, seq, res, type, result);
node_result(impl, &impl->output, seq, res, type, result);
}
static void node_active_changed(void *data, bool active)
@ -1312,8 +1453,8 @@ struct pw_impl_link *pw_context_create_link(struct pw_context *context,
if (impl == NULL)
goto error_no_mem;
impl->input_busy_id = SPA_ID_INVALID;
impl->output_busy_id = SPA_ID_INVALID;
impl->input.busy_id = SPA_ID_INVALID;
impl->output.busy_id = SPA_ID_INVALID;
this = &impl->this;
this->feedback = pw_impl_node_can_reach(input_node, output_node, 0);
@ -1363,12 +1504,12 @@ struct pw_impl_link *pw_context_create_link(struct pw_context *context,
if ((res = pw_impl_port_init_mix(input, &this->rt.in_mix)) < 0)
goto error_input_mix;
pw_impl_port_add_listener(input, &impl->input_port_listener, &input_port_events, impl);
pw_impl_node_add_listener(input_node, &impl->input_node_listener, &input_node_events, impl);
pw_global_add_listener(input->global, &impl->input_global_listener, &input_global_events, impl);
pw_impl_port_add_listener(output, &impl->output_port_listener, &output_port_events, impl);
pw_impl_node_add_listener(output_node, &impl->output_node_listener, &output_node_events, impl);
pw_global_add_listener(output->global, &impl->output_global_listener, &output_global_events, impl);
pw_impl_port_add_listener(input, &impl->input.port_listener, &input_port_events, impl);
pw_impl_node_add_listener(input_node, &impl->input.node_listener, &input_node_events, impl);
pw_global_add_listener(input->global, &impl->input.global_listener, &input_global_events, impl);
pw_impl_port_add_listener(output, &impl->output.port_listener, &output_port_events, impl);
pw_impl_node_add_listener(output_node, &impl->output.node_listener, &output_node_events, impl);
pw_global_add_listener(output->global, &impl->output.global_listener, &output_global_events, impl);
input_node->live = output_node->live;
@ -1380,14 +1521,18 @@ struct pw_impl_link *pw_context_create_link(struct pw_context *context,
select_io(this);
impl->input.port = input;
impl->output.port = output;
if (this->feedback) {
impl->inode = output_node;
impl->onode = input_node;
impl->input.node = output_node;
impl->output.node = input_node;
}
else {
impl->onode = output_node;
impl->inode = input_node;
impl->output.node = output_node;
impl->input.node = input_node;
}
impl->input.mix = &this->rt.in_mix;
impl->output.mix = &this->rt.out_mix;
pw_log_debug("%p: constructed out:%p:%d.%d -> in:%p:%d.%d", impl,
output_node, output->port_id, this->rt.out_mix.port.port_id,
@ -1411,8 +1556,8 @@ struct pw_impl_link *pw_context_create_link(struct pw_context *context,
pw_impl_port_recalc_tag(output);
pw_impl_port_recalc_tag(input);
if (impl->onode != impl->inode)
this->peer = pw_node_peer_ref(impl->onode, impl->inode);
if (impl->output.node != impl->input.node)
this->peer = pw_node_peer_ref(impl->output.node, impl->input.node);
return this;
@ -1555,8 +1700,8 @@ void pw_impl_link_destroy(struct pw_impl_link *link)
try_unlink_controls(impl, link->output, link->input);
output_remove(link, link->output);
input_remove(link, link->input);
output_remove(link);
input_remove(link);
if (link->global) {
spa_hook_remove(&link->global_listener);

View file

@ -1265,19 +1265,6 @@ struct pw_control {
void *user_data;
};
/** Find a good format between 2 ports */
int pw_context_find_format(struct pw_context *context,
struct pw_impl_port *output,
uint32_t output_mix,
struct pw_impl_port *input,
uint32_t input_mix,
struct pw_properties *props,
uint32_t n_format_filters,
struct spa_pod **format_filters,
struct spa_pod **format,
struct spa_pod_builder *builder,
char **error);
int pw_context_debug_port_params(struct pw_context *context,
struct spa_node *node, enum spa_direction direction,
uint32_t port_id, uint32_t id, int err, const char *debug, ...);