diff --git a/src/pipewire/impl-link.c b/src/pipewire/impl-link.c index cdbfc2faa..99eea5a55 100644 --- a/src/pipewire/impl-link.c +++ b/src/pipewire/impl-link.c @@ -787,6 +787,7 @@ int pw_impl_link_activate(struct pw_impl_link *this) struct impl *impl = SPA_CONTAINER_OF(this, struct impl, this); int res; uint32_t io_type, io_size; + bool reliable_driver; pw_log_debug("%p: activate activated:%d state:%s", this, impl->activated, pw_link_state_as_string(this->info.state)); @@ -795,7 +796,15 @@ int pw_impl_link_activate(struct pw_impl_link *this) !impl->input.node->runnable || !impl->output.node->runnable) return 0; - if (this->async) { + /* check if the output node is a driver for the input node and if + * it has reliable scheduling. Because it is a driver, it will always be + * scheduled before the input node and there will not be any concurrent access + * to the io, so we don't need async IO, even when the input is async. This + * avoid the problem of out-of-order buffers after a stall. */ + reliable_driver = (impl->output.node == impl->input.node->driver_node) && + impl->output.node->reliable; + + if (this->async && !reliable_driver) { io_type = SPA_IO_AsyncBuffers; io_size = sizeof(struct spa_io_async_buffers); } else { diff --git a/src/pipewire/impl-node.c b/src/pipewire/impl-node.c index 87fbe58e2..a8f65b06c 100644 --- a/src/pipewire/impl-node.c +++ b/src/pipewire/impl-node.c @@ -1158,6 +1158,7 @@ static void check_properties(struct pw_impl_node *node) impl->cache_params = pw_properties_get_bool(node->properties, PW_KEY_NODE_CACHE_PARAMS, true); driver = pw_properties_get_bool(node->properties, PW_KEY_NODE_DRIVER, false); node->exclusive = pw_properties_get_bool(node->properties, PW_KEY_NODE_EXCLUSIVE, false); + node->reliable = pw_properties_get_bool(node->properties, PW_KEY_NODE_RELIABLE, false); if (node->driver != driver) { pw_log_debug("%p: driver %d -> %d", node, node->driver, driver); diff --git a/src/pipewire/impl-port.c b/src/pipewire/impl-port.c index aff08e4be..b6b06d87c 100644 --- a/src/pipewire/impl-port.c +++ b/src/pipewire/impl-port.c @@ -303,6 +303,37 @@ static int tee_process(void *object) return SPA_STATUS_HAVE_DATA | SPA_STATUS_NEED_DATA; } +static int tee_process_reliable(void *object) +{ + struct impl *impl = object; + struct pw_impl_port *this = &impl->this; + struct pw_impl_port_mix *mix; + struct spa_io_buffers *io = &this->rt.io; + uint32_t cycle = this->node->rt.position->clock.cycle & 1; + + if (io->status == SPA_STATUS_HAVE_DATA) { + uint32_t buffer_id = io->buffer_id; + + pw_log_trace_fp("%p: tee input status:%d id:%d cycle:%d", this, io->status, buffer_id, cycle); + + spa_list_for_each(mix, &impl->rt.mix_list, rt.link) { + struct spa_io_buffers *mio = mix->io[cycle]; + + pw_log_trace_fp("%p: port %d %p->%p status:%d id:%d", this, + mix->port.port_id, io, mio, mio->status, mio->buffer_id); + + if (mio->status != SPA_STATUS_HAVE_DATA) { + io->buffer_id = mio->buffer_id; + io->status = SPA_STATUS_NEED_DATA; + mio->buffer_id = buffer_id; + mio->status = SPA_STATUS_HAVE_DATA; + break; + } + } + } + return SPA_STATUS_HAVE_DATA | SPA_STATUS_NEED_DATA; +} + static int tee_reuse_buffer(void *object, uint32_t port_id, uint32_t buffer_id) { struct impl *impl = object; @@ -322,6 +353,15 @@ static const struct spa_node_methods schedule_tee_node = { .process = tee_process, }; +static const struct spa_node_methods schedule_tee_node_reliable = { + SPA_VERSION_NODE_METHODS, + .add_listener = mix_add_listener, + .port_enum_params = mix_port_enum_params, + .port_set_io = port_set_io, + .port_reuse_buffer = tee_reuse_buffer, + .process = tee_process_reliable, +}; + static int schedule_mix_input(void *object) { struct impl *impl = object; @@ -456,6 +496,7 @@ int pw_impl_port_release_mix(struct pw_impl_port *port, struct pw_impl_port_mix static int check_properties(struct pw_impl_port *port) { + struct impl *impl = SPA_CONTAINER_OF(port, struct impl, this); struct pw_impl_node *node = port->node; bool is_control, is_network, is_monitor, is_device, is_duplex, is_virtual; const char *media_class, *override_device_prefix, *channel_names; @@ -480,6 +521,14 @@ static int check_properties(struct pw_impl_port *port) port->ignore_latency = pw_properties_get_bool(port->properties, PW_KEY_PORT_IGNORE_LATENCY, false); port->exclusive = pw_properties_get_bool(port->properties, PW_KEY_PORT_EXCLUSIVE, node->exclusive); + port->reliable = pw_properties_get_bool(port->properties, PW_KEY_PORT_RELIABLE, node->reliable); + + if (port->direction == PW_DIRECTION_OUTPUT) { + if (port->reliable) + impl->mix_node.iface.cb.funcs = &schedule_tee_node_reliable; + else + impl->mix_node.iface.cb.funcs = &schedule_tee_node; + } /* inherit passive state from parent node */ port->passive = pw_properties_get_bool(port->properties, PW_KEY_PORT_PASSIVE, diff --git a/src/pipewire/keys.h b/src/pipewire/keys.h index 3d119fefb..13694bc29 100644 --- a/src/pipewire/keys.h +++ b/src/pipewire/keys.h @@ -232,6 +232,8 @@ extern "C" { #define PW_KEY_NODE_PHYSICAL "node.physical" /**< ports from the node are physical */ #define PW_KEY_NODE_TERMINAL "node.terminal" /**< ports from the node are terminal */ +#define PW_KEY_NODE_RELIABLE "node.reliable" /**< node uses reliable transport 1.6.0 */ + /** Port keys */ #define PW_KEY_PORT_ID "port.id" /**< port id */ #define PW_KEY_PORT_NAME "port.name" /**< port name */ @@ -249,6 +251,7 @@ extern "C" { #define PW_KEY_PORT_IGNORE_LATENCY "port.ignore-latency" /**< latency ignored by peers, since 0.3.71 */ #define PW_KEY_PORT_GROUP "port.group" /**< the port group of the port 1.2.0 */ #define PW_KEY_PORT_EXCLUSIVE "port.exclusive" /**< link port only once 1.6.0 */ +#define PW_KEY_PORT_RELIABLE "port.reliable" /**< port uses reliable transport 1.6.0 */ /** link properties */ #define PW_KEY_LINK_ID "link.id" /**< a link id */ diff --git a/src/pipewire/private.h b/src/pipewire/private.h index ebe0c6d5d..36f85bd9d 100644 --- a/src/pipewire/private.h +++ b/src/pipewire/private.h @@ -785,6 +785,7 @@ struct pw_impl_node { unsigned int lazy:1; /**< the graph is lazy scheduling */ unsigned int exclusive:1; /**< ports can only be linked once */ unsigned int leaf:1; /**< node only produces/consumes data */ + unsigned int reliable:1; /**< ports need reliable tee */ uint32_t transport; /**< latest transport request */ @@ -970,6 +971,7 @@ struct pw_impl_port { unsigned int ignore_latency:1; unsigned int have_latency:1; unsigned int exclusive:1; /**< port can only be linked once */ + unsigned int reliable:1; /**< port needs reliable tee */ unsigned int have_tag_param:1; struct spa_pod *tag[2]; /**< tags */ diff --git a/src/pipewire/stream.c b/src/pipewire/stream.c index 46c0822a6..4085e0885 100644 --- a/src/pipewire/stream.c +++ b/src/pipewire/stream.c @@ -2507,6 +2507,16 @@ struct pw_buffer *pw_stream_dequeue_buffer(struct pw_stream *stream) struct buffer *b; int res; + /* For reliable output streams, only give buffers when both queue AND output IO are clear */ + if (impl->direction == SPA_DIRECTION_OUTPUT && stream->node->reliable) { + struct spa_io_buffers *io = impl->io; + + if (!queue_is_empty(impl, &impl->queued) || io->status == SPA_STATUS_HAVE_DATA) { + errno = EAGAIN; + return NULL; + } + } + if ((b = queue_pop(impl, &impl->dequeued)) == NULL) { res = -errno; pw_log_trace_fp("%p: no more buffers: %m", stream);