From 41dcac0ecdc3e03ada8787a9dad4b0421c618687 Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Thu, 24 Aug 2023 16:41:21 +0200 Subject: [PATCH] Port: Add tag param The tag param has a list of arbitrary key/value pairs. Like the Latency param, it travels up and downstream. Mixers will append the info dictionaries or do some more fancy merging. The purpose is to transport arbirary metadata, out-of-band, through the graph and it's used for stream metadata and other stream properties. --- spa/include/spa/param/param-types.h | 1 + spa/include/spa/param/param.h | 1 + spa/include/spa/param/tag-types.h | 39 +++++++ spa/include/spa/param/tag-utils.h | 143 ++++++++++++++++++++++++ spa/include/spa/param/tag.h | 46 ++++++++ spa/include/spa/param/type-info.h | 1 + spa/include/spa/utils/type-info.h | 1 + spa/include/spa/utils/type.h | 1 + spa/plugins/alsa/alsa-pcm-sink.c | 9 +- spa/plugins/audioconvert/audioadapter.c | 58 +++++++++- spa/plugins/audioconvert/audioconvert.c | 88 +++++++++++++-- src/examples/video-play.c | 18 ++- src/examples/video-src.c | 20 +++- src/pipewire/filter.c | 6 +- src/pipewire/impl-link.c | 22 ++++ src/pipewire/impl-port.c | 118 ++++++++++++++++++- src/pipewire/impl-port.h | 4 +- src/pipewire/private.h | 5 + src/pipewire/stream.c | 6 +- test/test-spa-utils.c | 3 +- 20 files changed, 566 insertions(+), 24 deletions(-) create mode 100644 spa/include/spa/param/tag-types.h create mode 100644 spa/include/spa/param/tag-utils.h create mode 100644 spa/include/spa/param/tag.h diff --git a/spa/include/spa/param/param-types.h b/spa/include/spa/param/param-types.h index a278b2eb0..4bed3651d 100644 --- a/spa/include/spa/param/param-types.h +++ b/spa/include/spa/param/param-types.h @@ -40,6 +40,7 @@ static const struct spa_type_info spa_type_param[] = { { SPA_PARAM_Control, SPA_TYPE_Sequence, SPA_TYPE_INFO_PARAM_ID_BASE "Control", NULL }, { SPA_PARAM_Latency, SPA_TYPE_OBJECT_ParamLatency, SPA_TYPE_INFO_PARAM_ID_BASE "Latency", NULL }, { SPA_PARAM_ProcessLatency, SPA_TYPE_OBJECT_ParamProcessLatency, SPA_TYPE_INFO_PARAM_ID_BASE "ProcessLatency", NULL }, + { SPA_PARAM_Tag, SPA_TYPE_OBJECT_ParamTag, SPA_TYPE_INFO_PARAM_ID_BASE "Tag", NULL }, { 0, 0, NULL, NULL }, }; diff --git a/spa/include/spa/param/param.h b/spa/include/spa/param/param.h index 0706411ef..51c442c35 100644 --- a/spa/include/spa/param/param.h +++ b/spa/include/spa/param/param.h @@ -39,6 +39,7 @@ enum spa_param_type { SPA_PARAM_Control, /**< Control parameter, a SPA_TYPE_Sequence */ SPA_PARAM_Latency, /**< latency reporting, a SPA_TYPE_OBJECT_ParamLatency */ SPA_PARAM_ProcessLatency, /**< processing latency, a SPA_TYPE_OBJECT_ParamProcessLatency */ + SPA_PARAM_Tag, /**< tag reporting, a SPA_TYPE_OBJECT_ParamTag. Since 0.3.79 */ }; /** information about a parameter */ diff --git a/spa/include/spa/param/tag-types.h b/spa/include/spa/param/tag-types.h new file mode 100644 index 000000000..573fb4aff --- /dev/null +++ b/spa/include/spa/param/tag-types.h @@ -0,0 +1,39 @@ +/* Simple Plugin API */ +/* SPDX-FileCopyrightText: Copyright © 2018 Wim Taymans */ +/* SPDX-License-Identifier: MIT */ + +#ifndef SPA_PARAM_TAG_TYPES_H +#define SPA_PARAM_TAG_TYPES_H + +#ifdef __cplusplus +extern "C" { +#endif + +/** + * \addtogroup spa_param + * \{ + */ + +#include +#include +#include + +#define SPA_TYPE_INFO_PARAM_Tag SPA_TYPE_INFO_PARAM_BASE "Tag" +#define SPA_TYPE_INFO_PARAM_TAG_BASE SPA_TYPE_INFO_PARAM_Tag ":" + +static const struct spa_type_info spa_type_param_tag[] = { + { SPA_PARAM_TAG_START, SPA_TYPE_Id, SPA_TYPE_INFO_PARAM_TAG_BASE, spa_type_param, }, + { SPA_PARAM_TAG_direction, SPA_TYPE_Id, SPA_TYPE_INFO_PARAM_TAG_BASE "direction", spa_type_direction, }, + { SPA_PARAM_TAG_info, SPA_TYPE_Struct, SPA_TYPE_INFO_PARAM_TAG_BASE "info", NULL, }, + { 0, 0, NULL, NULL }, +}; + +/** + * \} + */ + +#ifdef __cplusplus +} /* extern "C" */ +#endif + +#endif /* SPA_PARAM_TAG_TYPES_H */ diff --git a/spa/include/spa/param/tag-utils.h b/spa/include/spa/param/tag-utils.h new file mode 100644 index 000000000..2bce7b199 --- /dev/null +++ b/spa/include/spa/param/tag-utils.h @@ -0,0 +1,143 @@ +/* Simple Plugin API */ +/* SPDX-FileCopyrightText: Copyright © 2023 Wim Taymans */ +/* SPDX-License-Identifier: MIT */ + +#ifndef SPA_PARAM_TAG_UTILS_H +#define SPA_PARAM_TAG_UTILS_H + +#ifdef __cplusplus +extern "C" { +#endif + +/** + * \addtogroup spa_param + * \{ + */ + +#include + +#include +#include +#include +#include + +static inline int +spa_tag_compare(const struct spa_pod *a, const struct spa_pod *b) +{ + return ((a == b) || (a && b && SPA_POD_SIZE(a) == SPA_POD_SIZE(b) && + memcmp(a, b, SPA_POD_SIZE(b)) == 0)) ? 0 : 1; +} + +static inline int +spa_tag_parse(const struct spa_pod *tag, struct spa_tag_info *info, void **state) +{ + int res; + const struct spa_pod_object *obj = (const struct spa_pod_object*)tag; + const struct spa_pod_prop *first, *start, *cur; + + spa_zero(*info); + + if ((res = spa_pod_parse_object(tag, + SPA_TYPE_OBJECT_ParamTag, NULL, + SPA_PARAM_TAG_direction, SPA_POD_Id(&info->direction))) < 0) + return res; + + first = spa_pod_prop_first(&obj->body); + start = *state ? spa_pod_prop_next((struct spa_pod_prop*)*state) : first; + + res = 0; + for (cur = start; spa_pod_prop_is_inside(&obj->body, obj->pod.size, cur); + cur = spa_pod_prop_next(cur)) { + if (cur->key == SPA_PARAM_TAG_info) { + info->info = &cur->value; + *state = (void*)cur; + return 1; + } + } + return 0; +} + +static inline int +spa_tag_info_parse(const struct spa_tag_info *info, struct spa_dict *dict, struct spa_dict_item *items) +{ + struct spa_pod_parser prs; + uint32_t n, n_items; + const char *key, *value; + struct spa_pod_frame f[1]; + + spa_pod_parser_pod(&prs, info->info); + if (spa_pod_parser_push_struct(&prs, &f[0]) < 0 || + spa_pod_parser_get_int(&prs, (int32_t*)&n_items) < 0) + return -EINVAL; + + if (items == NULL) { + dict->n_items = n_items; + return 0; + } + n_items = SPA_MIN(dict->n_items, n_items); + + for (n = 0; n < n_items; n++) { + if (spa_pod_parser_get(&prs, + SPA_POD_String(&key), + SPA_POD_String(&value), + NULL) < 0) + break; + items[n].key = key; + items[n].value = value; + } + dict->items = items; + spa_pod_parser_pop(&prs, &f[0]); + return 0; +} + +static inline void +spa_tag_build_start(struct spa_pod_builder *builder, struct spa_pod_frame *f, + uint32_t id, enum spa_direction direction) +{ + spa_pod_builder_push_object(builder, f, SPA_TYPE_OBJECT_ParamTag, id); + spa_pod_builder_add(builder, + SPA_PARAM_TAG_direction, SPA_POD_Id(direction), + 0); +} + +static inline void +spa_tag_build_add_info(struct spa_pod_builder *builder, const struct spa_pod *info) +{ + spa_pod_builder_add(builder, + SPA_PARAM_TAG_info, SPA_POD_Pod(info), + 0); +} + +static inline void +spa_tag_build_add_dict(struct spa_pod_builder *builder, const struct spa_dict *dict) +{ + uint32_t i, n_items; + struct spa_pod_frame f; + + n_items = dict ? dict->n_items : 0; + + spa_pod_builder_prop(builder, SPA_PARAM_TAG_info, SPA_POD_PROP_FLAG_HINT_DICT); + spa_pod_builder_push_struct(builder, &f); + spa_pod_builder_int(builder, n_items); + for (i = 0; i < n_items; i++) { + spa_pod_builder_string(builder, dict->items[i].key); + spa_pod_builder_string(builder, dict->items[i].value); + } + spa_pod_builder_pop(builder, &f); +} + +static inline struct spa_pod * +spa_tag_build_end(struct spa_pod_builder *builder, struct spa_pod_frame *f) +{ + return (struct spa_pod*)spa_pod_builder_pop(builder, f); +} + +/** + * \} + */ + +#ifdef __cplusplus +} /* extern "C" */ +#endif + +#endif /* SPA_PARAM_TAG_UTILS_H */ diff --git a/spa/include/spa/param/tag.h b/spa/include/spa/param/tag.h new file mode 100644 index 000000000..8e36ce5c3 --- /dev/null +++ b/spa/include/spa/param/tag.h @@ -0,0 +1,46 @@ +/* Simple Plugin API */ +/* SPDX-FileCopyrightText: Copyright © 2023 Wim Taymans */ +/* SPDX-License-Identifier: MIT */ + +#ifndef SPA_PARAM_TAG_H +#define SPA_PARAM_TAG_H + +#ifdef __cplusplus +extern "C" { +#endif + +/** + * \addtogroup spa_param + * \{ + */ + +#include + +/** properties for SPA_TYPE_OBJECT_ParamTag */ +enum spa_param_tag { + SPA_PARAM_TAG_START, + SPA_PARAM_TAG_direction, /**< direction, input/output (Id enum spa_direction) */ + SPA_PARAM_TAG_info, /**< Struct( + * Int: n_items + * (String: key + * String: value)* + * ) */ +}; + +/** helper structure for managing tag objects */ +struct spa_tag_info { + enum spa_direction direction; + const struct spa_pod *info; +}; + +#define SPA_TAG_INFO(dir,...) ((struct spa_tag_info) { .direction = (dir), ## __VA_ARGS__ }) + +/** + * \} + */ + +#ifdef __cplusplus +} /* extern "C" */ +#endif + +#endif /* SPA_PARAM_TAG_H */ diff --git a/spa/include/spa/param/type-info.h b/spa/include/spa/param/type-info.h index 730a1f536..fee2d030a 100644 --- a/spa/include/spa/param/type-info.h +++ b/spa/include/spa/param/type-info.h @@ -14,5 +14,6 @@ #include #include #include +#include #endif /* SPA_PARAM_TYPE_INFO_H */ diff --git a/spa/include/spa/utils/type-info.h b/spa/include/spa/utils/type-info.h index 7af743b44..1d9cd29b7 100644 --- a/spa/include/spa/utils/type-info.h +++ b/spa/include/spa/utils/type-info.h @@ -83,6 +83,7 @@ static const struct spa_type_info spa_types[] = { { SPA_TYPE_OBJECT_Profiler, SPA_TYPE_Object, SPA_TYPE_INFO_Profiler, spa_type_profiler }, { SPA_TYPE_OBJECT_ParamLatency, SPA_TYPE_Object, SPA_TYPE_INFO_PARAM_Latency, spa_type_param_latency }, { SPA_TYPE_OBJECT_ParamProcessLatency, SPA_TYPE_Object, SPA_TYPE_INFO_PARAM_ProcessLatency, spa_type_param_process_latency }, + { SPA_TYPE_OBJECT_ParamTag, SPA_TYPE_Object, SPA_TYPE_INFO_PARAM_Tag, spa_type_param_tag }, { 0, 0, NULL, NULL } }; diff --git a/spa/include/spa/utils/type.h b/spa/include/spa/utils/type.h index 7c5d9ff9b..65610c11e 100644 --- a/spa/include/spa/utils/type.h +++ b/spa/include/spa/utils/type.h @@ -78,6 +78,7 @@ enum { SPA_TYPE_OBJECT_Profiler, SPA_TYPE_OBJECT_ParamLatency, SPA_TYPE_OBJECT_ParamProcessLatency, + SPA_TYPE_OBJECT_ParamTag, _SPA_TYPE_OBJECT_LAST, /**< not part of ABI */ /* vendor extensions */ diff --git a/spa/plugins/alsa/alsa-pcm-sink.c b/spa/plugins/alsa/alsa-pcm-sink.c index 875f4ffb4..7134323ae 100644 --- a/spa/plugins/alsa/alsa-pcm-sink.c +++ b/spa/plugins/alsa/alsa-pcm-sink.c @@ -15,6 +15,8 @@ #include #include #include +#include +#include #include "alsa-pcm.h" @@ -669,7 +671,7 @@ impl_node_port_set_param(void *object, const struct spa_pod *param) { struct state *this = object; - int res; + int res = 0; spa_return_val_if_fail(this != NULL, -EINVAL); @@ -693,9 +695,12 @@ impl_node_port_set_param(void *object, this->port_info.change_mask |= SPA_PORT_CHANGE_MASK_PARAMS; this->port_params[PORT_Latency].user++; emit_port_info(this, false); - res = 0; break; } + case SPA_PARAM_Tag: + if (param != NULL) + spa_debug_log_pod(this->log, SPA_LOG_LEVEL_DEBUG, 0, NULL, param); + break; default: res = -ENOENT; break; diff --git a/spa/plugins/audioconvert/audioadapter.c b/spa/plugins/audioconvert/audioadapter.c index fa118c03f..e9a669781 100644 --- a/spa/plugins/audioconvert/audioadapter.c +++ b/spa/plugins/audioconvert/audioadapter.c @@ -20,6 +20,7 @@ #include #include #include +#include #include #include #include @@ -75,7 +76,8 @@ struct impl { #define IDX_PortConfig 5 #define IDX_Latency 6 #define IDX_ProcessLatency 7 -#define N_NODE_PARAMS 8 +#define IDX_Tag 8 +#define N_NODE_PARAMS 9 struct spa_param_info params[N_NODE_PARAMS]; uint32_t convert_params_flags[N_NODE_PARAMS]; uint32_t follower_params_flags[N_NODE_PARAMS]; @@ -204,6 +206,7 @@ next: case SPA_PARAM_EnumFormat: case SPA_PARAM_Format: case SPA_PARAM_Latency: + case SPA_PARAM_Tag: res = spa_node_port_enum_params_sync(this->follower, this->direction, 0, id, &result.next, filter, &result.param, &b.b); @@ -1093,8 +1096,10 @@ static int recalc_latency(struct impl *this, enum spa_direction direction, uint3 spa_pod_builder_init(&b, buffer, sizeof(buffer)); if ((res = spa_node_port_enum_params_sync(this->follower, direction, port_id, SPA_PARAM_Latency, - &index, NULL, ¶m, &b)) != 1) - return res; + &index, NULL, ¶m, &b)) != 1) { + param = NULL; + break; + } if ((res = spa_latency_parse(param, &latency)) < 0) return res; if (latency.direction == direction) @@ -1108,6 +1113,42 @@ static int recalc_latency(struct impl *this, enum spa_direction direction, uint3 return 0; } +static int recalc_tag(struct impl *this, enum spa_direction direction, uint32_t port_id) +{ + struct spa_pod_builder b = { 0 }; + uint8_t buffer[1024]; + struct spa_pod *param; + uint32_t index = 0; + struct spa_tag_info info; + int res; + + spa_log_debug(this->log, "%p: ", this); + + if (this->target == this->follower) + return 0; + + while (true) { + void *state = NULL; + spa_pod_builder_init(&b, buffer, sizeof(buffer)); + if ((res = spa_node_port_enum_params_sync(this->follower, + direction, port_id, SPA_PARAM_Tag, + &index, NULL, ¶m, &b)) != 1) { + param = NULL; + break; + } + if ((res = spa_tag_parse(param, &info, &state)) < 0) + return res; + if (info.direction == direction) + break; + } + if ((res = spa_node_port_set_param(this->target, + SPA_DIRECTION_REVERSE(direction), 0, + SPA_PARAM_Tag, 0, param)) < 0) + return res; + + return 0; +} + static void follower_port_info(void *data, enum spa_direction direction, uint32_t port_id, const struct spa_port_info *info) @@ -1144,6 +1185,9 @@ static void follower_port_info(void *data, case SPA_PARAM_Latency: idx = IDX_Latency; break; + case SPA_PARAM_Tag: + idx = IDX_Tag; + break; default: continue; } @@ -1166,6 +1210,11 @@ static void follower_port_info(void *data, spa_log_debug(this->log, "latency: %d (%s)", res, spa_strerror(res)); } + if (idx == IDX_Tag) { + res = recalc_tag(this, direction, port_id); + spa_log_debug(this->log, "tag: %d (%s)", res, + spa_strerror(res)); + } if (idx == IDX_EnumFormat) { spa_log_debug(this->log, "new formats"); configure_format(this, 0, NULL); @@ -1403,7 +1452,7 @@ impl_node_port_set_param(void *object, flags, param)) < 0) return res; - if ((id == SPA_PARAM_Latency) && + if ((id == SPA_PARAM_Latency || id == SPA_PARAM_Tag) && direction == this->direction) { if ((res = spa_node_port_set_param(this->follower, direction, 0, id, flags, param)) < 0) @@ -1707,6 +1756,7 @@ impl_init(const struct spa_handle_factory *factory, this->params[IDX_PortConfig] = SPA_PARAM_INFO(SPA_PARAM_PortConfig, SPA_PARAM_INFO_READWRITE); this->params[IDX_Latency] = SPA_PARAM_INFO(SPA_PARAM_Latency, SPA_PARAM_INFO_READWRITE); this->params[IDX_ProcessLatency] = SPA_PARAM_INFO(SPA_PARAM_ProcessLatency, SPA_PARAM_INFO_READWRITE); + this->params[IDX_Tag] = SPA_PARAM_INFO(SPA_PARAM_Tag, SPA_PARAM_INFO_READWRITE); this->info.params = this->params; this->info.n_params = N_NODE_PARAMS; diff --git a/spa/plugins/audioconvert/audioconvert.c b/spa/plugins/audioconvert/audioconvert.c index 3b02381dc..4e81e6c0c 100644 --- a/spa/plugins/audioconvert/audioconvert.c +++ b/spa/plugins/audioconvert/audioconvert.c @@ -24,6 +24,7 @@ #include #include #include +#include #include #include #include @@ -138,7 +139,8 @@ struct port { #define IDX_Format 3 #define IDX_Buffers 4 #define IDX_Latency 5 -#define N_PORT_PARAMS 6 +#define IDX_Tag 6 +#define N_PORT_PARAMS 7 struct spa_param_info params[N_PORT_PARAMS]; char position[16]; @@ -171,6 +173,7 @@ struct dir { unsigned int have_format:1; unsigned int have_profile:1; struct spa_latency_info latency; + struct spa_pod *tag; uint32_t remap[MAX_PORTS]; @@ -339,6 +342,7 @@ static int init_port(struct impl *this, enum spa_direction direction, uint32_t p port->params[IDX_Format] = SPA_PARAM_INFO(SPA_PARAM_Format, SPA_PARAM_INFO_WRITE); port->params[IDX_Buffers] = SPA_PARAM_INFO(SPA_PARAM_Buffers, 0); port->params[IDX_Latency] = SPA_PARAM_INFO(SPA_PARAM_Latency, SPA_PARAM_INFO_READWRITE); + port->params[IDX_Tag] = SPA_PARAM_INFO(SPA_PARAM_Tag, SPA_PARAM_INFO_READWRITE); port->info.params = port->params; port->info.n_params = N_PORT_PARAMS; @@ -2108,6 +2112,22 @@ impl_node_port_enum_params(void *object, int seq, return 0; } break; + case SPA_PARAM_Tag: + switch (result.index) { + case 0: case 1: + { + uint32_t idx = result.index; + if (port->is_monitor) + idx = idx ^ 1; + param = this->dir[idx].tag; + if (param == NULL) + goto next; + break; + } + default: + return 0; + } + break; default: return -ENOENT; } @@ -2173,6 +2193,48 @@ static int port_set_latency(void *object, return 0; } +static int port_set_tag(void *object, + enum spa_direction direction, + uint32_t port_id, + uint32_t flags, + const struct spa_pod *tag) +{ + struct impl *this = object; + struct port *port, *oport; + enum spa_direction other = SPA_DIRECTION_REVERSE(direction); + uint32_t i; + + spa_log_debug(this->log, "%p: set tag direction:%d id:%d %p", + this, direction, port_id, tag); + + port = GET_PORT(this, direction, port_id); + if (port->is_monitor) + return 0; + + free(this->dir[other].tag); + this->dir[other].tag = NULL; + + if (tag != NULL) { + struct spa_tag_info info; + void *state = NULL; + if (spa_tag_parse(tag, &info, &state) < 0 || + info.direction != other) + return -EINVAL; + this->dir[other].tag = spa_pod_copy(tag); + } + + for (i = 0; i < this->dir[other].n_ports; i++) { + oport = GET_PORT(this, other, i); + oport->info.change_mask |= SPA_PORT_CHANGE_MASK_PARAMS; + oport->params[IDX_Tag].user++; + emit_port_info(this, oport, false); + } + port->info.change_mask |= SPA_PORT_CHANGE_MASK_PARAMS; + port->params[IDX_Tag].user++; + emit_port_info(this, port, false); + return 0; +} + static int port_set_format(void *object, enum spa_direction direction, uint32_t port_id, @@ -2296,6 +2358,8 @@ impl_node_port_set_param(void *object, switch (id) { case SPA_PARAM_Latency: return port_set_latency(this, direction, port_id, flags, param); + case SPA_PARAM_Tag: + return port_set_tag(this, direction, port_id, flags, param); case SPA_PARAM_Format: return port_set_format(this, direction, port_id, flags, param); default: @@ -3123,19 +3187,27 @@ static int impl_get_interface(struct spa_handle *handle, const char *type, void return 0; } +static void free_dir(struct dir *dir) +{ + uint32_t i; + for (i = 0; i < MAX_PORTS; i++) + free(dir->ports[i]); + if (dir->conv.free) + convert_free(&dir->conv); + free(dir->tag); +} + static int impl_clear(struct spa_handle *handle) { struct impl *this; - uint32_t i; spa_return_val_if_fail(handle != NULL, -EINVAL); this = (struct impl *) handle; - for (i = 0; i < MAX_PORTS; i++) - free(this->dir[SPA_DIRECTION_INPUT].ports[i]); - for (i = 0; i < MAX_PORTS; i++) - free(this->dir[SPA_DIRECTION_OUTPUT].ports[i]); + free_dir(&this->dir[SPA_DIRECTION_INPUT]); + free_dir(&this->dir[SPA_DIRECTION_OUTPUT]); + free(this->empty); free(this->scratch); free(this->tmp[0]); @@ -3143,10 +3215,6 @@ static int impl_clear(struct spa_handle *handle) if (this->resample.free) resample_free(&this->resample); - if (this->dir[0].conv.free) - convert_free(&this->dir[0].conv); - if (this->dir[1].conv.free) - convert_free(&this->dir[1].conv); if (this->wav_file != NULL) wav_file_close(this->wav_file); free (this->vol_ramp_sequence); diff --git a/src/examples/video-play.c b/src/examples/video-play.c index 78a086e60..a98a95c55 100644 --- a/src/examples/video-play.c +++ b/src/examples/video-play.c @@ -14,8 +14,10 @@ #include #include +#include #include #include +#include #include @@ -278,6 +280,10 @@ on_stream_param_changed(void *_data, uint32_t id, const struct spa_pod *param) void *d; int32_t mult, size; + if (param != NULL && id == SPA_PARAM_Tag) { + spa_debug_pod(0, NULL, param); + return; + } /* NULL means to clear the format */ if (param == NULL || id != SPA_PARAM_Format) return; @@ -422,7 +428,7 @@ static void do_quit(void *userdata, int signal_number) int main(int argc, char *argv[]) { struct data data = { 0, }; - const struct spa_pod *params[2]; + const struct spa_pod *params[3]; uint8_t buffer[1024]; struct spa_pod_builder b = SPA_POD_BUILDER_INIT(buffer, sizeof(buffer)); struct pw_properties *props; @@ -478,6 +484,16 @@ int main(int argc, char *argv[]) * object to the stack. */ n_params = build_format(&data, &b, params); + { + struct spa_pod_frame f; + struct spa_dict_item items[1]; + /* send a tag, input tags travel upstream */ + spa_tag_build_start(&b, &f, SPA_PARAM_Tag, SPA_DIRECTION_INPUT); + items[0] = SPA_DICT_ITEM_INIT("my-tag-other-key", "my-special-other-tag-value"); + spa_tag_build_add_dict(&b, &SPA_DICT_INIT(items, 1)); + params[n_params++] = spa_tag_build_end(&b, &f); + } + /* now connect the stream, we need a direction (input/output), * an optional target node to connect to, some flags and parameters */ diff --git a/src/examples/video-src.c b/src/examples/video-src.c index b80ee99f7..b1f8f3d0b 100644 --- a/src/examples/video-src.c +++ b/src/examples/video-src.c @@ -14,6 +14,8 @@ #include #include +#include +#include #include @@ -212,6 +214,10 @@ on_stream_param_changed(void *_data, uint32_t id, const struct spa_pod *param) struct spa_pod_builder b = SPA_POD_BUILDER_INIT(params_buffer, sizeof(params_buffer)); const struct spa_pod *params[5]; + if (param != NULL && id == SPA_PARAM_Tag) { + spa_debug_pod(0, NULL, param); + return; + } if (param == NULL || id != SPA_PARAM_Format) return; @@ -276,7 +282,7 @@ static void do_quit(void *userdata, int signal_number) int main(int argc, char *argv[]) { struct data data = { 0, }; - const struct spa_pod *params[1]; + const struct spa_pod *params[2]; uint8_t buffer[1024]; struct spa_pod_builder b = SPA_POD_BUILDER_INIT(buffer, sizeof(buffer)); @@ -314,6 +320,16 @@ int main(int argc, char *argv[]) &SPA_RECTANGLE(4096, 4096)), SPA_FORMAT_VIDEO_framerate, SPA_POD_Fraction(&SPA_FRACTION(25, 1))); + { + struct spa_pod_frame f; + struct spa_dict_item items[1]; + /* send a tag, output tags travel downstream */ + spa_tag_build_start(&b, &f, SPA_PARAM_Tag, SPA_DIRECTION_OUTPUT); + items[0] = SPA_DICT_ITEM_INIT("my-tag-key", "my-special-tag-value"); + spa_tag_build_add_dict(&b, &SPA_DICT_INIT(items, 1)); + params[1] = spa_tag_build_end(&b, &f); + } + pw_stream_add_listener(data.stream, &data.stream_listener, &stream_events, @@ -324,7 +340,7 @@ int main(int argc, char *argv[]) PW_ID_ANY, PW_STREAM_FLAG_DRIVER | PW_STREAM_FLAG_MAP_BUFFERS, - params, 1); + params, 2); pw_main_loop_run(data.loop); diff --git a/src/pipewire/filter.c b/src/pipewire/filter.c index abfb573ae..dd1475682 100644 --- a/src/pipewire/filter.c +++ b/src/pipewire/filter.c @@ -83,7 +83,8 @@ struct port { #define PORT_Format 3 #define PORT_Buffers 4 #define PORT_Latency 5 -#define N_PORT_PARAMS 6 +#define PORT_Tag 6 +#define N_PORT_PARAMS 7 struct spa_param_info params[N_PORT_PARAMS]; struct spa_io_buffers *io; @@ -189,6 +190,8 @@ static int get_port_param_index(uint32_t id) return PORT_Buffers; case SPA_PARAM_Latency: return PORT_Latency; + case SPA_PARAM_Tag: + return PORT_Tag; default: return -1; } @@ -1849,6 +1852,7 @@ void *pw_filter_add_port(struct pw_filter *filter, p->params[PORT_Format] = SPA_PARAM_INFO(SPA_PARAM_Format, SPA_PARAM_INFO_WRITE); p->params[PORT_Buffers] = SPA_PARAM_INFO(SPA_PARAM_Buffers, 0); p->params[PORT_Latency] = SPA_PARAM_INFO(SPA_PARAM_Latency, SPA_PARAM_INFO_WRITE); + p->params[PORT_Tag] = SPA_PARAM_INFO(SPA_PARAM_Tag, SPA_PARAM_INFO_WRITE); p->info.params = p->params; p->info.n_params = N_PORT_PARAMS; diff --git a/src/pipewire/impl-link.c b/src/pipewire/impl-link.c index 5271af940..675fab131 100644 --- a/src/pipewire/impl-link.c +++ b/src/pipewire/impl-link.c @@ -774,6 +774,7 @@ static void input_remove(struct pw_impl_link *this, struct pw_impl_port *port) pw_impl_port_emit_link_removed(this->input, this); pw_impl_port_recalc_latency(this->input); + pw_impl_port_recalc_tag(this->input); if ((res = pw_impl_port_use_buffers(port, mix, 0, NULL, 0)) < 0) { pw_log_warn("%p: port %p clear error %s", this, port, spa_strerror(res)); @@ -803,6 +804,7 @@ static void output_remove(struct pw_impl_link *this, struct pw_impl_port *port) pw_impl_port_emit_link_removed(this->output, this); pw_impl_port_recalc_latency(this->output); + pw_impl_port_recalc_tag(this->output); /* we don't clear output buffers when the link goes away. They will get * cleared when the node goes to suspend */ @@ -988,6 +990,14 @@ static void input_port_latency_changed(void *data) pw_impl_port_recalc_latency(this->output); } +static void input_port_tag_changed(void *data) +{ + struct impl *impl = data; + struct pw_impl_link *this = &impl->this; + if (!this->feedback) + pw_impl_port_recalc_tag(this->output); +} + static void output_port_latency_changed(void *data) { struct impl *impl = data; @@ -996,11 +1006,20 @@ static void output_port_latency_changed(void *data) pw_impl_port_recalc_latency(this->input); } +static void output_port_tag_changed(void *data) +{ + struct impl *impl = data; + struct pw_impl_link *this = &impl->this; + if (!this->feedback) + pw_impl_port_recalc_tag(this->input); +} + static const struct pw_impl_port_events input_port_events = { PW_VERSION_IMPL_PORT_EVENTS, .param_changed = input_port_param_changed, .state_changed = input_port_state_changed, .latency_changed = input_port_latency_changed, + .tag_changed = input_port_tag_changed, }; static const struct pw_impl_port_events output_port_events = { @@ -1008,6 +1027,7 @@ static const struct pw_impl_port_events output_port_events = { .param_changed = output_port_param_changed, .state_changed = output_port_state_changed, .latency_changed = output_port_latency_changed, + .tag_changed = output_port_tag_changed, }; static void node_result(struct impl *impl, void *obj, @@ -1395,6 +1415,8 @@ struct pw_impl_link *pw_context_create_link(struct pw_context *context, pw_impl_port_recalc_latency(output); pw_impl_port_recalc_latency(input); + pw_impl_port_recalc_tag(output); + pw_impl_port_recalc_tag(input); if (impl->onode != impl->inode) this->peer = pw_node_peer_ref(impl->onode, impl->inode); diff --git a/src/pipewire/impl-port.c b/src/pipewire/impl-port.c index 2df4a7408..31b2af81b 100644 --- a/src/pipewire/impl-port.c +++ b/src/pipewire/impl-port.c @@ -9,6 +9,7 @@ #include #include +#include #include #include #include @@ -16,6 +17,7 @@ #include #include #include +#include #include "pipewire/impl.h" #include "pipewire/private.h" @@ -444,7 +446,7 @@ static int process_latency_param(void *data, int seq, struct pw_impl_port *this = data; struct spa_latency_info latency; - if (id != SPA_PARAM_Latency) + if (id != SPA_PARAM_Latency || param == NULL) return -EINVAL; if (spa_latency_parse(param, &latency) < 0) @@ -464,6 +466,37 @@ static int process_latency_param(void *data, int seq, return 0; } +static int process_tag_param(void *data, int seq, + uint32_t id, uint32_t index, uint32_t next, struct spa_pod *param) +{ + struct pw_impl_port *this = data; + struct spa_tag_info info; + struct spa_pod *old; + void *state = NULL; + + if (id != SPA_PARAM_Tag || param == NULL) + return -EINVAL; + if (spa_tag_parse(param, &info, &state) < 0) + return 0; + + old = this->tag[info.direction]; + + if (spa_tag_compare(old, param) == 0) + return 0; + + pw_log_debug("port %p: got %s tag %p", this, + pw_direction_as_string(info.direction), param); + if (param) + pw_log_pod(SPA_LOG_LEVEL_DEBUG, param); + + free(old); + this->tag[info.direction] = spa_pod_copy(param); + + if (info.direction == this->direction) + pw_impl_port_emit_tag_changed(this); + + return 0; +} static void update_info(struct pw_impl_port *port, const struct spa_port_info *info) { @@ -514,6 +547,13 @@ static void update_info(struct pw_impl_port *port, const struct spa_port_info *i pw_impl_port_for_each_param(port, 0, id, 0, UINT32_MAX, NULL, process_latency_param, port); break; + case SPA_PARAM_Tag: + port->have_tag_param = + SPA_FLAG_IS_SET(info->params[i].flags, SPA_PARAM_INFO_WRITE); + if (port->node != NULL) + pw_impl_port_for_each_param(port, 0, id, 0, UINT32_MAX, + NULL, process_tag_param, port); + break; default: break; } @@ -1061,6 +1101,7 @@ int pw_impl_port_add(struct pw_impl_port *port, struct pw_impl_node *node) pw_impl_port_for_each_param(port, 0, SPA_PARAM_IO, 0, 0, NULL, check_param_io, port); pw_impl_port_for_each_param(port, 0, SPA_PARAM_Latency, 0, 0, NULL, process_latency_param, port); + pw_impl_port_for_each_param(port, 0, SPA_PARAM_Tag, 0, 0, NULL, process_tag_param, port); nprops = pw_impl_node_get_properties(node); media_class = pw_properties_get(nprops, PW_KEY_MEDIA_CLASS); @@ -1310,6 +1351,8 @@ void pw_impl_port_destroy(struct pw_impl_port *port) pw_param_clear(&impl->param_list, SPA_ID_INVALID); pw_param_clear(&impl->pending_list, SPA_ID_INVALID); + free(port->tag[SPA_DIRECTION_INPUT]); + free(port->tag[SPA_DIRECTION_OUTPUT]); pw_map_clear(&port->mix_port_map); @@ -1572,6 +1615,79 @@ int pw_impl_port_recalc_latency(struct pw_impl_port *port) return pw_impl_port_set_param(port, SPA_PARAM_Latency, 0, param); } +int pw_impl_port_recalc_tag(struct pw_impl_port *port) +{ + struct pw_impl_link *l; + struct pw_impl_port *other; + struct spa_pod *param, *tag, *old; + struct spa_pod_dynamic_builder b = { 0 }; + struct spa_pod_frame f; + struct spa_tag_info info; + enum spa_direction direction; + uint8_t buffer[1024]; + int count = 0; + bool changed; + + if (port->destroying) + return 0; + + direction = SPA_DIRECTION_REVERSE(port->direction); + + spa_pod_dynamic_builder_init(&b, buffer, sizeof(buffer), 4096); + spa_tag_build_start(&b.b, &f, SPA_PARAM_Tag, direction); + + if (port->direction == PW_DIRECTION_OUTPUT) { + spa_list_for_each(l, &port->links, output_link) { + other = l->input; + tag = other->tag[other->direction]; + if (tag) { + void *state = NULL; + while (spa_tag_parse(tag, &info, &state) == 1) { + spa_tag_build_add_info(&b.b, info.info); + count++; + } + } + } + } else { + spa_list_for_each(l, &port->links, input_link) { + other = l->output; + tag = other->tag[other->direction]; + if (tag) { + void *state = NULL; + while (spa_tag_parse(tag, &info, &state) == 1) { + spa_tag_build_add_info(&b.b, info.info); + count++; + } + } + } + } + param = count == 0 ? NULL : spa_tag_build_end(&b.b, &f); + + old = port->tag[direction]; + + changed = spa_tag_compare(old, param); + + pw_log_info("port %d: %p %s %s tag %p", + port->info.id, port, changed ? "set" : "keep", + pw_direction_as_string(direction), param); + if (param) + pw_log_pod(SPA_LOG_LEVEL_INFO, param); + + if (changed) { + free(old); + port->tag[direction] = param ? spa_pod_copy(param) : NULL; + } + spa_pod_dynamic_builder_clean(&b); + + if (!changed) + return 0; + + if (!port->have_tag_param) + return 0; + + return pw_impl_port_set_param(port, SPA_PARAM_Tag, 0, port->tag[direction]); +} + SPA_EXPORT int pw_impl_port_is_linked(struct pw_impl_port *port) { diff --git a/src/pipewire/impl-port.h b/src/pipewire/impl-port.h index 08d986633..34b8b7b9c 100644 --- a/src/pipewire/impl-port.h +++ b/src/pipewire/impl-port.h @@ -36,7 +36,7 @@ enum pw_impl_port_state { /** Port events, use \ref pw_impl_port_add_listener */ struct pw_impl_port_events { -#define PW_VERSION_IMPL_PORT_EVENTS 2 +#define PW_VERSION_IMPL_PORT_EVENTS 3 uint32_t version; /** The port is destroyed */ @@ -72,6 +72,8 @@ struct pw_impl_port_events { /** latency changed. Since version 2 */ void (*latency_changed) (void *data); + /** tag changed. Since version 3 */ + void (*tag_changed) (void *data); }; /** Create a new port diff --git a/src/pipewire/private.h b/src/pipewire/private.h index 26dce7e76..cc9567b27 100644 --- a/src/pipewire/private.h +++ b/src/pipewire/private.h @@ -782,6 +782,7 @@ struct pw_impl_port_implementation { #define pw_impl_port_emit_control_removed(p,c) pw_impl_port_emit(p, control_removed, 0, c) #define pw_impl_port_emit_param_changed(p,i) pw_impl_port_emit(p, param_changed, 1, i) #define pw_impl_port_emit_latency_changed(p) pw_impl_port_emit(p, latency_changed, 2) +#define pw_impl_port_emit_tag_changed(p) pw_impl_port_emit(p, tag_changed, 3) #define PW_IMPL_PORT_IS_CONTROL(port) SPA_FLAG_MASK((port)->flags, \ PW_IMPL_PORT_FLAG_BUFFERS|PW_IMPL_PORT_FLAG_CONTROL,\ @@ -847,6 +848,9 @@ struct pw_impl_port { unsigned int have_latency_param:1; unsigned int ignore_latency:1; + unsigned int have_tag_param:1; + struct spa_pod *tag[2]; /**< tags */ + void *owner_data; /**< extra owner data */ void *user_data; /**< extra user data */ }; @@ -1221,6 +1225,7 @@ int pw_impl_port_use_buffers(struct pw_impl_port *port, struct pw_impl_port_mix struct spa_buffer **buffers, uint32_t n_buffers); int pw_impl_port_recalc_latency(struct pw_impl_port *port); +int pw_impl_port_recalc_tag(struct pw_impl_port *port); /** Change the state of the node */ int pw_impl_node_set_state(struct pw_impl_node *node, enum pw_node_state state); diff --git a/src/pipewire/stream.c b/src/pipewire/stream.c index 3f43493a1..d5604485d 100644 --- a/src/pipewire/stream.c +++ b/src/pipewire/stream.c @@ -113,7 +113,8 @@ struct stream { #define PORT_Format 3 #define PORT_Buffers 4 #define PORT_Latency 5 -#define N_PORT_PARAMS 6 +#define PORT_Tag 6 +#define N_PORT_PARAMS 7 struct spa_param_info port_params[N_PORT_PARAMS]; struct spa_list param_list; @@ -191,6 +192,8 @@ static int get_port_param_index(uint32_t id) return PORT_Buffers; case SPA_PARAM_Latency: return PORT_Latency; + case SPA_PARAM_Tag: + return PORT_Tag; default: return -1; } @@ -1960,6 +1963,7 @@ pw_stream_connect(struct pw_stream *stream, impl->port_params[PORT_Format] = SPA_PARAM_INFO(SPA_PARAM_Format, SPA_PARAM_INFO_WRITE); impl->port_params[PORT_Buffers] = SPA_PARAM_INFO(SPA_PARAM_Buffers, 0); impl->port_params[PORT_Latency] = SPA_PARAM_INFO(SPA_PARAM_Latency, SPA_PARAM_INFO_WRITE); + impl->port_params[PORT_Tag] = SPA_PARAM_INFO(SPA_PARAM_Tag, SPA_PARAM_INFO_WRITE); impl->port_info.props = &impl->port_props->dict; impl->port_info.params = impl->port_params; impl->port_info.n_params = N_PORT_PARAMS; diff --git a/test/test-spa-utils.c b/test/test-spa-utils.c index 38c363706..a3052c97b 100644 --- a/test/test-spa-utils.c +++ b/test/test-spa-utils.c @@ -125,7 +125,8 @@ PWTEST(utils_abi) pwtest_int_eq(SPA_TYPE_OBJECT_Profiler, 0x4000a); pwtest_int_eq(SPA_TYPE_OBJECT_ParamLatency, 0x4000b); pwtest_int_eq(SPA_TYPE_OBJECT_ParamProcessLatency, 0x4000c); - pwtest_int_eq(_SPA_TYPE_OBJECT_LAST, 0x4000d); + pwtest_int_eq(SPA_TYPE_OBJECT_ParamTag, 0x4000d); + pwtest_int_eq(_SPA_TYPE_OBJECT_LAST, 0x4000e); pwtest_int_eq(SPA_TYPE_VENDOR_PipeWire, 0x02000000); pwtest_int_eq(SPA_TYPE_VENDOR_Other, 0x7f000000);