From 69d431acd49c0229386d4b254367d0d135e6403f Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Mon, 11 Sep 2023 11:23:25 +0200 Subject: [PATCH] node: clarify port_set_io and SPA_IO_Buffers port_set_io with SPA_IO_Buffer can be used to enable/disable a port when the node is running and so the node should make sure the io update is synchronized with the processing loop. Use spa_loop_invoke to make sure the mixers handle the port_io updates correctly. Setting buffers or a format also needs the port to be disabled so add some checks for this in the mixers. --- spa/include/spa/node/node.h | 12 ++++++-- spa/plugins/audiomixer/audiomixer.c | 27 +++++++++++++++++- spa/plugins/audiomixer/mixer-dsp.c | 32 ++++++++++++++++++++- spa/plugins/control/mixer.c | 43 +++++++++++++++++++++++++---- 4 files changed, 104 insertions(+), 10 deletions(-) diff --git a/spa/include/spa/node/node.h b/spa/include/spa/node/node.h index 933ef178f..f3eb6c027 100644 --- a/spa/include/spa/node/node.h +++ b/spa/include/spa/node/node.h @@ -485,7 +485,9 @@ struct spa_node_methods { * * When \a param is NULL, the parameter will be unset. * - * This function must be called from the main thread. + * This function must be called from the main thread. The node muse be paused + * or the port SPA_IO_Buffers area is NULL when this function is called with + * a param that changes the processing state (like a format change). * * \param node a struct \ref spa_node * \param direction a enum \ref spa_direction @@ -540,7 +542,8 @@ struct spa_node_methods { * When this function returns async, use the spa_node_sync operation to * wait for completion. * - * This function must be called from the main thread. + * This function must be called from the main thread. The node muse be paused + * or the port SPA_IO_Buffers area is NULL when this function is called. * * \param object an object implementing the interface * \param direction a port direction @@ -566,6 +569,11 @@ struct spa_node_methods { * * This function must be called from the main thread. * + * This function can be called when the node is running and the node + * must be prepared to handle changes in io areas while running. This + * is normally done by synchronizing the port io updates with the + * data processing loop. + * * \param direction a spa_direction * \param port_id a port id * \param id the id of the io area, the available ids can be diff --git a/spa/plugins/audiomixer/audiomixer.c b/spa/plugins/audiomixer/audiomixer.c index fbe4c795c..2dcd32de1 100644 --- a/spa/plugins/audiomixer/audiomixer.c +++ b/spa/plugins/audiomixer/audiomixer.c @@ -9,6 +9,7 @@ #include #include #include +#include #include #include #include @@ -88,6 +89,9 @@ struct impl { struct spa_cpu *cpu; uint32_t cpu_flags; uint32_t max_align; + + struct spa_loop *data_loop; + uint32_t quantum_limit; struct mix_ops ops; @@ -525,6 +529,8 @@ static int port_set_format(void *object, port = GET_PORT(this, direction, port_id); + spa_return_val_if_fail(!this->started || port->io == NULL, -EIO); + if (format == NULL) { if (port->have_format) { port->have_format = false; @@ -632,6 +638,8 @@ impl_node_port_use_buffers(void *object, port = GET_PORT(this, direction, port_id); + spa_return_val_if_fail(!this->started || port->io == NULL, -EIO); + clear_buffers(this, port); if (n_buffers > 0 && !port->have_format) @@ -670,6 +678,19 @@ impl_node_port_use_buffers(void *object, return 0; } +struct io_info { + struct port *port; + void *data; +}; + +static int do_port_set_io(struct spa_loop *loop, bool async, uint32_t seq, + const void *data, size_t size, void *user_data) +{ + struct io_info *info = user_data; + info->port->io = info->data; + return 0; +} + static int impl_node_port_set_io(void *object, enum spa_direction direction, uint32_t port_id, @@ -677,6 +698,7 @@ impl_node_port_set_io(void *object, { struct impl *this = object; struct port *port; + struct io_info info; spa_return_val_if_fail(this != NULL, -EINVAL); @@ -686,10 +708,13 @@ impl_node_port_set_io(void *object, spa_return_val_if_fail(CHECK_PORT(this, direction, port_id), -EINVAL); port = GET_PORT(this, direction, port_id); + info.port = port; + info.data = data; switch (id) { case SPA_IO_Buffers: - port->io = data; + spa_loop_invoke(this->data_loop, + do_port_set_io, SPA_ID_INVALID, NULL, 0, true, &info); break; default: return -ENOENT; diff --git a/spa/plugins/audiomixer/mixer-dsp.c b/spa/plugins/audiomixer/mixer-dsp.c index 17890253d..31c585b57 100644 --- a/spa/plugins/audiomixer/mixer-dsp.c +++ b/spa/plugins/audiomixer/mixer-dsp.c @@ -9,6 +9,7 @@ #include #include #include +#include #include #include #include @@ -85,6 +86,8 @@ struct impl { uint32_t cpu_flags; uint32_t max_align; + struct spa_loop *data_loop; + uint32_t quantum_limit; struct mix_ops ops; @@ -474,6 +477,8 @@ static int port_set_format(void *object, port = GET_PORT(this, direction, port_id); + spa_return_val_if_fail(!this->started || port->io == NULL, -EIO); + if (format == NULL) { if (port->have_format) { port->have_format = false; @@ -569,6 +574,8 @@ impl_node_port_use_buffers(void *object, port = GET_PORT(this, direction, port_id); + spa_return_val_if_fail(!this->started || port->io == NULL, -EIO); + clear_buffers(this, port); if (n_buffers > 0 && !port->have_format) @@ -606,6 +613,19 @@ impl_node_port_use_buffers(void *object, return 0; } +struct io_info { + struct port *port; + void *data; +}; + +static int do_port_set_io(struct spa_loop *loop, bool async, uint32_t seq, + const void *data, size_t size, void *user_data) +{ + struct io_info *info = user_data; + info->port->io = info->data; + return 0; +} + static int impl_node_port_set_io(void *object, enum spa_direction direction, uint32_t port_id, @@ -613,6 +633,7 @@ impl_node_port_set_io(void *object, { struct impl *this = object; struct port *port; + struct io_info info; spa_return_val_if_fail(this != NULL, -EINVAL); @@ -622,10 +643,13 @@ impl_node_port_set_io(void *object, spa_return_val_if_fail(CHECK_PORT(this, direction, port_id), -EINVAL); port = GET_PORT(this, direction, port_id); + info.port = port; + info.data = data; switch (id) { case SPA_IO_Buffers: - port->io = data; + spa_loop_invoke(this->data_loop, + do_port_set_io, SPA_ID_INVALID, NULL, 0, true, &info); break; default: return -ENOENT; @@ -833,6 +857,12 @@ impl_init(const struct spa_handle_factory *factory, this->log = spa_support_find(support, n_support, SPA_TYPE_INTERFACE_Log); spa_log_topic_init(this->log, log_topic); + this->data_loop = spa_support_find(support, n_support, SPA_TYPE_INTERFACE_DataLoop); + if (this->data_loop == NULL) { + spa_log_error(this->log, "a data loop is needed"); + return -EINVAL; + } + this->cpu = spa_support_find(support, n_support, SPA_TYPE_INTERFACE_CPU); if (this->cpu) { this->cpu_flags = spa_cpu_get_flags(this->cpu); diff --git a/spa/plugins/control/mixer.c b/spa/plugins/control/mixer.c index 96a670e64..b65d4baab 100644 --- a/spa/plugins/control/mixer.c +++ b/spa/plugins/control/mixer.c @@ -6,9 +6,10 @@ #include #include -#include -#include #include +#include +#include +#include #include #include #include @@ -59,6 +60,8 @@ struct impl { struct spa_log *log; + struct spa_loop *data_loop; + uint64_t info_all; struct spa_node_info info; struct spa_param_info params[8]; @@ -412,6 +415,8 @@ static int port_set_format(void *object, port = GET_PORT(this, direction, port_id); + spa_return_val_if_fail(!this->started || port->io == NULL, -EIO); + if (format == NULL) { if (port->have_format) { port->have_format = false; @@ -489,6 +494,8 @@ impl_node_port_use_buffers(void *object, spa_log_debug(this->log, NAME " %p: use buffers %d on port %d:%d", this, n_buffers, direction, port_id); + spa_return_val_if_fail(!this->started || port->io == NULL, -EIO); + clear_buffers(this, port); if (n_buffers > 0 && !port->have_format) @@ -517,6 +524,19 @@ impl_node_port_use_buffers(void *object, return 0; } +struct io_info { + struct port *port; + void *data; +}; + +static int do_port_set_io(struct spa_loop *loop, bool async, uint32_t seq, + const void *data, size_t size, void *user_data) +{ + struct io_info *info = user_data; + info->port->io = info->data; + return 0; +} + static int impl_node_port_set_io(void *object, enum spa_direction direction, uint32_t port_id, @@ -524,18 +544,23 @@ impl_node_port_set_io(void *object, { struct impl *this = object; struct port *port; + struct io_info info; spa_return_val_if_fail(this != NULL, -EINVAL); - spa_return_val_if_fail(CHECK_PORT(this, direction, port_id), -EINVAL); - - port = GET_PORT(this, direction, port_id); spa_log_debug(this->log, NAME " %p: port %d:%d io %d %p/%zd", this, direction, port_id, id, data, size); + spa_return_val_if_fail(CHECK_PORT(this, direction, port_id), -EINVAL); + + port = GET_PORT(this, direction, port_id); + info.port = port; + info.data = data; + switch (id) { case SPA_IO_Buffers: - port->io = data; + spa_loop_invoke(this->data_loop, + do_port_set_io, SPA_ID_INVALID, NULL, 0, true, &info); break; default: return -ENOENT; @@ -793,6 +818,12 @@ impl_init(const struct spa_handle_factory *factory, this->log = spa_support_find(support, n_support, SPA_TYPE_INTERFACE_Log); + this->data_loop = spa_support_find(support, n_support, SPA_TYPE_INTERFACE_DataLoop); + if (this->data_loop == NULL) { + spa_log_error(this->log, "a data loop is needed"); + return -EINVAL; + } + spa_hook_list_init(&this->hooks); this->node.iface = SPA_INTERFACE_INIT(