node: clarify port_set_io and SPA_IO_Buffers

port_set_io with SPA_IO_Buffer can be used to enable/disable a port
when the node is running and so the node should make sure the io update
is synchronized with the processing loop.

Use spa_loop_invoke to make sure the mixers handle the port_io updates
correctly.

Setting buffers or a format also needs the port to be disabled so add
some checks for this in the mixers.
This commit is contained in:
Wim Taymans 2023-09-11 11:23:25 +02:00
parent 9c834427c6
commit 69d431acd4
4 changed files with 104 additions and 10 deletions

View file

@ -485,7 +485,9 @@ struct spa_node_methods {
* *
* When \a param is NULL, the parameter will be unset. * When \a param is NULL, the parameter will be unset.
* *
* This function must be called from the main thread. * This function must be called from the main thread. The node muse be paused
* or the port SPA_IO_Buffers area is NULL when this function is called with
* a param that changes the processing state (like a format change).
* *
* \param node a struct \ref spa_node * \param node a struct \ref spa_node
* \param direction a enum \ref spa_direction * \param direction a enum \ref spa_direction
@ -540,7 +542,8 @@ struct spa_node_methods {
* When this function returns async, use the spa_node_sync operation to * When this function returns async, use the spa_node_sync operation to
* wait for completion. * wait for completion.
* *
* This function must be called from the main thread. * This function must be called from the main thread. The node muse be paused
* or the port SPA_IO_Buffers area is NULL when this function is called.
* *
* \param object an object implementing the interface * \param object an object implementing the interface
* \param direction a port direction * \param direction a port direction
@ -566,6 +569,11 @@ struct spa_node_methods {
* *
* This function must be called from the main thread. * This function must be called from the main thread.
* *
* This function can be called when the node is running and the node
* must be prepared to handle changes in io areas while running. This
* is normally done by synchronizing the port io updates with the
* data processing loop.
*
* \param direction a spa_direction * \param direction a spa_direction
* \param port_id a port id * \param port_id a port id
* \param id the id of the io area, the available ids can be * \param id the id of the io area, the available ids can be

View file

@ -9,6 +9,7 @@
#include <spa/support/plugin.h> #include <spa/support/plugin.h>
#include <spa/support/log.h> #include <spa/support/log.h>
#include <spa/support/cpu.h> #include <spa/support/cpu.h>
#include <spa/support/loop.h>
#include <spa/utils/list.h> #include <spa/utils/list.h>
#include <spa/utils/names.h> #include <spa/utils/names.h>
#include <spa/utils/string.h> #include <spa/utils/string.h>
@ -88,6 +89,9 @@ struct impl {
struct spa_cpu *cpu; struct spa_cpu *cpu;
uint32_t cpu_flags; uint32_t cpu_flags;
uint32_t max_align; uint32_t max_align;
struct spa_loop *data_loop;
uint32_t quantum_limit; uint32_t quantum_limit;
struct mix_ops ops; struct mix_ops ops;
@ -525,6 +529,8 @@ static int port_set_format(void *object,
port = GET_PORT(this, direction, port_id); port = GET_PORT(this, direction, port_id);
spa_return_val_if_fail(!this->started || port->io == NULL, -EIO);
if (format == NULL) { if (format == NULL) {
if (port->have_format) { if (port->have_format) {
port->have_format = false; port->have_format = false;
@ -632,6 +638,8 @@ impl_node_port_use_buffers(void *object,
port = GET_PORT(this, direction, port_id); port = GET_PORT(this, direction, port_id);
spa_return_val_if_fail(!this->started || port->io == NULL, -EIO);
clear_buffers(this, port); clear_buffers(this, port);
if (n_buffers > 0 && !port->have_format) if (n_buffers > 0 && !port->have_format)
@ -670,6 +678,19 @@ impl_node_port_use_buffers(void *object,
return 0; return 0;
} }
struct io_info {
struct port *port;
void *data;
};
static int do_port_set_io(struct spa_loop *loop, bool async, uint32_t seq,
const void *data, size_t size, void *user_data)
{
struct io_info *info = user_data;
info->port->io = info->data;
return 0;
}
static int static int
impl_node_port_set_io(void *object, impl_node_port_set_io(void *object,
enum spa_direction direction, uint32_t port_id, enum spa_direction direction, uint32_t port_id,
@ -677,6 +698,7 @@ impl_node_port_set_io(void *object,
{ {
struct impl *this = object; struct impl *this = object;
struct port *port; struct port *port;
struct io_info info;
spa_return_val_if_fail(this != NULL, -EINVAL); spa_return_val_if_fail(this != NULL, -EINVAL);
@ -686,10 +708,13 @@ impl_node_port_set_io(void *object,
spa_return_val_if_fail(CHECK_PORT(this, direction, port_id), -EINVAL); spa_return_val_if_fail(CHECK_PORT(this, direction, port_id), -EINVAL);
port = GET_PORT(this, direction, port_id); port = GET_PORT(this, direction, port_id);
info.port = port;
info.data = data;
switch (id) { switch (id) {
case SPA_IO_Buffers: case SPA_IO_Buffers:
port->io = data; spa_loop_invoke(this->data_loop,
do_port_set_io, SPA_ID_INVALID, NULL, 0, true, &info);
break; break;
default: default:
return -ENOENT; return -ENOENT;

View file

@ -9,6 +9,7 @@
#include <spa/support/plugin.h> #include <spa/support/plugin.h>
#include <spa/support/log.h> #include <spa/support/log.h>
#include <spa/support/cpu.h> #include <spa/support/cpu.h>
#include <spa/support/loop.h>
#include <spa/utils/list.h> #include <spa/utils/list.h>
#include <spa/utils/names.h> #include <spa/utils/names.h>
#include <spa/utils/string.h> #include <spa/utils/string.h>
@ -85,6 +86,8 @@ struct impl {
uint32_t cpu_flags; uint32_t cpu_flags;
uint32_t max_align; uint32_t max_align;
struct spa_loop *data_loop;
uint32_t quantum_limit; uint32_t quantum_limit;
struct mix_ops ops; struct mix_ops ops;
@ -474,6 +477,8 @@ static int port_set_format(void *object,
port = GET_PORT(this, direction, port_id); port = GET_PORT(this, direction, port_id);
spa_return_val_if_fail(!this->started || port->io == NULL, -EIO);
if (format == NULL) { if (format == NULL) {
if (port->have_format) { if (port->have_format) {
port->have_format = false; port->have_format = false;
@ -569,6 +574,8 @@ impl_node_port_use_buffers(void *object,
port = GET_PORT(this, direction, port_id); port = GET_PORT(this, direction, port_id);
spa_return_val_if_fail(!this->started || port->io == NULL, -EIO);
clear_buffers(this, port); clear_buffers(this, port);
if (n_buffers > 0 && !port->have_format) if (n_buffers > 0 && !port->have_format)
@ -606,6 +613,19 @@ impl_node_port_use_buffers(void *object,
return 0; return 0;
} }
struct io_info {
struct port *port;
void *data;
};
static int do_port_set_io(struct spa_loop *loop, bool async, uint32_t seq,
const void *data, size_t size, void *user_data)
{
struct io_info *info = user_data;
info->port->io = info->data;
return 0;
}
static int static int
impl_node_port_set_io(void *object, impl_node_port_set_io(void *object,
enum spa_direction direction, uint32_t port_id, enum spa_direction direction, uint32_t port_id,
@ -613,6 +633,7 @@ impl_node_port_set_io(void *object,
{ {
struct impl *this = object; struct impl *this = object;
struct port *port; struct port *port;
struct io_info info;
spa_return_val_if_fail(this != NULL, -EINVAL); spa_return_val_if_fail(this != NULL, -EINVAL);
@ -622,10 +643,13 @@ impl_node_port_set_io(void *object,
spa_return_val_if_fail(CHECK_PORT(this, direction, port_id), -EINVAL); spa_return_val_if_fail(CHECK_PORT(this, direction, port_id), -EINVAL);
port = GET_PORT(this, direction, port_id); port = GET_PORT(this, direction, port_id);
info.port = port;
info.data = data;
switch (id) { switch (id) {
case SPA_IO_Buffers: case SPA_IO_Buffers:
port->io = data; spa_loop_invoke(this->data_loop,
do_port_set_io, SPA_ID_INVALID, NULL, 0, true, &info);
break; break;
default: default:
return -ENOENT; return -ENOENT;
@ -833,6 +857,12 @@ impl_init(const struct spa_handle_factory *factory,
this->log = spa_support_find(support, n_support, SPA_TYPE_INTERFACE_Log); this->log = spa_support_find(support, n_support, SPA_TYPE_INTERFACE_Log);
spa_log_topic_init(this->log, log_topic); spa_log_topic_init(this->log, log_topic);
this->data_loop = spa_support_find(support, n_support, SPA_TYPE_INTERFACE_DataLoop);
if (this->data_loop == NULL) {
spa_log_error(this->log, "a data loop is needed");
return -EINVAL;
}
this->cpu = spa_support_find(support, n_support, SPA_TYPE_INTERFACE_CPU); this->cpu = spa_support_find(support, n_support, SPA_TYPE_INTERFACE_CPU);
if (this->cpu) { if (this->cpu) {
this->cpu_flags = spa_cpu_get_flags(this->cpu); this->cpu_flags = spa_cpu_get_flags(this->cpu);

View file

@ -6,9 +6,10 @@
#include <string.h> #include <string.h>
#include <stdio.h> #include <stdio.h>
#include <spa/support/plugin.h>
#include <spa/support/log.h>
#include <spa/support/cpu.h> #include <spa/support/cpu.h>
#include <spa/support/log.h>
#include <spa/support/loop.h>
#include <spa/support/plugin.h>
#include <spa/utils/list.h> #include <spa/utils/list.h>
#include <spa/utils/names.h> #include <spa/utils/names.h>
#include <spa/utils/string.h> #include <spa/utils/string.h>
@ -59,6 +60,8 @@ struct impl {
struct spa_log *log; struct spa_log *log;
struct spa_loop *data_loop;
uint64_t info_all; uint64_t info_all;
struct spa_node_info info; struct spa_node_info info;
struct spa_param_info params[8]; struct spa_param_info params[8];
@ -412,6 +415,8 @@ static int port_set_format(void *object,
port = GET_PORT(this, direction, port_id); port = GET_PORT(this, direction, port_id);
spa_return_val_if_fail(!this->started || port->io == NULL, -EIO);
if (format == NULL) { if (format == NULL) {
if (port->have_format) { if (port->have_format) {
port->have_format = false; port->have_format = false;
@ -489,6 +494,8 @@ impl_node_port_use_buffers(void *object,
spa_log_debug(this->log, NAME " %p: use buffers %d on port %d:%d", spa_log_debug(this->log, NAME " %p: use buffers %d on port %d:%d",
this, n_buffers, direction, port_id); this, n_buffers, direction, port_id);
spa_return_val_if_fail(!this->started || port->io == NULL, -EIO);
clear_buffers(this, port); clear_buffers(this, port);
if (n_buffers > 0 && !port->have_format) if (n_buffers > 0 && !port->have_format)
@ -517,6 +524,19 @@ impl_node_port_use_buffers(void *object,
return 0; return 0;
} }
struct io_info {
struct port *port;
void *data;
};
static int do_port_set_io(struct spa_loop *loop, bool async, uint32_t seq,
const void *data, size_t size, void *user_data)
{
struct io_info *info = user_data;
info->port->io = info->data;
return 0;
}
static int static int
impl_node_port_set_io(void *object, impl_node_port_set_io(void *object,
enum spa_direction direction, uint32_t port_id, enum spa_direction direction, uint32_t port_id,
@ -524,18 +544,23 @@ impl_node_port_set_io(void *object,
{ {
struct impl *this = object; struct impl *this = object;
struct port *port; struct port *port;
struct io_info info;
spa_return_val_if_fail(this != NULL, -EINVAL); spa_return_val_if_fail(this != NULL, -EINVAL);
spa_return_val_if_fail(CHECK_PORT(this, direction, port_id), -EINVAL);
port = GET_PORT(this, direction, port_id);
spa_log_debug(this->log, NAME " %p: port %d:%d io %d %p/%zd", this, spa_log_debug(this->log, NAME " %p: port %d:%d io %d %p/%zd", this,
direction, port_id, id, data, size); direction, port_id, id, data, size);
spa_return_val_if_fail(CHECK_PORT(this, direction, port_id), -EINVAL);
port = GET_PORT(this, direction, port_id);
info.port = port;
info.data = data;
switch (id) { switch (id) {
case SPA_IO_Buffers: case SPA_IO_Buffers:
port->io = data; spa_loop_invoke(this->data_loop,
do_port_set_io, SPA_ID_INVALID, NULL, 0, true, &info);
break; break;
default: default:
return -ENOENT; return -ENOENT;
@ -793,6 +818,12 @@ impl_init(const struct spa_handle_factory *factory,
this->log = spa_support_find(support, n_support, SPA_TYPE_INTERFACE_Log); this->log = spa_support_find(support, n_support, SPA_TYPE_INTERFACE_Log);
this->data_loop = spa_support_find(support, n_support, SPA_TYPE_INTERFACE_DataLoop);
if (this->data_loop == NULL) {
spa_log_error(this->log, "a data loop is needed");
return -EINVAL;
}
spa_hook_list_init(&this->hooks); spa_hook_list_init(&this->hooks);
this->node.iface = SPA_INTERFACE_INIT( this->node.iface = SPA_INTERFACE_INIT(