impl-node: add a NODE_RELIABLE property

Add a PW_KEY_NODE_RELIABLE and PW_KEY_PORT_RELIABLE property. The port
property value is inherited from the parent node when not explicitly set.

Setting the property on a port will activate a more reliable tee, that
actually only recycles buffers that were consumed. It will also activate
a mode in stream that gives out new buffers only when the previous one
was recycled and nothing else is queued. This is necessary to avoid
queuing in the stream when the other side is not consuming.

When a link is async but the output node is a driver of the input, we
can avoid async io. This also removes a potential out-of-order buffer
recycling when the node resumes at a different cycle.

See #4885
This commit is contained in:
Wim Taymans 2025-09-19 09:48:13 +02:00
parent c89acd3e1c
commit 20d2a331be
6 changed files with 75 additions and 1 deletions

View file

@ -787,6 +787,7 @@ int pw_impl_link_activate(struct pw_impl_link *this)
struct impl *impl = SPA_CONTAINER_OF(this, struct impl, this);
int res;
uint32_t io_type, io_size;
bool reliable_driver;
pw_log_debug("%p: activate activated:%d state:%s", this, impl->activated,
pw_link_state_as_string(this->info.state));
@ -795,7 +796,15 @@ int pw_impl_link_activate(struct pw_impl_link *this)
!impl->input.node->runnable || !impl->output.node->runnable)
return 0;
if (this->async) {
/* check if the output node is a driver for the input node and if
* it has reliable scheduling. Because it is a driver, it will always be
* scheduled before the input node and there will not be any concurrent access
* to the io, so we don't need async IO, even when the input is async. This
avoids the problem of out-of-order buffers after a stall. */
reliable_driver = (impl->output.node == impl->input.node->driver_node) &&
impl->output.node->reliable;
if (this->async && !reliable_driver) {
io_type = SPA_IO_AsyncBuffers;
io_size = sizeof(struct spa_io_async_buffers);
} else {

View file

@ -1158,6 +1158,7 @@ static void check_properties(struct pw_impl_node *node)
impl->cache_params = pw_properties_get_bool(node->properties, PW_KEY_NODE_CACHE_PARAMS, true);
driver = pw_properties_get_bool(node->properties, PW_KEY_NODE_DRIVER, false);
node->exclusive = pw_properties_get_bool(node->properties, PW_KEY_NODE_EXCLUSIVE, false);
node->reliable = pw_properties_get_bool(node->properties, PW_KEY_NODE_RELIABLE, false);
if (node->driver != driver) {
pw_log_debug("%p: driver %d -> %d", node, node->driver, driver);

View file

@ -303,6 +303,37 @@ static int tee_process(void *object)
return SPA_STATUS_HAVE_DATA | SPA_STATUS_NEED_DATA;
}
/* Process callback for the "reliable" tee variant.
 *
 * Unlike the regular tee_process(), the input buffer is handed to at most
 * one downstream mix port, and only one that has already consumed its
 * previous buffer (mio->status != SPA_STATUS_HAVE_DATA). The buffer id
 * that port hands back is returned to the producer through io->buffer_id,
 * so only buffers that were actually consumed get recycled. This prevents
 * buffers piling up when the consumer side is not keeping up.
 *
 * NOTE(review): with multiple mix ports only the first ready one receives
 * the buffer (the loop breaks) — presumably the reliable tee is meant for
 * single-consumer links; confirm against callers. */
static int tee_process_reliable(void *object)
{
struct impl *impl = object;
struct pw_impl_port *this = &impl->this;
struct pw_impl_port_mix *mix;
struct spa_io_buffers *io = &this->rt.io;
/* double-buffered io areas: select by the low bit of the graph cycle */
uint32_t cycle = this->node->rt.position->clock.cycle & 1;
if (io->status == SPA_STATUS_HAVE_DATA) {
uint32_t buffer_id = io->buffer_id;
pw_log_trace_fp("%p: tee input status:%d id:%d cycle:%d", this, io->status, buffer_id, cycle);
spa_list_for_each(mix, &impl->rt.mix_list, rt.link) {
struct spa_io_buffers *mio = mix->io[cycle];
pw_log_trace_fp("%p: port %d %p->%p status:%d id:%d", this,
mix->port.port_id, io, mio, mio->status, mio->buffer_id);
if (mio->status != SPA_STATUS_HAVE_DATA) {
/* swap: reclaim the buffer this consumer finished with,
 * then hand it our new buffer. Order matters: read
 * mio->buffer_id before overwriting it. */
io->buffer_id = mio->buffer_id;
io->status = SPA_STATUS_NEED_DATA;
mio->buffer_id = buffer_id;
mio->status = SPA_STATUS_HAVE_DATA;
/* only one consumer per cycle gets the buffer */
break;
}
}
}
return SPA_STATUS_HAVE_DATA | SPA_STATUS_NEED_DATA;
}
static int tee_reuse_buffer(void *object, uint32_t port_id, uint32_t buffer_id)
{
struct impl *impl = object;
@ -322,6 +353,15 @@ static const struct spa_node_methods schedule_tee_node = {
.process = tee_process,
};
/* Method table installed on an output port's tee when PW_KEY_PORT_RELIABLE
 * is set (see check_properties() in this file): the counterpart of
 * schedule_tee_node, but with tee_process_reliable as the process callback
 * so that buffers are only recycled once a consumer has taken them. */
static const struct spa_node_methods schedule_tee_node_reliable = {
SPA_VERSION_NODE_METHODS,
.add_listener = mix_add_listener,
.port_enum_params = mix_port_enum_params,
.port_set_io = port_set_io,
.port_reuse_buffer = tee_reuse_buffer,
.process = tee_process_reliable,
};
static int schedule_mix_input(void *object)
{
struct impl *impl = object;
@ -456,6 +496,7 @@ int pw_impl_port_release_mix(struct pw_impl_port *port, struct pw_impl_port_mix
static int check_properties(struct pw_impl_port *port)
{
struct impl *impl = SPA_CONTAINER_OF(port, struct impl, this);
struct pw_impl_node *node = port->node;
bool is_control, is_network, is_monitor, is_device, is_duplex, is_virtual;
const char *media_class, *override_device_prefix, *channel_names;
@ -480,6 +521,14 @@ static int check_properties(struct pw_impl_port *port)
port->ignore_latency = pw_properties_get_bool(port->properties, PW_KEY_PORT_IGNORE_LATENCY, false);
port->exclusive = pw_properties_get_bool(port->properties, PW_KEY_PORT_EXCLUSIVE, node->exclusive);
port->reliable = pw_properties_get_bool(port->properties, PW_KEY_PORT_RELIABLE, node->reliable);
if (port->direction == PW_DIRECTION_OUTPUT) {
if (port->reliable)
impl->mix_node.iface.cb.funcs = &schedule_tee_node_reliable;
else
impl->mix_node.iface.cb.funcs = &schedule_tee_node;
}
/* inherit passive state from parent node */
port->passive = pw_properties_get_bool(port->properties, PW_KEY_PORT_PASSIVE,

View file

@ -232,6 +232,8 @@ extern "C" {
#define PW_KEY_NODE_PHYSICAL "node.physical" /**< ports from the node are physical */
#define PW_KEY_NODE_TERMINAL "node.terminal" /**< ports from the node are terminal */
#define PW_KEY_NODE_RELIABLE "node.reliable" /**< node uses reliable transport 1.6.0 */
/** Port keys */
#define PW_KEY_PORT_ID "port.id" /**< port id */
#define PW_KEY_PORT_NAME "port.name" /**< port name */
@ -249,6 +251,7 @@ extern "C" {
#define PW_KEY_PORT_IGNORE_LATENCY "port.ignore-latency" /**< latency ignored by peers, since 0.3.71 */
#define PW_KEY_PORT_GROUP "port.group" /**< the port group of the port 1.2.0 */
#define PW_KEY_PORT_EXCLUSIVE "port.exclusive" /**< link port only once 1.6.0 */
#define PW_KEY_PORT_RELIABLE "port.reliable" /**< port uses reliable transport 1.6.0 */
/** link properties */
#define PW_KEY_LINK_ID "link.id" /**< a link id */

View file

@ -785,6 +785,7 @@ struct pw_impl_node {
unsigned int lazy:1; /**< the graph is lazy scheduling */
unsigned int exclusive:1; /**< ports can only be linked once */
unsigned int leaf:1; /**< node only produces/consumes data */
unsigned int reliable:1; /**< ports need reliable tee */
uint32_t transport; /**< latest transport request */
@ -970,6 +971,7 @@ struct pw_impl_port {
unsigned int ignore_latency:1;
unsigned int have_latency:1;
unsigned int exclusive:1; /**< port can only be linked once */
unsigned int reliable:1; /**< port needs reliable tee */
unsigned int have_tag_param:1;
struct spa_pod *tag[2]; /**< tags */

View file

@ -2507,6 +2507,16 @@ struct pw_buffer *pw_stream_dequeue_buffer(struct pw_stream *stream)
struct buffer *b;
int res;
/* For reliable output streams, only give buffers when both queue AND output IO are clear */
if (impl->direction == SPA_DIRECTION_OUTPUT && stream->node->reliable) {
struct spa_io_buffers *io = impl->io;
if (!queue_is_empty(impl, &impl->queued) || io->status == SPA_STATUS_HAVE_DATA) {
errno = EAGAIN;
return NULL;
}
}
if ((b = queue_pop(impl, &impl->dequeued)) == NULL) {
res = -errno;
pw_log_trace_fp("%p: no more buffers: %m", stream);