From 9f2d8737608518dc2a330ec2a11cd0023a30dc09 Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Tue, 14 Oct 2025 11:37:24 +0200 Subject: [PATCH] examples: set exclusive and reliable flags We need exclusive port use if we negotiated SyncTimeline because there can only be one consumer of the syncobj. We also need to enable reliable transport if synctimeline is supported but the release flag isn't. Add some more logging to the port when the exclusive and reliable states changed. Fixes #4885 --- src/examples/video-play-sync.c | 38 +++++++++++--------- src/examples/video-src-sync.c | 64 ++++++++++++++++++++++++++-------- src/pipewire/impl-port.c | 21 ++++++----- 3 files changed, 85 insertions(+), 38 deletions(-) diff --git a/src/examples/video-play-sync.c b/src/examples/video-play-sync.c index 44b9b2cd6..3ef4ed619 100644 --- a/src/examples/video-play-sync.c +++ b/src/examples/video-play-sync.c @@ -59,6 +59,7 @@ struct data { bool with_synctimeline; bool with_synctimeline_release; + bool force_synctimeline_release; }; static void handle_events(struct data *data) @@ -371,8 +372,13 @@ on_stream_param_changed(void *_data, uint32_t id, const struct spa_pod *param) SPA_PARAM_META_size, SPA_POD_Int(sizeof(struct spa_meta_sync_timeline)), 0); if (data->with_synctimeline_release) { - /* drop features flags if not provided by both sides */ - spa_pod_builder_prop(&b, SPA_PARAM_META_features, SPA_POD_PROP_FLAG_DROP); + uint32_t flags = data->force_synctimeline_release ? + /* both sides need compatible features */ + SPA_POD_PROP_FLAG_MANDATORY : + /* drop features flags if not provided by both sides */ + SPA_POD_PROP_FLAG_DROP; + + spa_pod_builder_prop(&b, SPA_PARAM_META_features, flags); spa_pod_builder_int(&b, SPA_META_FEATURE_SYNC_TIMELINE_RELEASE); } params[n_params++] = spa_pod_builder_pop(&b, &f); @@ -397,10 +403,6 @@ on_stream_param_changed(void *_data, uint32_t id, const struct spa_pod *param) pw_stream_update_params(stream, params, n_params); } -static void on_stream_add_buffer(void *_data, struct pw_buffer *buffer) -{ -} - /* these are the stream events we listen for */ static const struct pw_stream_events stream_events = { PW_VERSION_STREAM_EVENTS, @@ -408,7 +410,6 @@ static const struct pw_stream_events stream_events = { .io_changed = on_stream_io_changed, .param_changed = on_stream_param_changed, .process = on_process, - .add_buffer = on_stream_add_buffer, }; static int build_format(struct data *data, struct spa_pod_builder *b, const struct spa_pod **params) @@ -453,6 +454,7 @@ static void show_help(struct data *data, const char *name, bool is_error) " -r, --remote Remote daemon name\n" " -S, --sync Enable SyncTimeline\n" " -R, --release Enable RELEASE feature\n" + " -F, --force-release RELEASE feature needs to be present\n" "\n", name); } @@ -465,11 +467,12 @@ int main(int argc, char *argv[]) struct pw_properties *props; int res, n_params; static const struct option long_options[] = { - { "help", no_argument, NULL, 'h' }, - { "version", no_argument, NULL, 'V' }, - { "remote", required_argument, NULL, 'r' }, - { "sync", no_argument, NULL, 'S' }, - { "release", no_argument, NULL, 'R' }, + { "help", no_argument, NULL, 'h' }, + { "version", no_argument, NULL, 'V' }, + { "remote", required_argument, NULL, 'r' }, + { "sync", no_argument, NULL, 'S' }, + { "release", no_argument, NULL, 'R' }, + { "force-release", no_argument, NULL, 'F' }, { NULL, 0, NULL, 0} }; char *opt_remote = NULL; @@ -477,7 +480,7 @@ int main(int argc, char *argv[]) pw_init(&argc, &argv); - while ((c = getopt_long(argc, argv, "hVr:SR", long_options, NULL)) != -1) { + while ((c = getopt_long(argc, argv, "hVr:SRF", long_options, NULL)) != -1) { switch (c) { case 'h': show_help(&data, argv[0], false); @@ -493,11 +496,14 @@ int main(int argc, char *argv[]) case 'r': opt_remote = optarg; break; - case 'S': - data.with_synctimeline = true; - break; + case 'F': + data.force_synctimeline_release = true; + SPA_FALLTHROUGH; case 'R': data.with_synctimeline_release = true; + SPA_FALLTHROUGH; + case 'S': + data.with_synctimeline = true; break; default: show_help(&data, argv[0], true); diff --git a/src/examples/video-src-sync.c b/src/examples/video-src-sync.c index 890b2d8d9..873a27407 100644 --- a/src/examples/video-src-sync.c +++ b/src/examples/video-src-sync.c @@ -51,11 +51,13 @@ struct data { int counter; uint32_t seq; + uint32_t n_buffers; int res; bool with_synctimeline; bool with_synctimeline_release; + bool force_synctimeline_release; }; static void on_process(void *userdata) @@ -227,8 +229,13 @@ on_stream_param_changed(void *_data, uint32_t id, const struct spa_pod *param) SPA_PARAM_META_size, SPA_POD_Int(sizeof(struct spa_meta_sync_timeline)), 0); if (data->with_synctimeline_release) { - /* drop features flags if not provided by both sides */ - spa_pod_builder_prop(&b, SPA_PARAM_META_features, SPA_POD_PROP_FLAG_DROP); + uint32_t flags = data->force_synctimeline_release ? + /* both sides need compatible features */ + SPA_POD_PROP_FLAG_MANDATORY : + /* drop features flags if not provided by both sides */ + SPA_POD_PROP_FLAG_DROP; + + spa_pod_builder_prop(&b, SPA_PARAM_META_features, flags); spa_pod_builder_int(&b, SPA_META_FEATURE_SYNC_TIMELINE_RELEASE); } params[n_params++] = spa_pod_builder_pop(&b, &f); @@ -330,15 +337,38 @@ static void on_stream_add_buffer(void *_data, struct pw_buffer *buffer) return; } } - if (spa_buffer_has_meta_features(buf, SPA_META_SyncTimeline, - SPA_META_FEATURE_SYNC_TIMELINE_RELEASE)) { - pw_log_debug("got sync timeline release"); + + if (data->n_buffers++ == 0) { + struct spa_dict_item items[2]; + uint32_t n_items = 0; + bool reliable = false, exclusive = false; + + if (s != NULL) { + /* sync timeline is always exclusive */ + exclusive = true; + if (spa_buffer_has_meta_features(buf, SPA_META_SyncTimeline, + SPA_META_FEATURE_SYNC_TIMELINE_RELEASE)) { + pw_log_info("got sync timeline with release"); + } else { + pw_log_info("got sync timeline"); + /* we need reliable transport without release */ + reliable = true; + } + } + else { + pw_log_info("did not get sync timeline"); + } + items[n_items++] = SPA_DICT_ITEM(PW_KEY_NODE_EXCLUSIVE, exclusive ? "true" : "false"); + items[n_items++] = SPA_DICT_ITEM(PW_KEY_NODE_RELIABLE, reliable ? "true" : "false"); + + pw_stream_update_properties(data->stream, &SPA_DICT(items, n_items)); } } /* close the memfd we set on the buffers here */ static void on_stream_remove_buffer(void *_data, struct pw_buffer *buffer) { + struct data *data = _data; struct spa_buffer *buf = buffer->buffer; struct spa_data *d; @@ -351,6 +381,7 @@ static void on_stream_remove_buffer(void *_data, struct pw_buffer *buffer) close(d[1].fd); close(d[2].fd); } + data->n_buffers--; } @@ -382,6 +413,7 @@ static void show_help(struct data *data, const char *name, bool is_error) " -r, --remote Remote daemon name\n" " -S, --sync Enable SyncTimeline\n" " -R, --release Enable RELEASE feature\n" + " -F, --force-release RELEASE feature needs to be present\n" "\n", name); } @@ -393,11 +425,12 @@ int main(int argc, char *argv[]) uint8_t buffer[1024]; struct spa_pod_builder b = SPA_POD_BUILDER_INIT(buffer, sizeof(buffer)); static const struct option long_options[] = { - { "help", no_argument, NULL, 'h' }, - { "version", no_argument, NULL, 'V' }, - { "remote", required_argument, NULL, 'r' }, - { "sync", no_argument, NULL, 'S' }, - { "release", no_argument, NULL, 'R' }, + { "help", no_argument, NULL, 'h' }, + { "version", no_argument, NULL, 'V' }, + { "remote", required_argument, NULL, 'r' }, + { "sync", no_argument, NULL, 'S' }, + { "release", no_argument, NULL, 'R' }, + { "force-release", no_argument, NULL, 'F' }, { NULL, 0, NULL, 0} }; char *opt_remote = NULL; @@ -405,7 +438,7 @@ int main(int argc, char *argv[]) pw_init(&argc, &argv); - while ((c = getopt_long(argc, argv, "hVr:SR", long_options, NULL)) != -1) { + while ((c = getopt_long(argc, argv, "hVr:SRF", long_options, NULL)) != -1) { switch (c) { case 'h': show_help(&data, argv[0], false); @@ -421,11 +454,14 @@ int main(int argc, char *argv[]) case 'r': opt_remote = optarg; break; - case 'S': - data.with_synctimeline = true; - break; + case 'F': + data.force_synctimeline_release = true; + SPA_FALLTHROUGH; case 'R': data.with_synctimeline_release = true; + SPA_FALLTHROUGH; + case 'S': + data.with_synctimeline = true; break; default: show_help(&data, argv[0], true); diff --git a/src/pipewire/impl-port.c b/src/pipewire/impl-port.c index b6b06d87c..27c771446 100644 --- a/src/pipewire/impl-port.c +++ b/src/pipewire/impl-port.c @@ -498,7 +498,7 @@ static int check_properties(struct pw_impl_port *port) { struct impl *impl = SPA_CONTAINER_OF(port, struct impl, this); struct pw_impl_node *node = port->node; - bool is_control, is_network, is_monitor, is_device, is_duplex, is_virtual; + bool is_control, is_network, is_monitor, is_device, is_duplex, is_virtual, new_bool; const char *media_class, *override_device_prefix, *channel_names; const char *str, *prefix, *path, *desc, *nick, *name; const struct pw_properties *nprops; @@ -520,14 +520,19 @@ static int check_properties(struct pw_impl_port *port) } port->ignore_latency = pw_properties_get_bool(port->properties, PW_KEY_PORT_IGNORE_LATENCY, false); - port->exclusive = pw_properties_get_bool(port->properties, PW_KEY_PORT_EXCLUSIVE, node->exclusive); - port->reliable = pw_properties_get_bool(port->properties, PW_KEY_PORT_RELIABLE, node->reliable); + new_bool = pw_properties_get_bool(port->properties, PW_KEY_PORT_EXCLUSIVE, node->exclusive); + if (new_bool != port->exclusive) { + pw_log_info("%p: exclusive %d->%d", port, port->exclusive, new_bool); + port->exclusive = new_bool; + } - if (port->direction == PW_DIRECTION_OUTPUT) { - if (port->reliable) - impl->mix_node.iface.cb.funcs = &schedule_tee_node_reliable; - else - impl->mix_node.iface.cb.funcs = &schedule_tee_node; + new_bool = pw_properties_get_bool(port->properties, PW_KEY_PORT_RELIABLE, node->reliable); + if (new_bool != port->reliable && port->direction == PW_DIRECTION_OUTPUT) { + pw_log_info("%p: reliable %d->%d", port, port->reliable, new_bool); + port->reliable = new_bool; + impl->mix_node.iface.cb.funcs = new_bool ? + &schedule_tee_node_reliable : + &schedule_tee_node; } /* inherit passive state from parent node */