combine-stream: add support for "on-demand" streams

Instead of just following static target match rules to create output streams,
this feature allows the user to dynamically create more output streams
with custom targets using metadata.
This commit is contained in:
George Kiagiadakis 2023-10-13 21:35:35 +03:00
parent 69d0f586b2
commit 8735d07c0a

View file

@ -46,6 +46,7 @@
* - `node.description`: a human readable name for the stream
* - `combine.mode` = capture | playback | sink | source, default sink
* - `combine.latency-compensate`: use delay buffers to match stream latencies
* - `combine.on-demand-streams`: use metadata to create streams on demand
* - `combine.props = {}`: properties to be passed to the sink/source
* - `stream.props = {}`: properties to be passed to the streams
* - `stream.rules = {}`: rules for matching streams, use create-stream actions
@ -250,6 +251,10 @@ struct impl {
struct pw_registry *registry;
struct spa_hook registry_listener;
struct pw_metadata *metadata;
struct spa_hook metadata_listener;
uint32_t metadata_id;
struct spa_source *update_delay_event;
struct pw_properties *combine_props;
@ -268,6 +273,7 @@ struct impl {
unsigned int do_disconnect:1;
unsigned int latency_compensate:1;
unsigned int on_demand_streams:1;
struct spa_list streams;
uint32_t n_streams;
@ -281,6 +287,7 @@ struct ringbuffer {
struct stream {
uint32_t id;
char *on_demand_id;
struct impl *impl;
@ -402,6 +409,15 @@ static struct stream *find_stream(struct impl *impl, uint32_t id)
return NULL;
}
static struct stream *find_on_demand_stream(struct impl *impl, const char *on_demand_id)
{
struct stream *s;
spa_list_for_each(s, &impl->streams, link)
if (spa_streq(s->on_demand_id, on_demand_id))
return s;
return NULL;
}
static enum pw_direction get_combine_direction(struct impl *impl)
{
if (impl->mode == MODE_SINK || impl->mode == MODE_CAPTURE)
@ -624,6 +640,7 @@ static void remove_stream(struct stream *s, bool destroy)
pw_stream_destroy(s->stream);
}
free(s->on_demand_id);
free(s->delaybuf);
free(s);
}
@ -633,6 +650,14 @@ static void destroy_stream(struct stream *s)
remove_stream(s, true);
}
static void destroy_all_on_demand_streams(struct impl *impl)
{
struct stream *s, *tmp;
spa_list_for_each_safe(s, tmp, &impl->streams, link)
if (s->on_demand_id)
destroy_stream(s);
}
static void stream_destroy(void *d)
{
struct stream *s = d;
@ -721,6 +746,7 @@ static const struct pw_stream_events stream_events = {
struct stream_info {
struct impl *impl;
uint32_t id;
const char *on_demand_id;
const struct spa_dict *props;
struct pw_properties *stream_props;
};
@ -739,13 +765,18 @@ static int create_stream(struct stream_info *info)
enum pw_stream_flags flags;
enum pw_direction direction;
node_name = spa_dict_lookup(info->props, "node.name");
if (node_name == NULL)
node_name = spa_dict_lookup(info->props, "object.serial");
if (node_name == NULL)
return -EIO;
if (info->on_demand_id) {
node_name = info->on_demand_id;
pw_log_info("create on demand stream: %s", node_name);
} else {
node_name = spa_dict_lookup(info->props, PW_KEY_NODE_NAME);
if (node_name == NULL)
node_name = spa_dict_lookup(info->props, PW_KEY_OBJECT_SERIAL);
if (node_name == NULL)
return -EIO;
pw_log_info("create stream for %d %s", info->id, node_name);
pw_log_info("create stream for %d %s", info->id, node_name);
}
s = calloc(1, sizeof(*s));
if (s == NULL)
@ -798,8 +829,14 @@ static int create_stream(struct stream_info *info)
if (pw_properties_get(info->stream_props, PW_KEY_NODE_NAME) == NULL)
pw_properties_setf(info->stream_props, PW_KEY_NODE_NAME,
"output.%s_%s", str, node_name);
if (pw_properties_get(info->stream_props, PW_KEY_TARGET_OBJECT) == NULL)
pw_properties_set(info->stream_props, PW_KEY_TARGET_OBJECT, node_name);
if (info->on_demand_id) {
s->on_demand_id = strdup(info->on_demand_id);
pw_properties_set(info->stream_props, "combine.on-demand-id", s->on_demand_id);
} else {
if (pw_properties_get(info->stream_props, PW_KEY_TARGET_OBJECT) == NULL)
pw_properties_set(info->stream_props, PW_KEY_TARGET_OBJECT, node_name);
}
s->stream = pw_stream_new(impl->core, "Combine stream", info->stream_props);
info->stream_props = NULL;
@ -866,6 +903,62 @@ static int rule_matched(void *data, const char *location, const char *action,
return res;
}
static int metadata_property(void *data, uint32_t id,
const char *key, const char *type, const char *value)
{
struct impl *impl = data;
const char *on_demand_id;
struct stream *s;
if (id != impl->combine_id)
return 0;
if (!key) {
destroy_all_on_demand_streams(impl);
goto out;
}
if (!spa_strstartswith(key, "combine.on-demand-stream."))
return 0;
on_demand_id = key + strlen("combine.on-demand-stream.");
if (*on_demand_id == '\0')
return 0;
if (value) {
struct stream_info info;
s = find_on_demand_stream(impl, on_demand_id);
if (s)
destroy_stream(s);
spa_zero(info);
info.impl = impl;
info.id = SPA_ID_INVALID;
info.on_demand_id = on_demand_id;
info.stream_props = pw_properties_copy(impl->stream_props);
pw_properties_update_string(info.stream_props, value, strlen(value));
create_stream(&info);
pw_properties_free(info.stream_props);
} else {
s = find_on_demand_stream(impl, on_demand_id);
if (s)
destroy_stream(s);
}
out:
update_delay(impl);
return 0;
}
static const struct pw_metadata_events metadata_events = {
PW_VERSION_METADATA_EVENTS,
.property = metadata_property
};
static void registry_event_global(void *data, uint32_t id,
uint32_t permissions, const char *type, uint32_t version,
const struct spa_dict *props)
@ -874,6 +967,22 @@ static void registry_event_global(void *data, uint32_t id,
const char *str;
struct stream_info info;
if (impl->on_demand_streams && spa_streq(type, PW_TYPE_INTERFACE_Metadata)) {
if (!props)
return;
if (!spa_streq(spa_dict_lookup(props, "metadata.name"), "default"))
return;
impl->metadata = pw_registry_bind(impl->registry,
id, type, PW_VERSION_METADATA, 0);
pw_metadata_add_listener(impl->metadata,
&impl->metadata_listener,
&metadata_events, impl);
impl->metadata_id = id;
return;
}
if (!spa_streq(type, PW_TYPE_INTERFACE_Node) || props == NULL)
return;
@ -902,6 +1011,15 @@ static void registry_event_global_remove(void *data, uint32_t id)
struct impl *impl = data;
struct stream *s;
if (impl->metadata && id == impl->metadata_id) {
destroy_all_on_demand_streams(impl);
update_delay(impl);
spa_hook_remove(&impl->metadata_listener);
pw_proxy_destroy((struct pw_proxy*)impl->metadata);
impl->metadata = NULL;
return;
}
s = find_stream(impl, id);
if (s == NULL)
return;
@ -1235,6 +1353,11 @@ static void core_removed(void *d)
pw_proxy_destroy((struct pw_proxy*)impl->registry);
impl->registry = NULL;
}
if (impl->metadata) {
spa_hook_remove(&impl->metadata_listener);
pw_proxy_destroy((struct pw_proxy*)impl->metadata);
impl->metadata = NULL;
}
pw_impl_module_schedule_destroy(impl->module);
}
@ -1256,6 +1379,11 @@ static void impl_destroy(struct impl *impl)
if (impl->update_delay_event)
pw_loop_destroy_source(impl->main_loop, impl->update_delay_event);
if (impl->metadata) {
spa_hook_remove(&impl->metadata_listener);
pw_proxy_destroy((struct pw_proxy*)impl->metadata);
impl->metadata = NULL;
}
if (impl->registry) {
spa_hook_remove(&impl->registry_listener);
pw_proxy_destroy((struct pw_proxy*)impl->registry);
@ -1354,6 +1482,8 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args)
if ((str = pw_properties_get(props, "combine.latency-compensate")) != NULL)
impl->latency_compensate = spa_atob(str);
if ((str = pw_properties_get(props, "combine.on-demand-streams")) != NULL)
impl->on_demand_streams = spa_atob(str);
impl->combine_props = pw_properties_new(NULL, NULL);
impl->stream_props = pw_properties_new(NULL, NULL);