Port: Add tag param

The tag param has a list of arbitrary key/value pairs. Like the Latency
param, it travels up and downstream. Mixers will append the info
dictionaries or do some more fancy merging.

The purpose is to transport arbirary metadata, out-of-band, through the
graph and it's used for stream metadata and other stream properties.
This commit is contained in:
Wim Taymans 2023-08-24 16:41:21 +02:00
parent 6bf42e9bcd
commit 41dcac0ecd
20 changed files with 566 additions and 24 deletions

View file

@ -14,8 +14,10 @@
#include <spa/utils/result.h>
#include <spa/param/video/format-utils.h>
#include <spa/param/tag-utils.h>
#include <spa/param/props.h>
#include <spa/debug/format.h>
#include <spa/debug/pod.h>
#include <pipewire/pipewire.h>
@ -278,6 +280,10 @@ on_stream_param_changed(void *_data, uint32_t id, const struct spa_pod *param)
void *d;
int32_t mult, size;
if (param != NULL && id == SPA_PARAM_Tag) {
spa_debug_pod(0, NULL, param);
return;
}
/* NULL means to clear the format */
if (param == NULL || id != SPA_PARAM_Format)
return;
@ -422,7 +428,7 @@ static void do_quit(void *userdata, int signal_number)
int main(int argc, char *argv[])
{
struct data data = { 0, };
const struct spa_pod *params[2];
const struct spa_pod *params[3];
uint8_t buffer[1024];
struct spa_pod_builder b = SPA_POD_BUILDER_INIT(buffer, sizeof(buffer));
struct pw_properties *props;
@ -478,6 +484,16 @@ int main(int argc, char *argv[])
* object to the stack. */
n_params = build_format(&data, &b, params);
{
struct spa_pod_frame f;
struct spa_dict_item items[1];
/* send a tag, input tags travel upstream */
spa_tag_build_start(&b, &f, SPA_PARAM_Tag, SPA_DIRECTION_INPUT);
items[0] = SPA_DICT_ITEM_INIT("my-tag-other-key", "my-special-other-tag-value");
spa_tag_build_add_dict(&b, &SPA_DICT_INIT(items, 1));
params[n_params++] = spa_tag_build_end(&b, &f);
}
/* now connect the stream, we need a direction (input/output),
* an optional target node to connect to, some flags and parameters
*/

View file

@ -14,6 +14,8 @@
#include <math.h>
#include <spa/param/video/format-utils.h>
#include <spa/param/tag-utils.h>
#include <spa/debug/pod.h>
#include <pipewire/pipewire.h>
@ -212,6 +214,10 @@ on_stream_param_changed(void *_data, uint32_t id, const struct spa_pod *param)
struct spa_pod_builder b = SPA_POD_BUILDER_INIT(params_buffer, sizeof(params_buffer));
const struct spa_pod *params[5];
if (param != NULL && id == SPA_PARAM_Tag) {
spa_debug_pod(0, NULL, param);
return;
}
if (param == NULL || id != SPA_PARAM_Format)
return;
@ -276,7 +282,7 @@ static void do_quit(void *userdata, int signal_number)
int main(int argc, char *argv[])
{
struct data data = { 0, };
const struct spa_pod *params[1];
const struct spa_pod *params[2];
uint8_t buffer[1024];
struct spa_pod_builder b = SPA_POD_BUILDER_INIT(buffer, sizeof(buffer));
@ -314,6 +320,16 @@ int main(int argc, char *argv[])
&SPA_RECTANGLE(4096, 4096)),
SPA_FORMAT_VIDEO_framerate, SPA_POD_Fraction(&SPA_FRACTION(25, 1)));
{
struct spa_pod_frame f;
struct spa_dict_item items[1];
/* send a tag, output tags travel downstream */
spa_tag_build_start(&b, &f, SPA_PARAM_Tag, SPA_DIRECTION_OUTPUT);
items[0] = SPA_DICT_ITEM_INIT("my-tag-key", "my-special-tag-value");
spa_tag_build_add_dict(&b, &SPA_DICT_INIT(items, 1));
params[1] = spa_tag_build_end(&b, &f);
}
pw_stream_add_listener(data.stream,
&data.stream_listener,
&stream_events,
@ -324,7 +340,7 @@ int main(int argc, char *argv[])
PW_ID_ANY,
PW_STREAM_FLAG_DRIVER |
PW_STREAM_FLAG_MAP_BUFFERS,
params, 1);
params, 2);
pw_main_loop_run(data.loop);

View file

