loop: add block flag to invoke

Remove async shutdown and block instead.
This commit is contained in:
Wim Taymans 2017-06-26 10:41:19 +02:00
parent e776425846
commit bc56c59597
14 changed files with 165 additions and 203 deletions

View file

@ -81,6 +81,9 @@ pw_protocol_get_interface(struct pw_protocol *protocol,
{
struct pw_protocol_iface *protocol_iface;
if (protocol == NULL)
return NULL;
spa_list_for_each(protocol_iface, &protocol->iface_list, link) {
const struct pw_interface *iface = server ? protocol_iface->server_iface :
protocol_iface->client_iface;

View file

@ -38,8 +38,6 @@
struct impl {
struct pw_link this;
int refcount;
struct pw_work_queue *work;
struct spa_format *format_filter;
@ -831,41 +829,16 @@ bool pw_link_deactivate(struct pw_link *this)
return true;
}
static void pw_link_free(struct pw_link *link)
{
struct impl *impl = SPA_CONTAINER_OF(link, struct impl, this);
pw_log_debug("link %p: free", link);
pw_signal_emit(&link->free_signal, link);
pw_work_queue_destroy(impl->work);
if (link->info.format)
free(link->info.format);
if (impl->buffer_owner == link)
pw_memblock_free(&impl->buffer_mem);
free(impl);
}
static void link_unbind_func(void *data)
{
struct pw_resource *resource = data;
struct pw_link *this = resource->object;
struct impl *impl = SPA_CONTAINER_OF(this, struct impl, this);
spa_list_remove(&resource->link);
if (--impl->refcount == 0)
pw_link_free(this);
}
static int
link_bind_func(struct pw_global *global, struct pw_client *client, uint32_t version, uint32_t id)
{
struct pw_link *this = global->object;
struct impl *impl = SPA_CONTAINER_OF(this, struct impl, this);
struct pw_resource *resource;
resource = pw_resource_new(client, id, global->type, 0);
@ -874,7 +847,6 @@ link_bind_func(struct pw_global *global, struct pw_client *client, uint32_t vers
goto no_mem;
pw_resource_set_implementation(resource, global->object, PW_VERSION_LINK, NULL, link_unbind_func);
impl->refcount++;
pw_log_debug("link %p: bound to %d", global->object, resource->id);
@ -913,7 +885,6 @@ struct pw_link *pw_link_new(struct pw_core *core,
this->core = core;
this->properties = properties;
this->state = PW_LINK_STATE_INIT;
impl->refcount = 1;
this->input = input;
this->output = output;
@ -971,50 +942,10 @@ static void clear_port_buffers(struct pw_link *link, struct pw_port *port)
}
}
static int
do_link_remove_done(struct spa_loop *loop,
bool async, uint32_t seq, size_t size, void *data, void *user_data)
{
struct pw_link *this = user_data;
struct impl *impl = SPA_CONTAINER_OF(this, struct impl, this);
if (this->input) {
spa_list_remove(&this->input_link);
this->input->node->n_used_input_links--;
clear_port_buffers(this, this->input);
if (this->input->node->n_used_input_links == 0 &&
this->input->node->n_used_output_links == 0 &&
this->input->node->info.state > PW_NODE_STATE_IDLE)
pw_node_update_state(this->input->node, PW_NODE_STATE_IDLE, NULL);
this->input = NULL;
}
if (this->output) {
spa_list_remove(&this->output_link);
this->output->node->n_used_output_links--;
clear_port_buffers(this, this->output);
if (this->output->node->n_used_input_links == 0 &&
this->output->node->n_used_output_links == 0 &&
this->output->node->info.state > PW_NODE_STATE_IDLE)
pw_node_update_state(this->output->node, PW_NODE_STATE_IDLE, NULL);
this->output = NULL;
}
if (--impl->refcount == 0)
pw_link_free(this);
return SPA_RESULT_OK;
}
static int
do_link_remove(struct spa_loop *loop,
bool async, uint32_t seq, size_t size, void *data, void *user_data)
{
int res;
struct pw_link *this = user_data;
if (this->rt.input) {
@ -1025,9 +956,7 @@ do_link_remove(struct spa_loop *loop,
spa_list_remove(&this->rt.output_link);
this->rt.output = NULL;
}
res = pw_loop_invoke(this->core->main_loop->loop, do_link_remove_done, seq, 0, NULL, this);
return res;
return SPA_RESULT_OK;
}
void pw_link_destroy(struct pw_link *link)
@ -1048,18 +977,54 @@ void pw_link_destroy(struct pw_link *link)
pw_signal_remove(&impl->input_port_destroy);
pw_signal_remove(&impl->input_async_complete);
impl->refcount++;
pw_loop_invoke(link->input->node->data_loop->loop,
do_link_remove, 1, 0, NULL, link);
do_link_remove, 1, 0, NULL, true, link);
}
if (link->output) {
pw_signal_remove(&impl->output_port_destroy);
pw_signal_remove(&impl->output_async_complete);
impl->refcount++;
pw_loop_invoke(link->output->node->data_loop->loop,
do_link_remove, 2, 0, NULL, link);
do_link_remove, 2, 0, NULL, true, link);
}
if (--impl->refcount == 0)
pw_link_free(link);
if (link->input) {
spa_list_remove(&link->input_link);
link->input->node->n_used_input_links--;
clear_port_buffers(link, link->input);
if (link->input->node->n_used_input_links == 0 &&
link->input->node->n_used_output_links == 0 &&
link->input->node->info.state > PW_NODE_STATE_IDLE)
pw_node_update_state(link->input->node, PW_NODE_STATE_IDLE, NULL);
link->input = NULL;
}
if (link->output) {
spa_list_remove(&link->output_link);
link->output->node->n_used_output_links--;
clear_port_buffers(link, link->output);
if (link->output->node->n_used_input_links == 0 &&
link->output->node->n_used_output_links == 0 &&
link->output->node->info.state > PW_NODE_STATE_IDLE)
pw_node_update_state(link->output->node, PW_NODE_STATE_IDLE, NULL);
link->output = NULL;
}
pw_log_debug("link %p: free", link);
pw_signal_emit(&link->free_signal, link);
pw_work_queue_destroy(impl->work);
if (link->info.format)
free(link->info.format);
if (impl->buffer_owner == link)
pw_memblock_free(&impl->buffer_mem);
free(impl);
}

View file

@ -597,46 +597,12 @@ struct pw_node *pw_node_new(struct pw_core *core,
return NULL;
}
static int
do_node_remove_done(struct spa_loop *loop,
bool async, uint32_t seq, size_t size, void *data, void *user_data)
{
struct pw_node *this = user_data;
struct impl *impl = SPA_CONTAINER_OF(this, struct impl, this);
struct pw_port *port, *tmp;
pw_log_debug("node %p: remove done, destroy ports", this);
spa_list_for_each_safe(port, tmp, &this->input_ports, link)
pw_port_destroy(port);
spa_list_for_each_safe(port, tmp, &this->output_ports, link)
pw_port_destroy(port);
pw_log_debug("node %p: free", this);
pw_signal_emit(&this->free_signal, this);
pw_work_queue_destroy(impl->work);
if (this->input_port_map)
free(this->input_port_map);
if (this->output_port_map)
free(this->output_port_map);
if (this->properties)
pw_properties_free(this->properties);
clear_info(this);
free(impl);
return SPA_RESULT_OK;
}
static int
do_node_remove(struct spa_loop *loop,
bool async, uint32_t seq, size_t size, void *data, void *user_data)
{
struct pw_node *this = user_data;
struct pw_port *port, *tmp;
int res;
pause_node(this);
@ -656,10 +622,7 @@ do_node_remove(struct spa_loop *loop,
link->rt.output = NULL;
}
}
res = pw_loop_invoke(this->core->main_loop->loop, do_node_remove_done, seq, 0, NULL, this);
return res;
return SPA_RESULT_OK;
}
/** Destroy a node
@ -674,6 +637,7 @@ void pw_node_destroy(struct pw_node *node)
{
struct impl *impl = SPA_CONTAINER_OF(node, struct impl, this);
struct pw_resource *resource, *tmp;
struct pw_port *port, *tmpp;
pw_log_debug("node %p: destroy", impl);
pw_signal_emit(&node->destroy_signal, node);
@ -686,7 +650,30 @@ void pw_node_destroy(struct pw_node *node)
spa_list_for_each_safe(resource, tmp, &node->resource_list, link)
pw_resource_destroy(resource);
pw_loop_invoke(node->data_loop->loop, do_node_remove, 1, 0, NULL, node);
pw_loop_invoke(node->data_loop->loop, do_node_remove, 1, 0, NULL, true, node);
pw_log_debug("node %p: destroy ports", node);
spa_list_for_each_safe(port, tmpp, &node->input_ports, link)
pw_port_destroy(port);
spa_list_for_each_safe(port, tmpp, &node->output_ports, link)
pw_port_destroy(port);
pw_log_debug("node %p: free", node);
pw_signal_emit(&node->free_signal, node);
pw_work_queue_destroy(impl->work);
if (node->input_port_map)
free(node->input_port_map);
if (node->output_port_map)
free(node->output_port_map);
if (node->properties)
pw_properties_free(node->properties);
clear_info(node);
free(impl);
}
/**

View file

@ -140,10 +140,10 @@ struct pw_link *pw_port_link(struct pw_port *output_port,
pw_loop_invoke(output_node->data_loop->loop,
do_add_link,
SPA_ID_INVALID, sizeof(struct pw_link *), &link, output_port);
SPA_ID_INVALID, sizeof(struct pw_link *), &link, false, output_port);
pw_loop_invoke(input_node->data_loop->loop,
do_add_link,
SPA_ID_INVALID, sizeof(struct pw_link *), &link, input_port);
SPA_ID_INVALID, sizeof(struct pw_link *), &link, false, input_port);
}
return link;
@ -175,13 +175,35 @@ int pw_port_pause_rt(struct pw_port *port)
}
static int
do_remove_link_done(struct spa_loop *loop,
bool async, uint32_t seq, size_t size, void *data, void *user_data)
do_remove_link(struct spa_loop *loop,
bool async, uint32_t seq, size_t size, void *data, void *user_data)
{
struct pw_port *port = user_data;
struct pw_node *node = port->node;
struct pw_link *link = ((struct pw_link **) data)[0];
if (port->direction == PW_DIRECTION_INPUT) {
pw_port_pause_rt(link->rt.input);
spa_list_remove(&link->rt.input_link);
link->rt.input = NULL;
} else {
pw_port_pause_rt(link->rt.output);
spa_list_remove(&link->rt.output_link);
link->rt.output = NULL;
}
return SPA_RESULT_OK;
}
int pw_port_unlink(struct pw_port *port, struct pw_link *link)
{
int res;
struct impl *impl = SPA_CONTAINER_OF(port, struct impl, this);
struct pw_node *node = port->node;
pw_log_debug("port %p: start unlink %p", port, link);
res = pw_loop_invoke(node->data_loop->loop,
do_remove_link, impl->seq++, sizeof(struct pw_link *), &link, true, port);
pw_log_debug("port %p: finish unlink", port);
if (port->direction == PW_DIRECTION_OUTPUT) {
if (link->output) {
@ -199,7 +221,7 @@ do_remove_link_done(struct spa_loop *loop,
if (!port->allocated && port->state > PW_PORT_STATE_READY) {
pw_log_debug("port %p: clear buffers on port", port);
spa_node_port_use_buffers(port->node->node,
spa_node_port_use_buffers(node->node,
port->direction, port->port_id, NULL, 0);
port->buffers = NULL;
port->n_buffers = 0;
@ -210,52 +232,26 @@ do_remove_link_done(struct spa_loop *loop,
if (node->n_used_output_links == 0 && node->n_used_input_links == 0) {
pw_node_update_state(node, PW_NODE_STATE_IDLE, NULL);
}
return SPA_RESULT_OK;
}
static int
do_remove_link(struct spa_loop *loop,
bool async, uint32_t seq, size_t size, void *data, void *user_data)
{
struct pw_port *port = user_data;
struct pw_node *this = port->node;
struct pw_link *link = ((struct pw_link **) data)[0];
int res;
if (port->direction == PW_DIRECTION_INPUT) {
pw_port_pause_rt(link->rt.input);
spa_list_remove(&link->rt.input_link);
link->rt.input = NULL;
} else {
pw_port_pause_rt(link->rt.output);
spa_list_remove(&link->rt.output_link);
link->rt.output = NULL;
}
res = pw_loop_invoke(this->core->main_loop->loop,
do_remove_link_done, seq, sizeof(struct pw_link *), &link, port);
return res;
}
int pw_port_unlink(struct pw_port *port, struct pw_link *link)
static int
do_clear_buffers(struct spa_loop *loop,
bool async, uint32_t seq, size_t size, void *data, void *user_data)
{
struct pw_port *port = user_data;
return pw_port_pause_rt(port);
}
int pw_port_clear_buffers(struct pw_port *port)
{
int res;
struct impl *impl = SPA_CONTAINER_OF(port, struct impl, this);
pw_log_debug("port %p: start unlink %p", port, link);
pw_log_debug("port %p: clear buffers", port);
res = pw_loop_invoke(port->node->data_loop->loop,
do_remove_link, impl->seq++, sizeof(struct pw_link *), &link, port);
return res;
}
static int
do_clear_buffers_done(struct spa_loop *loop,
bool async, uint32_t seq, size_t size, void *data, void *user_data)
{
struct pw_port *port = user_data;
int res;
do_clear_buffers, impl->seq++, 0, NULL, true, port);
if (port->state <= PW_PORT_STATE_READY)
return SPA_RESULT_OK;
@ -269,29 +265,3 @@ do_clear_buffers_done(struct spa_loop *loop,
return res;
}
static int
do_clear_buffers(struct spa_loop *loop,
bool async, uint32_t seq, size_t size, void *data, void *user_data)
{
struct pw_port *port = user_data;
struct pw_node *node = port->node;
int res;
pw_port_pause_rt(port);
res = pw_loop_invoke(node->core->main_loop->loop,
do_clear_buffers_done, seq, 0, NULL, port);
return res;
}
int pw_port_clear_buffers(struct pw_port *port)
{
int res;
struct impl *impl = SPA_CONTAINER_OF(port, struct impl, this);
pw_log_debug("port %p: clear buffers", port);
res = pw_loop_invoke(port->node->data_loop->loop,
do_clear_buffers, impl->seq++, 0, NULL, port);
return res;
}

View file

@ -91,6 +91,7 @@ struct spa_loop {
uint32_t seq,
size_t size,
void *data,
bool block,
void *user_data);
};

View file

@ -115,6 +115,7 @@ static int do_command(struct spa_loop *loop, bool async, uint32_t seq, size_t si
seq,
sizeof(res),
&res,
false,
this);
}
return res;
@ -142,6 +143,7 @@ static int impl_node_send_command(struct spa_node *node, struct spa_command *com
++this->seq,
SPA_POD_SIZE(command),
command,
false,
this);
} else

View file

@ -112,6 +112,7 @@ static int do_start(struct spa_loop *loop, bool async, uint32_t seq, size_t size
seq,
sizeof(res),
&res,
false,
this);
}
return res;
@ -130,6 +131,7 @@ static int do_pause(struct spa_loop *loop, bool async, uint32_t seq, size_t size
seq,
sizeof(res),
&res,
false,
this);
}
return res;
@ -151,7 +153,7 @@ static int impl_node_send_command(struct spa_node *node, struct spa_command *com
if (this->n_buffers == 0)
return SPA_RESULT_NO_BUFFERS;
return spa_loop_invoke(this->data_loop, do_start, ++this->seq, 0, NULL, this);
return spa_loop_invoke(this->data_loop, do_start, ++this->seq, 0, NULL, false, this);
} else if (SPA_COMMAND_TYPE(command) == this->type.command_node.Pause) {
if (!this->have_format)
return SPA_RESULT_NO_FORMAT;
@ -159,7 +161,7 @@ static int impl_node_send_command(struct spa_node *node, struct spa_command *com
if (this->n_buffers == 0)
return SPA_RESULT_NO_BUFFERS;
return spa_loop_invoke(this->data_loop, do_pause, ++this->seq, 0, NULL, this);
return spa_loop_invoke(this->data_loop, do_pause, ++this->seq, 0, NULL, false, this);
} else
return SPA_RESULT_NOT_IMPLEMENTED;

View file

@ -47,7 +47,9 @@ struct invoke_item {
uint32_t seq;
size_t size;
void *data;
bool block;
void *user_data;
int res;
};
struct type {
@ -56,6 +58,8 @@ struct type {
uint32_t loop_utils;
};
static void loop_signal_event(struct spa_source *source);
static inline void init_type(struct type *type, struct spa_type_map *map)
{
type->loop = spa_type_map_get_id(map, SPA_TYPE__Loop);
@ -80,7 +84,8 @@ struct impl {
int epoll_fd;
pthread_t thread;
struct spa_source *event;
struct spa_source *wakeup;
int ack_fd;
struct spa_ringbuffer buffer;
uint8_t buffer_data[DATAS_SIZE];
@ -187,7 +192,12 @@ static void loop_remove_source(struct spa_source *source)
static int
loop_invoke(struct spa_loop *loop,
spa_invoke_func_t func, uint32_t seq, size_t size, void *data, void *user_data)
spa_invoke_func_t func,
uint32_t seq,
size_t size,
void *data,
bool block,
void *user_data)
{
struct impl *impl = SPA_CONTAINER_OF(loop, struct impl, loop);
bool in_thread = pthread_equal(impl->thread, pthread_self());
@ -199,6 +209,7 @@ loop_invoke(struct spa_loop *loop,
} else {
int32_t filled, avail;
uint32_t idx, offset, l0;
uint64_t count = 1;
filled = spa_ringbuffer_get_write_index(&impl->buffer, &idx);
if (filled < 0 || filled > impl->buffer.size) {
@ -220,6 +231,7 @@ loop_invoke(struct spa_loop *loop,
item->func = func;
item->seq = seq;
item->size = size;
item->block = block;
item->user_data = user_data;
if (l0 > sizeof(struct invoke_item) + size) {
@ -235,17 +247,25 @@ loop_invoke(struct spa_loop *loop,
spa_ringbuffer_write_update(&impl->buffer, idx + item->item_size);
spa_loop_utils_signal_event(&impl->utils, impl->event);
spa_loop_utils_signal_event(&impl->utils, impl->wakeup);
if (seq != SPA_ID_INVALID)
res = SPA_RESULT_RETURN_ASYNC(seq);
else
res = SPA_RESULT_OK;
if (block) {
if (read(impl->ack_fd, &count, sizeof(uint64_t)) != sizeof(uint64_t))
spa_log_warn(impl->log, NAME " %p: failed to read event fd: %s",
impl, strerror(errno));
res = item->res;
}
else {
if (seq != SPA_ID_INVALID)
res = SPA_RESULT_RETURN_ASYNC(seq);
else
res = SPA_RESULT_OK;
}
}
return res;
}
static void event_func(struct spa_loop_utils *utils, struct spa_source *source, void *data)
static void wakeup_func(struct spa_loop_utils *utils, struct spa_source *source, void *data)
{
struct impl *impl = data;
uint32_t index;
@ -253,9 +273,16 @@ static void event_func(struct spa_loop_utils *utils, struct spa_source *source,
while (spa_ringbuffer_get_read_index(&impl->buffer, &index) > 0) {
struct invoke_item *item =
SPA_MEMBER(impl->buffer_data, index & impl->buffer.mask, struct invoke_item);
item->func(&impl->loop, true, item->seq, item->size, item->data,
item->res = item->func(&impl->loop, true, item->seq, item->size, item->data,
item->user_data);
spa_ringbuffer_read_update(&impl->buffer, index + item->item_size);
if (item->block) {
uint64_t count = 1;
if (write(impl->ack_fd, &count, sizeof(uint64_t)) != sizeof(uint64_t))
spa_log_warn(impl->log, NAME " %p: failed to write event fd: %s",
impl, strerror(errno));
}
}
}
@ -705,7 +732,8 @@ impl_init(const struct spa_handle_factory *factory,
spa_ringbuffer_init(&impl->buffer, DATAS_SIZE);
impl->event = spa_loop_utils_add_event(&impl->utils, event_func, impl);
impl->wakeup = spa_loop_utils_add_event(&impl->utils, wakeup_func, impl);
impl->ack_fd = eventfd(0, EFD_CLOEXEC);
spa_log_info(impl->log, NAME " %p: initialized", impl);

View file

@ -261,6 +261,7 @@ static int do_pause(struct spa_loop *loop,
seq,
sizeof(res),
&res,
false,
this);
}
return res;
@ -300,6 +301,7 @@ static int do_start(struct spa_loop *loop,
seq,
sizeof(res),
&res,
false,
this);
}
return SPA_RESULT_OK;
@ -332,6 +334,7 @@ static int impl_node_send_command(struct spa_node *node, struct spa_command *com
++this->seq,
SPA_POD_SIZE(command),
command,
false,
this);
} else if (SPA_COMMAND_TYPE(command) == this->type.command_node.Pause) {
struct port *state = &this->out_ports[0];
@ -347,6 +350,7 @@ static int impl_node_send_command(struct spa_node *node, struct spa_command *com
++this->seq,
SPA_POD_SIZE(command),
command,
false,
this);
} else if (SPA_COMMAND_TYPE(command) == this->type.command_node.ClockUpdate) {
return SPA_RESULT_OK;

View file

@ -274,7 +274,7 @@ static void do_remove_source(struct spa_source *source)
static int
do_invoke(struct spa_loop *loop,
spa_invoke_func_t func, uint32_t seq, size_t size, void *data, void *user_data)
spa_invoke_func_t func, uint32_t seq, size_t size, void *data, bool block, void *user_data)
{
return func(loop, false, seq, size, data, user_data);
}

View file

@ -319,7 +319,7 @@ static void do_remove_source(struct spa_source *source)
static int
do_invoke(struct spa_loop *loop,
spa_invoke_func_t func, uint32_t seq, size_t size, void *data, void *user_data)
spa_invoke_func_t func, uint32_t seq, size_t size, void *data, bool block, void *user_data)
{
return func(loop, false, seq, size, data, user_data);
}

View file

@ -331,7 +331,7 @@ static void do_remove_source(struct spa_source *source)
static int
do_invoke(struct spa_loop *loop,
spa_invoke_func_t func, uint32_t seq, size_t size, void *data, void *user_data)
spa_invoke_func_t func, uint32_t seq, size_t size, void *data, bool block, void *user_data)
{
return func(loop, false, seq, size, data, user_data);
}

View file

@ -269,7 +269,7 @@ static void do_remove_source(struct spa_source *source)
static int
do_invoke(struct spa_loop *loop,
spa_invoke_func_t func, uint32_t seq, size_t size, void *data, void *user_data)
spa_invoke_func_t func, uint32_t seq, size_t size, void *data, bool block, void *user_data)
{
return func(loop, false, seq, size, data, user_data);
}

View file

@ -291,7 +291,7 @@ static void do_remove_source(struct spa_source *source)
static int
do_invoke(struct spa_loop *loop,
spa_invoke_func_t func, uint32_t seq, size_t size, void *data, void *user_data)
spa_invoke_func_t func, uint32_t seq, size_t size, void *data, bool block, void *user_data)
{
return func(loop, false, seq, size, data, user_data);
}