port: simplify buffer allocation

Use just one function to do buffer allocation on a port.

Remove some unused variables.
This commit is contained in:
Wim Taymans 2019-08-07 12:56:57 +02:00
parent a319ec55ea
commit 06446e0d64
6 changed files with 91 additions and 185 deletions

View file

@ -549,6 +549,7 @@ static int negotiate_format(struct impl *this)
}
spa_pod_fixate(format);
spa_log_debug(this->log, NAME "%p: configure format:", this);
if (spa_log_level_enabled(this->log, SPA_LOG_LEVEL_DEBUG))
spa_debug_format(0, NULL, format);

View file

@ -334,7 +334,7 @@ static int impl_node_set_io(void *object, uint32_t id, void *data, size_t size)
struct impl *impl = this->impl;
struct pw_memmap *mm;
uint32_t memid, mem_offset, mem_size;
uint32_t tag[5] = { impl->node_id, id };
uint32_t tag[5] = { impl->node_id, id, };
if (impl->this.flags & 1)
return 0;

View file

@ -498,7 +498,7 @@ static int clear_buffers(struct node_data *data, struct mix *mix)
pw_log_debug("port %p: clear buffers mix:%d %zd", port, mix->mix_id, mix->buffers.size);
if ((res = pw_port_use_buffers(port, mix->mix_id, 0, NULL, 0)) < 0) {
if ((res = pw_port_use_buffers(port, &mix->mix, 0, NULL, 0)) < 0) {
pw_log_error("port %p: error clear buffers %s", port, spa_strerror(res));
return res;
}
@ -672,7 +672,7 @@ client_node_port_use_buffers(void *object,
bufs[i] = b;
}
if ((res = pw_port_use_buffers(mix->port, mix_id, flags, bufs, n_buffers)) < 0)
if ((res = pw_port_use_buffers(mix->port, &mix->mix, flags, bufs, n_buffers)) < 0)
goto error_exit_cleanup;
if (flags & SPA_NODE_BUFFERS_FLAG_ALLOC) {

View file

@ -228,7 +228,10 @@ static int do_negotiate(struct pw_link *this)
input = this->input;
output = this->output;
if ((res = pw_core_find_format(this->core, output, input, NULL, 0, NULL, &format, &b, &error)) < 0)
/* find a common format for the ports */
if ((res = pw_core_find_format(this->core,
output, input, NULL, 0, NULL,
&format, &b, &error)) < 0)
goto error;
format = spa_pod_copy(format);
@ -236,6 +239,7 @@ static int do_negotiate(struct pw_link *this)
spa_pod_builder_init(&b, buffer, sizeof(buffer));
/* if output port had format and is idle, check if it changed. If so, renegotiate */
if (out_state > PW_PORT_STATE_CONFIGURE && output->node->info.state == PW_NODE_STATE_IDLE) {
index = 0;
res = spa_node_port_enum_params_sync(output->node->node,
@ -271,6 +275,7 @@ static int do_negotiate(struct pw_link *this)
changed = false;
}
}
/* if input port had format and is idle, check if it changed. If so, renegotiate */
if (in_state > PW_PORT_STATE_CONFIGURE && input->node->info.state == PW_NODE_STATE_IDLE) {
index = 0;
res = spa_node_port_enum_params_sync(input->node->node,
@ -554,28 +559,27 @@ static int select_io(struct pw_link *this)
static int do_allocation(struct pw_link *this)
{
struct impl *impl = SPA_CONTAINER_OF(this, struct impl, this);
int res, out_res = 0, in_res = 0;
int res;
uint32_t in_flags, out_flags;
char *error = NULL;
struct pw_port *input, *output;
struct allocation allocation = { NULL, };
bool out_alloc = false;
if (this->info.state > PW_LINK_STATE_ALLOCATING)
return 0;
input = this->input;
output = this->output;
input = this->input;
pw_log_debug("link %p: in_state:%d out_state:%d", this, input->state, output->state);
pw_log_debug("link %p: out-state:%d in-state:%d", this, output->state, input->state);
pw_link_update_state(this, PW_LINK_STATE_ALLOCATING, NULL);
in_flags = input->spa_flags;
out_flags = output->spa_flags;
in_flags = input->spa_flags;
pw_log_debug("link %p: doing alloc buffers %p %p: in_flags:%08x out_flags:%08x",
this, output->node, input->node, in_flags, out_flags);
pw_log_debug("link %p: out-node:%p in-node:%p: out-flags:%08x in-flags:%08x",
this, output->node, input->node, out_flags, in_flags);
if (out_flags & SPA_PORT_FLAG_LIVE) {
pw_log_debug("setting link as live");
@ -591,7 +595,7 @@ static int do_allocation(struct pw_link *this)
struct spa_pod **params, *param;
uint8_t buffer[4096];
struct spa_pod_builder b = SPA_POD_BUILDER_INIT(buffer, sizeof(buffer));
uint32_t i, offset, n_params;
uint32_t i, offset, n_params, flags;
uint32_t max_buffers;
size_t minsize = 8192, stride = 0, align;
uint32_t data_sizes[1];
@ -644,8 +648,11 @@ static int do_allocation(struct pw_link *this)
/* when the output can allocate buffer memory, set the minsize to
* 0 to make sure we don't allocate memory in the shared memory */
if ((out_flags & SPA_PORT_FLAG_CAN_ALLOC_BUFFERS))
flags = 0;
if ((out_flags & SPA_PORT_FLAG_CAN_ALLOC_BUFFERS)) {
minsize = 0;
flags |= SPA_NODE_BUFFERS_FLAG_ALLOC;
}
data_sizes[0] = minsize;
data_strides[0] = stride;
@ -658,7 +665,7 @@ static int do_allocation(struct pw_link *this)
1,
data_sizes, data_strides,
data_aligns,
in_flags & out_flags,
out_flags & in_flags,
&allocation)) < 0) {
asprintf(&error, "error alloc buffers: %d", res);
goto error;
@ -667,71 +674,46 @@ static int do_allocation(struct pw_link *this)
pw_log_debug("link %p: allocating %d buffers %p %zd %zd", this,
allocation.n_buffers, allocation.buffers, minsize, stride);
if (out_flags & SPA_PORT_FLAG_CAN_ALLOC_BUFFERS) {
if ((res = pw_port_alloc_buffers(output,
allocation.buffers,
allocation.n_buffers)) < 0) {
asprintf(&error, "error alloc output buffers: %d", res);
goto error;
}
out_res = res;
out_alloc = true;
pw_log_debug("link %p: allocated %d buffers %p from output port: %s", this,
allocation.n_buffers, allocation.buffers, spa_strerror(out_res));
} else {
pw_log_debug("link %p: using %d buffers %p on output port", this,
allocation.n_buffers, allocation.buffers);
if ((res = pw_port_use_buffers(output,
this->rt.out_mix.port.port_id, 0,
allocation.buffers,
allocation.n_buffers)) < 0) {
asprintf(&error, "link %p: error use output buffers: %s", this,
spa_strerror(res));
goto error;
}
out_res = res;
if ((res = pw_port_use_buffers(output, &this->rt.out_mix,
flags, allocation.buffers, allocation.n_buffers)) < 0) {
asprintf(&error, "error use output buffers: %d", res);
goto error;
}
move_allocation(&allocation, &output->allocation);
if (SPA_RESULT_IS_ASYNC(out_res)) {
pw_work_queue_add(impl->work, output->node,
spa_node_sync(output->node->node, out_res),
if (SPA_RESULT_IS_ASYNC(res)) {
res = spa_node_sync(output->node->node, res),
pw_work_queue_add(impl->work, output->node, res,
complete_paused, this);
if (out_alloc)
if (flags & SPA_NODE_BUFFERS_FLAG_ALLOC)
return 0;
} else {
complete_paused(output->node, this, out_res, 0);
complete_paused(output->node, this, res, 0);
}
}
pw_log_debug("link %p: using %d buffers %p on input port", this,
output->allocation.n_buffers, output->allocation.buffers);
if ((res = pw_port_use_buffers(input,
this->rt.in_mix.port.port_id,
0,
output->allocation.buffers,
output->allocation.n_buffers)) < 0) {
if ((res = pw_port_use_buffers(input, &this->rt.in_mix, 0,
output->allocation.buffers,
output->allocation.n_buffers)) < 0) {
asprintf(&error, "link %p: error use input buffers: %s", this,
spa_strerror(res));
goto error;
}
in_res = res;
if (SPA_RESULT_IS_ASYNC(in_res)) {
pw_work_queue_add(impl->work, input->node,
spa_node_sync(input->node->node, in_res),
if (SPA_RESULT_IS_ASYNC(res)) {
res = spa_node_sync(input->node->node, res),
pw_work_queue_add(impl->work, input->node, res,
complete_paused, this);
} else {
complete_paused(input->node, this, in_res, 0);
complete_paused(input->node, this, res, 0);
}
return 0;
error:
free_allocation(&output->allocation);
free_allocation(&input->allocation);
pw_link_update_state(this, PW_LINK_STATE_ERROR, error);
return res;
}
@ -772,14 +754,14 @@ int pw_link_activate(struct pw_link *this)
pw_link_prepare(this);
if ((res = port_set_io(this, this->input, SPA_IO_Buffers, this->io,
sizeof(struct spa_io_buffers), &this->rt.in_mix)) < 0)
return res;
if ((res = port_set_io(this, this->output, SPA_IO_Buffers, this->io,
sizeof(struct spa_io_buffers), &this->rt.out_mix)) < 0)
return res;
if ((res = port_set_io(this, this->input, SPA_IO_Buffers, this->io,
sizeof(struct spa_io_buffers), &this->rt.in_mix)) < 0)
return res;
if (this->info.state == PW_LINK_STATE_PAUSED) {
pw_loop_invoke(this->output->node->data_loop,
do_activate_link, SPA_ID_INVALID, NULL, 0, false, this);
@ -800,36 +782,36 @@ static void check_states(void *obj, void *user_data, int res, uint32_t id)
if (this->info.state == PW_LINK_STATE_PAUSED)
return;
input = this->input;
output = this->output;
input = this->input;
if (input == NULL || output == NULL) {
if (output == NULL || input == NULL) {
pw_link_update_state(this, PW_LINK_STATE_ERROR,
strdup("link without input or output port"));
return;
}
if (input->node->info.state == PW_NODE_STATE_ERROR ||
output->node->info.state == PW_NODE_STATE_ERROR) {
pw_log_warn("link %p: one of the nodes is in error in:%d out:%d", this,
input->node->info.state,
output->node->info.state);
if (output->node->info.state == PW_NODE_STATE_ERROR ||
input->node->info.state == PW_NODE_STATE_ERROR) {
pw_log_warn("link %p: one of the nodes is in error out:%d in:%d", this,
output->node->info.state,
input->node->info.state);
return;
}
in_state = input->state;
out_state = output->state;
in_state = input->state;
pw_log_debug("link %p: input state %d, output state %d", this, in_state, out_state);
pw_log_debug("link %p: output state %d, input state %d", this, out_state, in_state);
if (in_state == PW_PORT_STATE_ERROR || out_state == PW_PORT_STATE_ERROR) {
if (out_state == PW_PORT_STATE_ERROR || in_state == PW_PORT_STATE_ERROR) {
pw_link_update_state(this, PW_LINK_STATE_ERROR, strdup("ports are in error"));
return;
}
if (PW_PORT_IS_CONTROL(output) && PW_PORT_IS_CONTROL(input)) {
pw_port_update_state(input, PW_PORT_STATE_PAUSED, NULL);
pw_port_update_state(output, PW_PORT_STATE_PAUSED, NULL);
pw_port_update_state(input, PW_PORT_STATE_PAUSED, NULL);
pw_link_update_state(this, PW_LINK_STATE_PAUSED, NULL);
}
@ -849,31 +831,11 @@ exit:
this, -EBUSY, (pw_work_func_t) check_states, this);
}
static void clear_port_buffers(struct pw_link *link, struct pw_port *port)
{
int res;
struct pw_port_mix *mix;
pw_log_debug("%d %p", spa_list_is_empty(&port->links), port->allocation.mem);
/* we don't clear output buffers when the link goes away. They will get
* cleared when the node goes to suspend */
if (port->direction == PW_DIRECTION_OUTPUT)
return;
if (port->direction == PW_DIRECTION_OUTPUT)
mix = &link->rt.out_mix;
else
mix = &link->rt.in_mix;
if ((res = pw_port_use_buffers(port, mix->port.port_id, 0, NULL, 0)) < 0)
pw_log_warn("link %p: port %p clear error %s", link, port, spa_strerror(res));
}
static void input_remove(struct pw_link *this, struct pw_port *port)
{
struct impl *impl = (struct impl *) this;
struct pw_port_mix *mix = &this->rt.in_mix;
int res;
pw_log_debug("link %p: remove input port %p", this, port);
spa_hook_remove(&impl->input_port_listener);
@ -883,8 +845,9 @@ static void input_remove(struct pw_link *this, struct pw_port *port)
spa_list_remove(&this->input_link);
pw_port_emit_link_removed(this->input, this);
clear_port_buffers(this, port);
if ((res = pw_port_use_buffers(port, mix, 0, NULL, 0)) < 0) {
pw_log_warn("link %p: port %p clear error %s", this, port, spa_strerror(res));
}
pw_port_release_mix(port, mix);
this->input = NULL;
}
@ -902,7 +865,8 @@ static void output_remove(struct pw_link *this, struct pw_port *port)
spa_list_remove(&this->output_link);
pw_port_emit_link_removed(this->output, this);
clear_port_buffers(this, port);
/* we don't clear output buffers when the link goes away. They will get
* cleared when the node goes to suspend */
pw_port_release_mix(port, mix);
this->output = NULL;
@ -996,21 +960,21 @@ int pw_link_deactivate(struct pw_link *this)
pw_loop_invoke(this->output->node->data_loop,
do_deactivate_link, SPA_ID_INVALID, NULL, 0, true, this);
port_set_io(this, this->input, SPA_IO_Buffers, NULL, 0, &this->rt.in_mix);
port_set_io(this, this->output, SPA_IO_Buffers, NULL, 0, &this->rt.out_mix);
port_set_io(this, this->input, SPA_IO_Buffers, NULL, 0, &this->rt.in_mix);
impl->activated = false;
}
input_node = this->input->node;
output_node = this->output->node;
input_node = this->input->node;
input_node->n_used_input_links--;
output_node->n_used_output_links--;
input_node->n_used_input_links--;
if (impl->passive) {
input_node->idle_used_input_links--;
output_node->idle_used_output_links--;
input_node->idle_used_input_links--;
}
debug_link(this);
@ -1309,8 +1273,8 @@ struct pw_link *pw_link_new(struct pw_core *core,
if (check_permission(core, output, input, properties) < 0)
goto error_link_not_allowed;
input_node = input->node;
output_node = output->node;
input_node = input->node;
impl = calloc(1, sizeof(struct impl) + user_data_size);
if (impl == NULL)
@ -1318,7 +1282,7 @@ struct pw_link *pw_link_new(struct pw_core *core,
this = &impl->this;
this->feedback = pw_node_can_reach(input_node, output_node);
pw_log_debug("link %p: new %p -> %p", this, input, output);
pw_log_debug("link %p: new out-port:%p -> in-port:%p", this, output, input);
if (user_data_size > 0)
this->user_data = SPA_MEMBER(impl, sizeof(struct impl), void);
@ -1329,8 +1293,8 @@ struct pw_link *pw_link_new(struct pw_core *core,
this->properties = properties;
this->info.state = PW_LINK_STATE_INIT;
this->input = input;
this->output = output;
this->input = input;
if (properties) {
const char *str = pw_properties_get(properties, PW_KEY_LINK_PASSIVE);
@ -1380,7 +1344,7 @@ struct pw_link *pw_link_new(struct pw_core *core,
this->rt.target.signal = impl->inode->rt.target.signal;
this->rt.target.data = impl->inode->rt.target.data;
pw_log_debug("link %p: constructed %p:%d.%d -> %p:%d.%d", impl,
pw_log_debug("link %p: constructed out:%p:%d.%d -> in:%p:%d.%d", impl,
output_node, output->port_id, this->rt.out_mix.port.port_id,
input_node, input->port_id, this->rt.in_mix.port.port_id);
@ -1447,7 +1411,7 @@ int pw_link_register(struct pw_link *link,
struct pw_properties *properties)
{
struct pw_core *core = link->core;
struct pw_node *input_node, *output_node;
struct pw_node *output_node, *input_node;
if (link->registered)
goto error_existed;
@ -1457,16 +1421,16 @@ int pw_link_register(struct pw_link *link,
if (properties == NULL)
return -errno;
input_node = link->input->node;
output_node = link->output->node;
input_node = link->input->node;
link->info.output_node_id = output_node->global->id;
link->info.output_port_id = link->output->global->id;
link->info.input_node_id = input_node->global->id;
link->info.input_port_id = link->input->global->id;
pw_properties_setf(properties, PW_KEY_LINK_INPUT_PORT, "%d", link->info.input_port_id);
pw_properties_setf(properties, PW_KEY_LINK_OUTPUT_PORT, "%d", link->info.output_port_id);
pw_properties_setf(properties, PW_KEY_LINK_INPUT_PORT, "%d", link->info.input_port_id);
link->global = pw_global_new(core,
PW_TYPE_INTERFACE_Link,
@ -1516,8 +1480,8 @@ void pw_link_destroy(struct pw_link *link)
try_unlink_controls(impl, link->output, link->input);
input_remove(link, link->input);
output_remove(link, link->output);
input_remove(link, link->input);
if (link->global) {
spa_hook_remove(&link->global_listener);

View file

@ -1038,7 +1038,6 @@ int pw_port_set_param(struct pw_port *port, uint32_t id, uint32_t flags,
/* setting the format always destroys the negotiated buffers */
free_allocation(&port->allocation);
port->allocated = false;
if (param == NULL || res < 0) {
pw_port_update_state(port, PW_PORT_STATE_CONFIGURE, NULL);
@ -1051,15 +1050,14 @@ int pw_port_set_param(struct pw_port *port, uint32_t id, uint32_t flags,
}
SPA_EXPORT
int pw_port_use_buffers(struct pw_port *port, uint32_t mix_id, uint32_t flags,
int pw_port_use_buffers(struct pw_port *port, struct pw_port_mix *mix, uint32_t flags,
struct spa_buffer **buffers, uint32_t n_buffers)
{
int res = 0;
int res = 0, res2;
struct pw_node *node = port->node;
struct pw_port_mix *mix = NULL;
pw_log_debug("port %p: %d:%d.%d: %d buffers state:%d n_mix:%d", port,
port->direction, port->port_id, mix_id,
port->direction, port->port_id, mix->id,
n_buffers, port->state, port->n_mix);
if (n_buffers == 0 && port->state <= PW_PORT_STATE_READY)
@ -1068,18 +1066,6 @@ int pw_port_use_buffers(struct pw_port *port, uint32_t mix_id, uint32_t flags,
if (n_buffers > 0 && port->state < PW_PORT_STATE_READY)
return -EIO;
if ((mix = pw_map_lookup(&port->mix_port_map, mix_id)) != NULL) {
res = spa_node_port_use_buffers(port->mix,
mix->port.direction, mix->port.port_id, flags,
buffers, n_buffers);
pw_log_debug("port %p: use buffers on mix: %p %d (%s)",
port, port->mix, res, spa_strerror(res));
if (res == -ENOTSUP)
res = 0;
}
if (n_buffers == 0) {
if (port->n_mix == 1)
pw_port_update_state(port, PW_PORT_STATE_READY, NULL);
@ -1091,67 +1077,28 @@ int pw_port_use_buffers(struct pw_port *port, uint32_t mix_id, uint32_t flags,
res = spa_node_port_use_buffers(node->node,
port->direction, port->port_id,
flags, buffers, n_buffers);
if (res < 0)
if (res < 0) {
pw_log_error("port %p: use buffers on node: %d (%s)",
port, res, spa_strerror(res));
pw_port_update_state(port, PW_PORT_STATE_ERROR,
"can't use buffers on port");
return res;
}
}
port->allocated = false;
free_allocation(&port->allocation);
pw_port_call_use_buffers(port, flags, buffers, n_buffers);
}
if (n_buffers > 0 && !SPA_RESULT_IS_ASYNC(res))
pw_port_update_state(port, PW_PORT_STATE_PAUSED, NULL);
return res;
}
SPA_EXPORT
int pw_port_alloc_buffers(struct pw_port *port,
struct spa_buffer **buffers, uint32_t n_buffers)
{
int res, res2;
struct pw_node *node = port->node;
if (port->state < PW_PORT_STATE_READY)
return -EIO;
if ((res = spa_node_port_use_buffers(node->node,
port->direction, port->port_id,
SPA_NODE_BUFFERS_FLAG_ALLOC,
buffers, n_buffers)) < 0) {
pw_log_error("port %p: %d alloc failed: %d (%s)", port, port->port_id,
res, spa_strerror(res));
}
if (res >= 0) {
res2 = pw_port_call_use_buffers(port,
SPA_NODE_BUFFERS_FLAG_ALLOC,
buffers, n_buffers);
if (res2 < 0) {
pw_log_error("port %p: %d implementation alloc failed: %d (%s)",
port, port->port_id, res, spa_strerror(res2));
if ((res2 = pw_port_call_use_buffers(port, flags, buffers, n_buffers)) < 0) {
pw_log_warn("port %p: implementation alloc failed: %d (%s)",
port, res2, spa_strerror(res2));
}
if (n_buffers > 0 && !SPA_RESULT_IS_ASYNC(res))
pw_port_update_state(port, PW_PORT_STATE_PAUSED, NULL);
}
pw_log_debug("port %p: %d alloc %d buffers: %d (%s)", port,
port->port_id, n_buffers, res, spa_strerror(res));
free_allocation(&port->allocation);
if (res < 0) {
port->allocated = false;
} else {
port->allocated = true;
}
if (n_buffers == 0) {
if (port->n_mix == 1)
pw_port_update_state(port, PW_PORT_STATE_READY, NULL);
}
else if (!SPA_RESULT_IS_ASYNC(res)) {
pw_port_update_state(port, PW_PORT_STATE_PAUSED, NULL);
if ((res2 = spa_node_port_use_buffers(port->mix,
mix->port.direction, mix->port.port_id, flags,
buffers, n_buffers)) < 0) {
if (res2 != -ENOTSUP)
pw_log_warn("port %p: mix use buffers failed: %d (%s)",
port, res2, spa_strerror(res2));
}
return res;

View file

@ -545,8 +545,6 @@ struct pw_port {
#define PW_PORT_MIX_FLAG_MIX_ONLY (1<<1) /**< only negotiate mix ports */
uint32_t mix_flags; /**< flags for the mixing */
unsigned int allocated:1; /**< if buffers are allocated */
struct spa_list mix_list; /**< list of \ref pw_port_mix */
struct pw_map mix_port_map; /**< map from port_id from mixer */
uint32_t n_mix;
@ -874,13 +872,9 @@ int pw_port_set_param(struct pw_port *port,
uint32_t id, uint32_t flags, const struct spa_pod *param);
/** Use buffers on a port \memberof pw_port */
int pw_port_use_buffers(struct pw_port *port, uint32_t mix_id, uint32_t flags,
int pw_port_use_buffers(struct pw_port *port, struct pw_port_mix *mix, uint32_t flags,
struct spa_buffer **buffers, uint32_t n_buffers);
/** Allocate memory for buffers on a port \memberof pw_port */
int pw_port_alloc_buffers(struct pw_port *port,
struct spa_buffer **buffers, uint32_t n_buffers);
/** Change the state of the node */
int pw_node_set_state(struct pw_node *node, enum pw_node_state state);