link: improve allocation

Make a structure to track allocation of buffers on ports.
Handle more allocation failures.
Update port status immediately when clearing format
This commit is contained in:
Wim Taymans 2018-02-26 16:59:28 +01:00
parent b8eccc3648
commit 13fcaf74e6
3 changed files with 122 additions and 101 deletions

View file

@ -238,7 +238,7 @@ static int do_negotiate(struct pw_link *this, uint32_t in_state, uint32_t out_st
return res; return res;
} }
static struct spa_pod *find_param(struct spa_pod **params, int n_params, uint32_t type) static struct spa_pod *find_param(struct spa_pod **params, uint32_t n_params, uint32_t type)
{ {
uint32_t i; uint32_t i;
@ -312,15 +312,16 @@ static struct spa_pod *find_param(struct spa_pod **params, int n_params, uint32_
* The shared memory block should not contain any types or structure, * The shared memory block should not contain any types or structure,
* just the actual metadata contents. * just the actual metadata contents.
*/ */
static struct spa_buffer **alloc_buffers(struct pw_link *this, static int alloc_buffers(struct pw_link *this,
uint32_t n_buffers, uint32_t n_buffers,
uint32_t n_params, uint32_t n_params,
struct spa_pod **params, struct spa_pod **params,
uint32_t n_datas, uint32_t n_datas,
size_t *data_sizes, size_t *data_sizes,
ssize_t *data_strides, ssize_t *data_strides,
struct pw_memblock **mem) struct allocation *allocation)
{ {
int res;
struct spa_buffer **buffers, *bp; struct spa_buffer **buffers, *bp;
uint32_t i; uint32_t i;
size_t skel_size, data_size, meta_size; size_t skel_size, data_size, meta_size;
@ -369,9 +370,10 @@ static struct spa_buffer **alloc_buffers(struct pw_link *this,
/* pointer to buffer structures */ /* pointer to buffer structures */
bp = SPA_MEMBER(buffers, n_buffers * sizeof(struct spa_buffer *), struct spa_buffer); bp = SPA_MEMBER(buffers, n_buffers * sizeof(struct spa_buffer *), struct spa_buffer);
pw_memblock_alloc(PW_MEMBLOCK_FLAG_WITH_FD | if ((res = pw_memblock_alloc(PW_MEMBLOCK_FLAG_WITH_FD |
PW_MEMBLOCK_FLAG_MAP_READWRITE | PW_MEMBLOCK_FLAG_MAP_READWRITE |
PW_MEMBLOCK_FLAG_SEAL, n_buffers * data_size, &m); PW_MEMBLOCK_FLAG_SEAL, n_buffers * data_size, &m)) < 0)
return res;
for (i = 0; i < n_buffers; i++) { for (i = 0; i < n_buffers; i++) {
int j; int j;
@ -422,8 +424,11 @@ static struct spa_buffer **alloc_buffers(struct pw_link *this,
} }
} }
} }
*mem = m; allocation->mem = m;
return buffers; allocation->n_buffers = n_buffers;
allocation->buffers = buffers;
return 0;
} }
static int static int
@ -552,27 +557,25 @@ static int do_allocation(struct pw_link *this, uint32_t in_state, uint32_t out_s
spa_debug_port_info(oinfo); spa_debug_port_info(oinfo);
spa_debug_port_info(iinfo); spa_debug_port_info(iinfo);
} }
if (this->buffers == NULL && output->n_buffers) { if (this->allocation.buffers == NULL && output->allocation.n_buffers) {
out_flags = 0; out_flags = 0;
in_flags = SPA_PORT_INFO_FLAG_CAN_USE_BUFFERS; in_flags = SPA_PORT_INFO_FLAG_CAN_USE_BUFFERS;
this->n_buffers = output->n_buffers; this->allocation = output->allocation;
this->buffers = output->buffers; this->allocation_owner = output;
this->buffer_owner = output; pw_log_debug("link %p: reusing %d output buffers %p", this,
pw_log_debug("link %p: reusing %d output buffers %p", this, this->n_buffers, this->allocation.n_buffers, this->allocation.buffers);
this->buffers); } else if (this->allocation.buffers == NULL && input->allocation.n_buffers && input->mix == NULL) {
} else if (this->buffers == NULL && input->n_buffers && input->mix == NULL) {
out_flags = SPA_PORT_INFO_FLAG_CAN_USE_BUFFERS; out_flags = SPA_PORT_INFO_FLAG_CAN_USE_BUFFERS;
in_flags = 0; in_flags = 0;
this->n_buffers = input->n_buffers; this->allocation = input->allocation;
this->buffers = input->buffers; this->allocation_owner = input;
this->buffer_owner = input; pw_log_debug("link %p: reusing %d input buffers %p", this,
pw_log_debug("link %p: reusing %d input buffers %p", this, this->n_buffers, this->allocation.n_buffers, this->allocation.buffers);
this->buffers); } else if (this->allocation.buffers == NULL) {
} else if (this->buffers == NULL) {
struct spa_pod **params, *param; struct spa_pod **params, *param;
uint8_t buffer[4096]; uint8_t buffer[4096];
struct spa_pod_builder b = SPA_POD_BUILDER_INIT(buffer, sizeof(buffer)); struct spa_pod_builder b = SPA_POD_BUILDER_INIT(buffer, sizeof(buffer));
int i, offset, n_params; uint32_t i, offset, n_params;
uint32_t max_buffers; uint32_t max_buffers;
size_t minsize = 1024, stride = 0; size_t minsize = 1024, stride = 0;
size_t data_sizes[1]; size_t data_sizes[1];
@ -625,52 +628,62 @@ static int do_allocation(struct pw_link *this, uint32_t in_state, uint32_t out_s
data_sizes[0] = minsize; data_sizes[0] = minsize;
data_strides[0] = stride; data_strides[0] = stride;
this->buffer_owner = this; if ((res = alloc_buffers(this,
this->n_buffers = max_buffers; max_buffers,
this->buffers = alloc_buffers(this, n_params,
this->n_buffers, params,
n_params, 1,
params, data_sizes, data_strides,
1, &this->allocation)) < 0) {
data_sizes, data_strides, asprintf(&error, "error alloc buffers: %d", res);
&this->buffer_mem); goto error;
}
this->allocation_owner = this;
pw_log_debug("link %p: allocating %d buffers %p %zd %zd", this, pw_log_debug("link %p: allocating %d buffers %p %zd %zd", this,
this->n_buffers, this->buffers, minsize, stride); this->allocation.n_buffers, this->allocation.buffers, minsize, stride);
if (out_flags & SPA_PORT_INFO_FLAG_CAN_ALLOC_BUFFERS) { if (out_flags & SPA_PORT_INFO_FLAG_CAN_ALLOC_BUFFERS) {
if ((res = pw_port_alloc_buffers(output, if ((res = pw_port_alloc_buffers(output,
params, n_params, params, n_params,
this->buffers, &this->n_buffers)) < 0) { this->allocation.buffers,
&this->allocation.n_buffers)) < 0) {
asprintf(&error, "error alloc output buffers: %d", res); asprintf(&error, "error alloc output buffers: %d", res);
goto error; goto error;
} }
if (SPA_RESULT_IS_ASYNC(res)) if (SPA_RESULT_IS_ASYNC(res))
pw_work_queue_add(impl->work, output->node, res, complete_paused, output); pw_work_queue_add(impl->work, output->node, res, complete_paused, output);
output->buffer_mem = this->buffer_mem;
this->buffer_owner = output; output->allocation = this->allocation;
this->allocation_owner = output;
pw_log_debug("link %p: allocated %d buffers %p from output port", this, pw_log_debug("link %p: allocated %d buffers %p from output port", this,
this->n_buffers, this->buffers); this->allocation.n_buffers, this->allocation.buffers);
} else if (in_flags & SPA_PORT_INFO_FLAG_CAN_ALLOC_BUFFERS) { } else if (in_flags & SPA_PORT_INFO_FLAG_CAN_ALLOC_BUFFERS) {
if ((res = pw_port_alloc_buffers(input, if ((res = pw_port_alloc_buffers(input,
params, n_params, params, n_params,
this->buffers, &this->n_buffers)) < 0) { this->allocation.buffers,
&this->allocation.n_buffers)) < 0) {
asprintf(&error, "error alloc input buffers: %d", res); asprintf(&error, "error alloc input buffers: %d", res);
goto error; goto error;
} }
if (SPA_RESULT_IS_ASYNC(res)) if (SPA_RESULT_IS_ASYNC(res))
pw_work_queue_add(impl->work, input->node, res, complete_paused, input); pw_work_queue_add(impl->work, input->node, res, complete_paused, input);
input->buffer_mem = this->buffer_mem;
this->buffer_owner = input; input->allocation = this->allocation;
this->allocation_owner = input;
pw_log_debug("link %p: allocated %d buffers %p from input port", this, pw_log_debug("link %p: allocated %d buffers %p from input port", this,
this->n_buffers, this->buffers); this->allocation.n_buffers, this->allocation.buffers);
} }
} }
if (in_flags & SPA_PORT_INFO_FLAG_CAN_USE_BUFFERS) { if (in_flags & SPA_PORT_INFO_FLAG_CAN_USE_BUFFERS) {
pw_log_debug("link %p: using %d buffers %p on input port", this, pw_log_debug("link %p: using %d buffers %p on input port", this,
this->n_buffers, this->buffers); this->allocation.n_buffers, this->allocation.buffers);
if ((res = pw_port_use_buffers(input, this->buffers, this->n_buffers)) < 0) { if ((res = pw_port_use_buffers(input,
this->allocation.buffers,
this->allocation.n_buffers)) < 0) {
asprintf(&error, "error use input buffers: %d", res); asprintf(&error, "error use input buffers: %d", res);
goto error; goto error;
} }
@ -678,17 +691,19 @@ static int do_allocation(struct pw_link *this, uint32_t in_state, uint32_t out_s
pw_work_queue_add(impl->work, input->node, res, complete_paused, input); pw_work_queue_add(impl->work, input->node, res, complete_paused, input);
} else if (out_flags & SPA_PORT_INFO_FLAG_CAN_USE_BUFFERS) { } else if (out_flags & SPA_PORT_INFO_FLAG_CAN_USE_BUFFERS) {
pw_log_debug("link %p: using %d buffers %p on output port", this, pw_log_debug("link %p: using %d buffers %p on output port", this,
this->n_buffers, this->buffers); this->allocation.n_buffers, this->allocation.buffers);
if ((res = pw_port_use_buffers(output, this->buffers, this->n_buffers)) < 0) { if ((res = pw_port_use_buffers(output,
this->allocation.buffers,
this->allocation.n_buffers)) < 0) {
asprintf(&error, "error use output buffers: %d", res); asprintf(&error, "error use output buffers: %d", res);
goto error; goto error;
} }
if (SPA_RESULT_IS_ASYNC(res)) if (SPA_RESULT_IS_ASYNC(res))
pw_work_queue_add(impl->work, output->node, res, complete_paused, output); pw_work_queue_add(impl->work, output->node, res, complete_paused, output);
output->buffer_mem = this->buffer_mem; output->allocation = this->allocation;
output->allocated = false; output->allocated = false;
this->buffer_owner = output; this->allocation_owner = output;
} else { } else {
asprintf(&error, "no common buffer alloc found"); asprintf(&error, "no common buffer alloc found");
goto error; goto error;
@ -697,11 +712,9 @@ static int do_allocation(struct pw_link *this, uint32_t in_state, uint32_t out_s
return 0; return 0;
error: error:
output->buffers = NULL; drop_allocation(&output->allocation);
output->n_buffers = 0;
output->allocated = false; output->allocated = false;
input->buffers = NULL; drop_allocation(&input->allocation);
input->n_buffers = 0;
input->allocated = false; input->allocated = false;
pw_link_update_state(this, PW_LINK_STATE_ERROR, error); pw_link_update_state(this, PW_LINK_STATE_ERROR, error);
return res; return res;
@ -767,7 +780,7 @@ static int do_start(struct pw_link *this, uint32_t in_state, uint32_t out_state)
static int check_states(struct pw_link *this, void *user_data, int res) static int check_states(struct pw_link *this, void *user_data, int res)
{ {
struct impl *impl = SPA_CONTAINER_OF(this, struct impl, this); struct impl *impl = SPA_CONTAINER_OF(this, struct impl, this);
uint32_t in_state, out_state; int in_state, out_state;
struct pw_port *input, *output; struct pw_port *input, *output;
if (this->state == PW_LINK_STATE_ERROR) if (this->state == PW_LINK_STATE_ERROR)
@ -840,7 +853,7 @@ output_node_async_complete(void *data, uint32_t seq, int res)
static void clear_port_buffers(struct pw_link *link, struct pw_port *port) static void clear_port_buffers(struct pw_link *link, struct pw_port *port)
{ {
if (spa_list_is_empty(&port->links) && link->buffer_owner != port) if (spa_list_is_empty(&port->links) && link->allocation_owner != port)
pw_port_use_buffers(port, NULL, 0); pw_port_use_buffers(port, NULL, 0);
} }
@ -867,8 +880,8 @@ static void input_remove(struct pw_link *this, struct pw_port *port)
spa_list_remove(&this->input_link); spa_list_remove(&this->input_link);
spa_hook_list_call(&this->input->listener_list, struct pw_port_events, link_removed, this); spa_hook_list_call(&this->input->listener_list, struct pw_port_events, link_removed, this);
this->input = NULL;
clear_port_buffers(this, port); clear_port_buffers(this, port);
this->input = NULL;
} }
static int static int
@ -894,8 +907,8 @@ static void output_remove(struct pw_link *this, struct pw_port *port)
spa_list_remove(&this->output_link); spa_list_remove(&this->output_link);
spa_hook_list_call(&this->output->listener_list, struct pw_port_events, link_removed, this); spa_hook_list_call(&this->output->listener_list, struct pw_port_events, link_removed, this);
this->output = NULL;
clear_port_buffers(this, port); clear_port_buffers(this, port);
this->output = NULL;
} }
static void on_port_destroy(struct pw_link *this, struct pw_port *port) static void on_port_destroy(struct pw_link *this, struct pw_port *port)
@ -1138,8 +1151,8 @@ struct pw_link *pw_link_new(struct pw_core *core,
if (output_node->clock) if (output_node->clock)
input_node->clock = output_node->clock; input_node->clock = output_node->clock;
pw_log_debug("link %p: output node %p clock %p, live %d", this, output_node, output_node->clock, pw_log_debug("link %p: output node %p clock %p, live %d",
output_node->live); this, output_node, output_node->clock, output_node->live);
spa_list_append(&output->links, &this->output_link); spa_list_append(&output->links, &this->output_link);
spa_list_append(&input->links, &this->input_link); spa_list_append(&input->links, &this->input_link);
@ -1292,10 +1305,9 @@ void pw_link_destroy(struct pw_link *link)
if (link->info.format) if (link->info.format)
free(link->info.format); free(link->info.format);
if (link->buffer_owner == link) { if (link->allocation_owner == link)
free(link->buffers); free_allocation(&link->allocation);
pw_memblock_free(link->buffer_mem);
}
free(impl); free(impl);
} }

View file

@ -559,10 +559,8 @@ void pw_port_destroy(struct pw_port *port)
pw_log_debug("port %p: free", port); pw_log_debug("port %p: free", port);
spa_hook_list_call(&port->listener_list, struct pw_port_events, free); spa_hook_list_call(&port->listener_list, struct pw_port_events, free);
if (port->allocated) { if (port->allocated)
free(port->buffers); free_allocation(&port->allocation);
pw_memblock_free(port->buffer_mem);
}
if (port->properties) if (port->properties)
pw_properties_free(port->properties); pw_properties_free(port->properties);
@ -661,23 +659,24 @@ int pw_port_set_param(struct pw_port *port, uint32_t id, uint32_t flags,
const struct spa_pod *param) const struct spa_pod *param)
{ {
int res; int res;
struct pw_node *node = port->node;
struct pw_core *core = node->core;
struct pw_type *t = &core->type;
res = spa_node_port_set_param(port->node->node, port->direction, port->port_id, id, flags, param); res = spa_node_port_set_param(node->node, port->direction, port->port_id, id, flags, param);
pw_log_debug("port %p: set param %s: %d (%s)", port, pw_log_debug("port %p: set param %s: %d (%s)", port,
spa_type_map_get_type(port->node->core->type.map, id), res, spa_strerror(res)); spa_type_map_get_type(t->map, id), res, spa_strerror(res));
if (!SPA_RESULT_IS_ASYNC(res) && id == port->node->core->type.param.idFormat) { if (id == t->param.idFormat) {
if (param == NULL || res < 0) { if (param == NULL || res < 0) {
if (port->allocated) { if (port->allocated) {
free(port->buffers); free_allocation(&port->allocation);
pw_memblock_free(port->buffer_mem); port->allocated = false;
} }
port->buffers = NULL; drop_allocation(&port->allocation);
port->n_buffers = 0;
port->allocated = false;
port_update_state (port, PW_PORT_STATE_CONFIGURE); port_update_state (port, PW_PORT_STATE_CONFIGURE);
} }
else { else if (!SPA_RESULT_IS_ASYNC(res)) {
port_update_state (port, PW_PORT_STATE_READY); port_update_state (port, PW_PORT_STATE_READY);
} }
} }
@ -699,19 +698,17 @@ int pw_port_use_buffers(struct pw_port *port, struct spa_buffer **buffers, uint3
pw_log_debug("port %p: use %d buffers: %d (%s)", port, n_buffers, res, spa_strerror(res)); pw_log_debug("port %p: use %d buffers: %d (%s)", port, n_buffers, res, spa_strerror(res));
if (port->allocated) { if (port->allocated) {
free(port->buffers); free_allocation(&port->allocation);
pw_memblock_free(port->buffer_mem); port->allocated = false;
} }
if (res < 0) { if (res < 0) {
port->buffers = NULL; drop_allocation(&port->allocation);
port->n_buffers = 0;
} else { } else {
port->buffers = buffers; port->allocation.buffers = buffers;
port->n_buffers = n_buffers; port->allocation.n_buffers = n_buffers;
} }
port->allocated = false;
if (port->n_buffers == 0) if (port->allocation.n_buffers == 0)
port_update_state (port, PW_PORT_STATE_READY); port_update_state (port, PW_PORT_STATE_READY);
else if (!SPA_RESULT_IS_ASYNC(res)) else if (!SPA_RESULT_IS_ASYNC(res))
port_update_state (port, PW_PORT_STATE_PAUSED); port_update_state (port, PW_PORT_STATE_PAUSED);
@ -735,21 +732,19 @@ int pw_port_alloc_buffers(struct pw_port *port,
pw_log_debug("port %p: alloc %d buffers: %d (%s)", port, *n_buffers, res, spa_strerror(res)); pw_log_debug("port %p: alloc %d buffers: %d (%s)", port, *n_buffers, res, spa_strerror(res));
if (port->allocated) { if (port->allocated) {
free(port->buffers); free_allocation(&port->allocation);
pw_memblock_free(port->buffer_mem);
} }
if (res < 0) { if (res < 0) {
port->buffers = NULL; drop_allocation(&port->allocation);
port->n_buffers = 0;
port->allocated = false; port->allocated = false;
} }
else { else {
port->buffers = buffers; port->allocation.buffers = buffers;
port->n_buffers = *n_buffers; port->allocation.n_buffers = *n_buffers;
port->allocated = true; port->allocated = true;
} }
if (port->n_buffers == 0) if (port->allocation.n_buffers == 0)
port_update_state (port, PW_PORT_STATE_READY); port_update_state (port, PW_PORT_STATE_READY);
else if (!SPA_RESULT_IS_ASYNC(res)) else if (!SPA_RESULT_IS_ASYNC(res))
port_update_state (port, PW_PORT_STATE_PAUSED); port_update_state (port, PW_PORT_STATE_PAUSED);

View file

@ -189,6 +189,24 @@ struct pw_main_loop {
bool running; bool running;
}; };
struct allocation {
struct pw_memblock *mem; /**< allocated buffer memory */
struct spa_buffer **buffers; /**< port buffers */
uint32_t n_buffers; /**< number of port buffers */
};
static inline void drop_allocation(struct allocation *alloc)
{
alloc->buffers = NULL;
alloc->n_buffers = 0;
}
static inline void free_allocation(struct allocation *alloc)
{
pw_memblock_free(alloc->mem);
free(alloc->buffers);
}
struct pw_link { struct pw_link {
struct pw_core *core; /**< core object */ struct pw_core *core; /**< core object */
struct spa_list link; /**< link in core link_list */ struct spa_list link; /**< link in core link_list */
@ -213,10 +231,8 @@ struct pw_link {
struct spa_hook_list listener_list; struct spa_hook_list listener_list;
void *buffer_owner; void *allocation_owner;
struct pw_memblock *buffer_mem; struct allocation allocation;
struct spa_buffer **buffers;
uint32_t n_buffers;
struct { struct {
struct spa_graph_port out_port; struct spa_graph_port out_port;
@ -304,9 +320,7 @@ struct pw_port {
struct spa_io_buffers io; /**< io area of the port */ struct spa_io_buffers io; /**< io area of the port */
bool allocated; /**< if buffers are allocated */ bool allocated; /**< if buffers are allocated */
struct pw_memblock *buffer_mem; /**< allocated buffer memory */ struct allocation allocation;
struct spa_buffer **buffers; /**< port buffers */
uint32_t n_buffers; /**< number of port buffers */
struct spa_list links; /**< list of \ref pw_link */ struct spa_list links; /**< list of \ref pw_link */