examples: set exclusive and reliable flags

We need exclusive port use if we negotiated SyncTimeline because there
can only be one consumer of the syncobj.

We also need to enable reliable transport if synctimeline is supported
but the release flag isn't.

Add some more logging to the port when the exclusive and reliable states
changed.

Fixes #4885
This commit is contained in:
Wim Taymans 2025-10-14 11:37:24 +02:00
parent 20d2a331be
commit 9f2d873760
3 changed files with 85 additions and 38 deletions

View file

@ -59,6 +59,7 @@ struct data {
bool with_synctimeline; bool with_synctimeline;
bool with_synctimeline_release; bool with_synctimeline_release;
bool force_synctimeline_release;
}; };
static void handle_events(struct data *data) static void handle_events(struct data *data)
@ -371,8 +372,13 @@ on_stream_param_changed(void *_data, uint32_t id, const struct spa_pod *param)
SPA_PARAM_META_size, SPA_POD_Int(sizeof(struct spa_meta_sync_timeline)), SPA_PARAM_META_size, SPA_POD_Int(sizeof(struct spa_meta_sync_timeline)),
0); 0);
if (data->with_synctimeline_release) { if (data->with_synctimeline_release) {
/* drop features flags if not provided by both sides */ uint32_t flags = data->force_synctimeline_release ?
spa_pod_builder_prop(&b, SPA_PARAM_META_features, SPA_POD_PROP_FLAG_DROP); /* both sides need compatible features */
SPA_POD_PROP_FLAG_MANDATORY :
/* drop features flags if not provided by both sides */
SPA_POD_PROP_FLAG_DROP;
spa_pod_builder_prop(&b, SPA_PARAM_META_features, flags);
spa_pod_builder_int(&b, SPA_META_FEATURE_SYNC_TIMELINE_RELEASE); spa_pod_builder_int(&b, SPA_META_FEATURE_SYNC_TIMELINE_RELEASE);
} }
params[n_params++] = spa_pod_builder_pop(&b, &f); params[n_params++] = spa_pod_builder_pop(&b, &f);
@ -397,10 +403,6 @@ on_stream_param_changed(void *_data, uint32_t id, const struct spa_pod *param)
pw_stream_update_params(stream, params, n_params); pw_stream_update_params(stream, params, n_params);
} }
static void on_stream_add_buffer(void *_data, struct pw_buffer *buffer)
{
}
/* these are the stream events we listen for */ /* these are the stream events we listen for */
static const struct pw_stream_events stream_events = { static const struct pw_stream_events stream_events = {
PW_VERSION_STREAM_EVENTS, PW_VERSION_STREAM_EVENTS,
@ -408,7 +410,6 @@ static const struct pw_stream_events stream_events = {
.io_changed = on_stream_io_changed, .io_changed = on_stream_io_changed,
.param_changed = on_stream_param_changed, .param_changed = on_stream_param_changed,
.process = on_process, .process = on_process,
.add_buffer = on_stream_add_buffer,
}; };
static int build_format(struct data *data, struct spa_pod_builder *b, const struct spa_pod **params) static int build_format(struct data *data, struct spa_pod_builder *b, const struct spa_pod **params)
@ -453,6 +454,7 @@ static void show_help(struct data *data, const char *name, bool is_error)
" -r, --remote Remote daemon name\n" " -r, --remote Remote daemon name\n"
" -S, --sync Enable SyncTimeline\n" " -S, --sync Enable SyncTimeline\n"
" -R, --release Enable RELEASE feature\n" " -R, --release Enable RELEASE feature\n"
" -F, --force-release RELEASE feature needs to be present\n"
"\n", name); "\n", name);
} }
@ -465,11 +467,12 @@ int main(int argc, char *argv[])
struct pw_properties *props; struct pw_properties *props;
int res, n_params; int res, n_params;
static const struct option long_options[] = { static const struct option long_options[] = {
{ "help", no_argument, NULL, 'h' }, { "help", no_argument, NULL, 'h' },
{ "version", no_argument, NULL, 'V' }, { "version", no_argument, NULL, 'V' },
{ "remote", required_argument, NULL, 'r' }, { "remote", required_argument, NULL, 'r' },
{ "sync", no_argument, NULL, 'S' }, { "sync", no_argument, NULL, 'S' },
{ "release", no_argument, NULL, 'R' }, { "release", no_argument, NULL, 'R' },
{ "force-release", no_argument, NULL, 'F' },
{ NULL, 0, NULL, 0} { NULL, 0, NULL, 0}
}; };
char *opt_remote = NULL; char *opt_remote = NULL;
@ -477,7 +480,7 @@ int main(int argc, char *argv[])
pw_init(&argc, &argv); pw_init(&argc, &argv);
while ((c = getopt_long(argc, argv, "hVr:SR", long_options, NULL)) != -1) { while ((c = getopt_long(argc, argv, "hVr:SRF", long_options, NULL)) != -1) {
switch (c) { switch (c) {
case 'h': case 'h':
show_help(&data, argv[0], false); show_help(&data, argv[0], false);
@ -493,11 +496,14 @@ int main(int argc, char *argv[])
case 'r': case 'r':
opt_remote = optarg; opt_remote = optarg;
break; break;
case 'S': case 'F':
data.with_synctimeline = true; data.force_synctimeline_release = true;
break; SPA_FALLTHROUGH;
case 'R': case 'R':
data.with_synctimeline_release = true; data.with_synctimeline_release = true;
SPA_FALLTHROUGH;
case 'S':
data.with_synctimeline = true;
break; break;
default: default:
show_help(&data, argv[0], true); show_help(&data, argv[0], true);

View file

@ -51,11 +51,13 @@ struct data {
int counter; int counter;
uint32_t seq; uint32_t seq;
uint32_t n_buffers;
int res; int res;
bool with_synctimeline; bool with_synctimeline;
bool with_synctimeline_release; bool with_synctimeline_release;
bool force_synctimeline_release;
}; };
static void on_process(void *userdata) static void on_process(void *userdata)
@ -227,8 +229,13 @@ on_stream_param_changed(void *_data, uint32_t id, const struct spa_pod *param)
SPA_PARAM_META_size, SPA_POD_Int(sizeof(struct spa_meta_sync_timeline)), SPA_PARAM_META_size, SPA_POD_Int(sizeof(struct spa_meta_sync_timeline)),
0); 0);
if (data->with_synctimeline_release) { if (data->with_synctimeline_release) {
/* drop features flags if not provided by both sides */ uint32_t flags = data->force_synctimeline_release ?
spa_pod_builder_prop(&b, SPA_PARAM_META_features, SPA_POD_PROP_FLAG_DROP); /* both sides need compatible features */
SPA_POD_PROP_FLAG_MANDATORY :
/* drop features flags if not provided by both sides */
SPA_POD_PROP_FLAG_DROP;
spa_pod_builder_prop(&b, SPA_PARAM_META_features, flags);
spa_pod_builder_int(&b, SPA_META_FEATURE_SYNC_TIMELINE_RELEASE); spa_pod_builder_int(&b, SPA_META_FEATURE_SYNC_TIMELINE_RELEASE);
} }
params[n_params++] = spa_pod_builder_pop(&b, &f); params[n_params++] = spa_pod_builder_pop(&b, &f);
@ -330,15 +337,38 @@ static void on_stream_add_buffer(void *_data, struct pw_buffer *buffer)
return; return;
} }
} }
if (spa_buffer_has_meta_features(buf, SPA_META_SyncTimeline,
SPA_META_FEATURE_SYNC_TIMELINE_RELEASE)) { if (data->n_buffers++ == 0) {
pw_log_debug("got sync timeline release"); struct spa_dict_item items[2];
uint32_t n_items = 0;
bool reliable = false, exclusive = false;
if (s != NULL) {
/* sync timeline is always exclusive */
exclusive = true;
if (spa_buffer_has_meta_features(buf, SPA_META_SyncTimeline,
SPA_META_FEATURE_SYNC_TIMELINE_RELEASE)) {
pw_log_info("got sync timeline with release");
} else {
pw_log_info("got sync timeline");
/* we need reliable transport without release */
reliable = true;
}
}
else {
pw_log_info("did not get sync timeline");
}
items[n_items++] = SPA_DICT_ITEM(PW_KEY_NODE_EXCLUSIVE, exclusive ? "true" : "false");
items[n_items++] = SPA_DICT_ITEM(PW_KEY_NODE_RELIABLE, reliable ? "true" : "false");
pw_stream_update_properties(data->stream, &SPA_DICT(items, n_items));
} }
} }
/* close the memfd we set on the buffers here */ /* close the memfd we set on the buffers here */
static void on_stream_remove_buffer(void *_data, struct pw_buffer *buffer) static void on_stream_remove_buffer(void *_data, struct pw_buffer *buffer)
{ {
struct data *data = _data;
struct spa_buffer *buf = buffer->buffer; struct spa_buffer *buf = buffer->buffer;
struct spa_data *d; struct spa_data *d;
@ -351,6 +381,7 @@ static void on_stream_remove_buffer(void *_data, struct pw_buffer *buffer)
close(d[1].fd); close(d[1].fd);
close(d[2].fd); close(d[2].fd);
} }
data->n_buffers--;
} }
@ -382,6 +413,7 @@ static void show_help(struct data *data, const char *name, bool is_error)
" -r, --remote Remote daemon name\n" " -r, --remote Remote daemon name\n"
" -S, --sync Enable SyncTimeline\n" " -S, --sync Enable SyncTimeline\n"
" -R, --release Enable RELEASE feature\n" " -R, --release Enable RELEASE feature\n"
" -F, --force-release RELEASE feature needs to be present\n"
"\n", name); "\n", name);
} }
@ -393,11 +425,12 @@ int main(int argc, char *argv[])
uint8_t buffer[1024]; uint8_t buffer[1024];
struct spa_pod_builder b = SPA_POD_BUILDER_INIT(buffer, sizeof(buffer)); struct spa_pod_builder b = SPA_POD_BUILDER_INIT(buffer, sizeof(buffer));
static const struct option long_options[] = { static const struct option long_options[] = {
{ "help", no_argument, NULL, 'h' }, { "help", no_argument, NULL, 'h' },
{ "version", no_argument, NULL, 'V' }, { "version", no_argument, NULL, 'V' },
{ "remote", required_argument, NULL, 'r' }, { "remote", required_argument, NULL, 'r' },
{ "sync", no_argument, NULL, 'S' }, { "sync", no_argument, NULL, 'S' },
{ "release", no_argument, NULL, 'R' }, { "release", no_argument, NULL, 'R' },
{ "force-release", no_argument, NULL, 'F' },
{ NULL, 0, NULL, 0} { NULL, 0, NULL, 0}
}; };
char *opt_remote = NULL; char *opt_remote = NULL;
@ -405,7 +438,7 @@ int main(int argc, char *argv[])
pw_init(&argc, &argv); pw_init(&argc, &argv);
while ((c = getopt_long(argc, argv, "hVr:SR", long_options, NULL)) != -1) { while ((c = getopt_long(argc, argv, "hVr:SRF", long_options, NULL)) != -1) {
switch (c) { switch (c) {
case 'h': case 'h':
show_help(&data, argv[0], false); show_help(&data, argv[0], false);
@ -421,11 +454,14 @@ int main(int argc, char *argv[])
case 'r': case 'r':
opt_remote = optarg; opt_remote = optarg;
break; break;
case 'S': case 'F':
data.with_synctimeline = true; data.force_synctimeline_release = true;
break; SPA_FALLTHROUGH;
case 'R': case 'R':
data.with_synctimeline_release = true; data.with_synctimeline_release = true;
SPA_FALLTHROUGH;
case 'S':
data.with_synctimeline = true;
break; break;
default: default:
show_help(&data, argv[0], true); show_help(&data, argv[0], true);

View file

@ -498,7 +498,7 @@ static int check_properties(struct pw_impl_port *port)
{ {
struct impl *impl = SPA_CONTAINER_OF(port, struct impl, this); struct impl *impl = SPA_CONTAINER_OF(port, struct impl, this);
struct pw_impl_node *node = port->node; struct pw_impl_node *node = port->node;
bool is_control, is_network, is_monitor, is_device, is_duplex, is_virtual; bool is_control, is_network, is_monitor, is_device, is_duplex, is_virtual, new_bool;
const char *media_class, *override_device_prefix, *channel_names; const char *media_class, *override_device_prefix, *channel_names;
const char *str, *prefix, *path, *desc, *nick, *name; const char *str, *prefix, *path, *desc, *nick, *name;
const struct pw_properties *nprops; const struct pw_properties *nprops;
@ -520,14 +520,19 @@ static int check_properties(struct pw_impl_port *port)
} }
port->ignore_latency = pw_properties_get_bool(port->properties, PW_KEY_PORT_IGNORE_LATENCY, false); port->ignore_latency = pw_properties_get_bool(port->properties, PW_KEY_PORT_IGNORE_LATENCY, false);
port->exclusive = pw_properties_get_bool(port->properties, PW_KEY_PORT_EXCLUSIVE, node->exclusive); new_bool = pw_properties_get_bool(port->properties, PW_KEY_PORT_EXCLUSIVE, node->exclusive);
port->reliable = pw_properties_get_bool(port->properties, PW_KEY_PORT_RELIABLE, node->reliable); if (new_bool != port->exclusive) {
pw_log_info("%p: exclusive %d->%d", port, port->exclusive, new_bool);
port->exclusive = new_bool;
}
if (port->direction == PW_DIRECTION_OUTPUT) { new_bool = pw_properties_get_bool(port->properties, PW_KEY_PORT_RELIABLE, node->reliable);
if (port->reliable) if (new_bool != port->reliable && port->direction == PW_DIRECTION_OUTPUT) {
impl->mix_node.iface.cb.funcs = &schedule_tee_node_reliable; pw_log_info("%p: reliable %d->%d", port, port->reliable, new_bool);
else port->reliable = new_bool;
impl->mix_node.iface.cb.funcs = &schedule_tee_node; impl->mix_node.iface.cb.funcs = new_bool ?
&schedule_tee_node_reliable :
&schedule_tee_node;
} }
/* inherit passive state from parent node */ /* inherit passive state from parent node */