@ -83,7 +83,8 @@ struct port {
#define PORT_Format 3
#define PORT_Buffers 4
#define PORT_Latency 5
#define N_PORT_PARAMS 6
#define PORT_Tag 6
#define N_PORT_PARAMS 7
struct spa_param_info params[N_PORT_PARAMS];
struct spa_io_buffers *io;
@ -189,6 +190,8 @@ static int get_port_param_index(uint32_t id)
return PORT_Buffers;
case SPA_PARAM_Latency:
return PORT_Latency;
case SPA_PARAM_Tag:
return PORT_Tag;
default:
return -1;
}
@ -1849,6 +1852,7 @@ void *pw_filter_add_port(struct pw_filter *filter,
p->params[PORT_Format] = SPA_PARAM_INFO(SPA_PARAM_Format, SPA_PARAM_INFO_WRITE);
p->params[PORT_Buffers] = SPA_PARAM_INFO(SPA_PARAM_Buffers, 0);
p->params[PORT_Latency] = SPA_PARAM_INFO(SPA_PARAM_Latency, SPA_PARAM_INFO_WRITE);
p->params[PORT_Tag] = SPA_PARAM_INFO(SPA_PARAM_Tag, SPA_PARAM_INFO_WRITE);
p->info.params = p->params;
p->info.n_params = N_PORT_PARAMS;

View file

@ -774,6 +774,7 @@ static void input_remove(struct pw_impl_link *this, struct pw_impl_port *port)
pw_impl_port_emit_link_removed(this->input, this);
pw_impl_port_recalc_latency(this->input);
pw_impl_port_recalc_tag(this->input);
if ((res = pw_impl_port_use_buffers(port, mix, 0, NULL, 0)) < 0) {
pw_log_warn("%p: port %p clear error %s", this, port, spa_strerror(res));
@ -803,6 +804,7 @@ static void output_remove(struct pw_impl_link *this, struct pw_impl_port *port)
pw_impl_port_emit_link_removed(this->output, this);
pw_impl_port_recalc_latency(this->output);
pw_impl_port_recalc_tag(this->output);
/* we don't clear output buffers when the link goes away. They will get
* cleared when the node goes to suspend */
@ -988,6 +990,14 @@ static void input_port_latency_changed(void *data)
pw_impl_port_recalc_latency(this->output);
}
static void input_port_tag_changed(void *data)
{
struct impl *impl = data;
struct pw_impl_link *this = &impl->this;
if (!this->feedback)
pw_impl_port_recalc_tag(this->output);
}
static void output_port_latency_changed(void *data)
{
struct impl *impl = data;
@ -996,11 +1006,20 @@ static void output_port_latency_changed(void *data)
pw_impl_port_recalc_latency(this->input);
}
static void output_port_tag_changed(void *data)
{
struct impl *impl = data;
struct pw_impl_link *this = &impl->this;
if (!this->feedback)
pw_impl_port_recalc_tag(this->input);
}
static const struct pw_impl_port_events input_port_events = {
PW_VERSION_IMPL_PORT_EVENTS,
.param_changed = input_port_param_changed,
.state_changed = input_port_state_changed,
.latency_changed = input_port_latency_changed,
.tag_changed = input_port_tag_changed,
};
static const struct pw_impl_port_events output_port_events = {
@ -1008,6 +1027,7 @@ static const struct pw_impl_port_events output_port_events = {
.param_changed = output_port_param_changed,
.state_changed = output_port_state_changed,
.latency_changed = output_port_latency_changed,
.tag_changed = output_port_tag_changed,
};
static void node_result(struct impl *impl, void *obj,
@ -1395,6 +1415,8 @@ struct pw_impl_link *pw_context_create_link(struct pw_context *context,
pw_impl_port_recalc_latency(output);
pw_impl_port_recalc_latency(input);
pw_impl_port_recalc_tag(output);
pw_impl_port_recalc_tag(input);
if (impl->onode != impl->inode)
this->peer = pw_node_peer_ref(impl->onode, impl->inode);

View file

@ -9,6 +9,7 @@
#include <spa/pod/parser.h>
#include <spa/param/audio/format-utils.h>
#include <spa/param/tag-utils.h>
#include <spa/node/utils.h>
#include <spa/utils/names.h>
#include <spa/utils/string.h>
@ -16,6 +17,7 @@
#include <spa/debug/types.h>
#include <spa/pod/filter.h>
#include <spa/pod/dynamic.h>
#include <spa/debug/pod.h>
#include "pipewire/impl.h"
#include "pipewire/private.h"
@ -444,7 +446,7 @@ static int process_latency_param(void *data, int seq,
struct pw_impl_port *this = data;
struct spa_latency_info latency;
if (id != SPA_PARAM_Latency)
if (id != SPA_PARAM_Latency || param == NULL)
return -EINVAL;
if (spa_latency_parse(param, &latency) < 0)
@ -464,6 +466,37 @@ static int process_latency_param(void *data, int seq,
return 0;
}
static int process_tag_param(void *data, int seq,
uint32_t id, uint32_t index, uint32_t next, struct spa_pod *param)
{
struct pw_impl_port *this = data;
struct spa_tag_info info;
struct spa_pod *old;
void *state = NULL;
if (id != SPA_PARAM_Tag || param == NULL)
return -EINVAL;
if (spa_tag_parse(param, &info, &state) < 0)
return 0;
old = this->tag[info.direction];
if (spa_tag_compare(old, param) == 0)
return 0;
pw_log_debug("port %p: got %s tag %p", this,
pw_direction_as_string(info.direction), param);
if (param)
pw_log_pod(SPA_LOG_LEVEL_DEBUG, param);
free(old);
this->tag[info.direction] = spa_pod_copy(param);
if (info.direction == this->direction)
pw_impl_port_emit_tag_changed(this);
return 0;
}
static void update_info(struct pw_impl_port *port, const struct spa_port_info *info)
{
@ -514,6 +547,13 @@ static void update_info(struct pw_impl_port *port, const struct spa_port_info *i
pw_impl_port_for_each_param(port, 0, id, 0, UINT32_MAX,
NULL, process_latency_param, port);
break;
case SPA_PARAM_Tag:
port->have_tag_param =
SPA_FLAG_IS_SET(info->params[i].flags, SPA_PARAM_INFO_WRITE);
if (port->node != NULL)
pw_impl_port_for_each_param(port, 0, id, 0, UINT32_MAX,
NULL, process_tag_param, port);
break;
default:
break;
}
@ -1061,6 +1101,7 @@ int pw_impl_port_add(struct pw_impl_port *port, struct pw_impl_node *node)
pw_impl_port_for_each_param(port, 0, SPA_PARAM_IO, 0, 0, NULL, check_param_io, port);
pw_impl_port_for_each_param(port, 0, SPA_PARAM_Latency, 0, 0, NULL, process_latency_param, port);
pw_impl_port_for_each_param(port, 0, SPA_PARAM_Tag, 0, 0, NULL, process_tag_param, port);
nprops = pw_impl_node_get_properties(node);
media_class = pw_properties_get(nprops, PW_KEY_MEDIA_CLASS);
@ -1310,6 +1351,8 @@ void pw_impl_port_destroy(struct pw_impl_port *port)
pw_param_clear(&impl->param_list, SPA_ID_INVALID);
pw_param_clear(&impl->pending_list, SPA_ID_INVALID);
free(port->tag[SPA_DIRECTION_INPUT]);
free(port->tag[SPA_DIRECTION_OUTPUT]);
pw_map_clear(&port->mix_port_map);
@ -1572,6 +1615,79 @@ int pw_impl_port_recalc_latency(struct pw_impl_port *port)
return pw_impl_port_set_param(port, SPA_PARAM_Latency, 0, param);
}
int pw_impl_port_recalc_tag(struct pw_impl_port *port)
{
struct pw_impl_link *l;
struct pw_impl_port *other;
struct spa_pod *param, *tag, *old;
struct spa_pod_dynamic_builder b = { 0 };
struct spa_pod_frame f;
struct spa_tag_info info;
enum spa_direction direction;
uint8_t buffer[1024];
int count = 0;
bool changed;
if (port->destroying)
return 0;
direction = SPA_DIRECTION_REVERSE(port->direction);
spa_pod_dynamic_builder_init(&b, buffer, sizeof(buffer), 4096);
spa_tag_build_start(&b.b, &f, SPA_PARAM_Tag, direction);
if (port->direction == PW_DIRECTION_OUTPUT) {
spa_list_for_each(l, &port->links, output_link) {
other = l->input;
tag = other->tag[other->direction];
if (tag) {
void *state = NULL;
while (spa_tag_parse(tag, &info, &state) == 1) {
spa_tag_build_add_info(&b.b, info.info);
count++;
}
}
}
} else {
spa_list_for_each(l, &port->links, input_link) {
other = l->output;
tag = other->tag[other->direction];
if (tag) {
void *state = NULL;
while (spa_tag_parse(tag, &info, &state) == 1) {
spa_tag_build_add_info(&b.b, info.info);
count++;
}
}
}
}
param = count == 0 ? NULL : spa_tag_build_end(&b.b, &f);
old = port->tag[direction];
changed = spa_tag_compare(old, param);
pw_log_info("port %d: %p %s %s tag %p",
port->info.id, port, changed ? "set" : "keep",
pw_direction_as_string(direction), param);
if (param)
pw_log_pod(SPA_LOG_LEVEL_INFO, param);
if (changed) {
free(old);
port->tag[direction] = param ? spa_pod_copy(param) : NULL;
}
spa_pod_dynamic_builder_clean(&b);
if (!changed)
return 0;
if (!port->have_tag_param)
return 0;
return pw_impl_port_set_param(port, SPA_PARAM_Tag, 0, port->tag[direction]);
}
SPA_EXPORT
int pw_impl_port_is_linked(struct pw_impl_port *port)
{

View file

@ -36,7 +36,7 @@ enum pw_impl_port_state {
/** Port events, use \ref pw_impl_port_add_listener */
struct pw_impl_port_events {
#define PW_VERSION_IMPL_PORT_EVENTS 2
#define PW_VERSION_IMPL_PORT_EVENTS 3
uint32_t version;
/** The port is destroyed */
@ -72,6 +72,8 @@ struct pw_impl_port_events {
/** latency changed. Since version 2 */
void (*latency_changed) (void *data);
/** tag changed. Since version 3 */
void (*tag_changed) (void *data);
};
/** Create a new port

View file

@ -782,6 +782,7 @@ struct pw_impl_port_implementation {
#define pw_impl_port_emit_control_removed(p,c) pw_impl_port_emit(p, control_removed, 0, c)
#define pw_impl_port_emit_param_changed(p,i) pw_impl_port_emit(p, param_changed, 1, i)
#define pw_impl_port_emit_latency_changed(p) pw_impl_port_emit(p, latency_changed, 2)
#define pw_impl_port_emit_tag_changed(p) pw_impl_port_emit(p, tag_changed, 3)
#define PW_IMPL_PORT_IS_CONTROL(port) SPA_FLAG_MASK((port)->flags, \
PW_IMPL_PORT_FLAG_BUFFERS|PW_IMPL_PORT_FLAG_CONTROL,\
@ -847,6 +848,9 @@ struct pw_impl_port {
unsigned int have_latency_param:1;
unsigned int ignore_latency:1;
unsigned int have_tag_param:1;
struct spa_pod *tag[2]; /**< tags */
void *owner_data; /**< extra owner data */
void *user_data; /**< extra user data */
};
@ -1221,6 +1225,7 @@ int pw_impl_port_use_buffers(struct pw_impl_port *port, struct pw_impl_port_mix
struct spa_buffer **buffers, uint32_t n_buffers);
int pw_impl_port_recalc_latency(struct pw_impl_port *port);
int pw_impl_port_recalc_tag(struct pw_impl_port *port);
/** Change the state of the node */
int pw_impl_node_set_state(struct pw_impl_node *node, enum pw_node_state state);

View file

@ -113,7 +113,8 @@ struct stream {
#define PORT_Format 3
#define PORT_Buffers 4
#define PORT_Latency 5
#define N_PORT_PARAMS 6
#define PORT_Tag 6
#define N_PORT_PARAMS 7
struct spa_param_info port_params[N_PORT_PARAMS];
struct spa_list param_list;
@ -191,6 +192,8 @@ static int get_port_param_index(uint32_t id)
return PORT_Buffers;
case SPA_PARAM_Latency:
return PORT_Latency;
case SPA_PARAM_Tag:
return PORT_Tag;
default:
return -1;
}
@ -1960,6 +1963,7 @@ pw_stream_connect(struct pw_stream *stream,
impl->port_params[PORT_Format] = SPA_PARAM_INFO(SPA_PARAM_Format, SPA_PARAM_INFO_WRITE);
impl->port_params[PORT_Buffers] = SPA_PARAM_INFO(SPA_PARAM_Buffers, 0);
impl->port_params[PORT_Latency] = SPA_PARAM_INFO(SPA_PARAM_Latency, SPA_PARAM_INFO_WRITE);
impl->port_params[PORT_Tag] = SPA_PARAM_INFO(SPA_PARAM_Tag, SPA_PARAM_INFO_WRITE);
impl->port_info.props = &impl->port_props->dict;
impl->port_info.params = impl->port_params;
impl->port_info.n_params = N_PORT_PARAMS;