From ebeae802ad34a9c9515804ce656c9c0455cc92ae Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Thu, 7 Sep 2023 10:01:40 +0200 Subject: [PATCH] audioconvert: improve tag and latency handling Don't just forward the tag and latency events to the follower but let the audioconvert aggregate and emit the updated tag/latency event that is then configured on the follower. When using the DSP mode of the audioconvert, this results in an accumulated latency/tag from all the DSP ports instead of just the last DSP port param update. Put properties with media. prefix in tags in pw-cat. --- spa/plugins/audioconvert/audioadapter.c | 241 +++++++++++++++--------- spa/plugins/audioconvert/audioconvert.c | 90 ++++++--- src/pipewire/impl-port.c | 16 +- src/pipewire/private.h | 1 + src/tools/pw-cat.c | 29 ++- 5 files changed, 246 insertions(+), 131 deletions(-) diff --git a/spa/plugins/audioconvert/audioadapter.c b/spa/plugins/audioconvert/audioadapter.c index e9a669781..3f4068a26 100644 --- a/spa/plugins/audioconvert/audioadapter.c +++ b/spa/plugins/audioconvert/audioadapter.c @@ -95,6 +95,7 @@ struct impl { unsigned int async:1; unsigned int passthrough:1; unsigned int follower_removing:1; + unsigned int in_recalc; }; /** \endcond */ @@ -544,6 +545,80 @@ extern const struct spa_handle_factory spa_audioconvert_factory; static const struct spa_node_events follower_node_events; +static int recalc_latency(struct impl *this, struct spa_node *src, enum spa_direction direction, + uint32_t port_id, struct spa_node *dst) +{ + struct spa_pod_builder b = { 0 }; + uint8_t buffer[1024]; + struct spa_pod *param; + uint32_t index = 0; + struct spa_latency_info latency; + int res; + + spa_log_info(this->log, "%p: %d:%d", this, direction, port_id); + + if (this->target == this->follower) + return 0; + + while (true) { + spa_pod_builder_init(&b, buffer, sizeof(buffer)); + if ((res = spa_node_port_enum_params_sync(src, + direction, port_id, SPA_PARAM_Latency, + &index, NULL, ¶m, &b)) != 1) { + param = NULL; + break; + } + if ((res = spa_latency_parse(param, &latency)) < 0) + return res; + if (latency.direction == direction) + break; + } + if ((res = spa_node_port_set_param(dst, + SPA_DIRECTION_REVERSE(direction), 0, + SPA_PARAM_Latency, 0, param)) < 0) + return res; + + return 0; +} + +static int recalc_tag(struct impl *this, struct spa_node *src, enum spa_direction direction, + uint32_t port_id, struct spa_node *dst) +{ + struct spa_pod_builder b = { 0 }; + uint8_t buffer[1024]; + struct spa_pod *param; + uint32_t index = 0; + struct spa_tag_info info; + int res; + + spa_log_debug(this->log, "%p: %d:%d", this, direction, port_id); + + if (this->target == this->follower) + return 0; + + while (true) { + void *state = NULL; + spa_pod_builder_init(&b, buffer, sizeof(buffer)); + if ((res = spa_node_port_enum_params_sync(src, + direction, port_id, SPA_PARAM_Tag, + &index, NULL, ¶m, &b)) != 1) { + param = NULL; + break; + } + if ((res = spa_tag_parse(param, &info, &state)) < 0) + return res; + if (info.direction == direction) + break; + } + if ((res = spa_node_port_set_param(dst, + SPA_DIRECTION_REVERSE(direction), 0, + SPA_PARAM_Tag, 0, param)) < 0) + return res; + + return 0; +} + + static int reconfigure_mode(struct impl *this, bool passthrough, enum spa_direction direction, struct spa_pod *format) { @@ -593,6 +668,8 @@ static int reconfigure_mode(struct impl *this, bool passthrough, emit_node_info(this, false); + spa_log_debug(this->log, "%p: passthrough mode %d", this, passthrough); + return 0; } @@ -676,6 +753,8 @@ static int impl_node_set_param(void *object, uint32_t id, uint32_t flags, if (this->target != this->follower) { if ((res = spa_node_set_param(this->target, id, flags, param)) < 0) return res; + + res = recalc_latency(this, this->follower, this->direction, 0, this->convert); } break; } @@ -956,6 +1035,61 @@ static void convert_node_info(void *data, const struct spa_node_info *info) emit_node_info(this, false); } +static void follower_convert_port_info(void *data, + enum spa_direction direction, uint32_t port_id, + const struct spa_port_info *info) +{ + struct impl *this = data; + uint32_t i; + int res; + + spa_log_info(this->log, "%p: convert port info %s %p %08"PRIx64, this, + this->direction == SPA_DIRECTION_INPUT ? + "Input" : "Output", info, info->change_mask); + + if (info->change_mask & SPA_PORT_CHANGE_MASK_PARAMS) { + for (i = 0; i < info->n_params; i++) { + uint32_t idx; + + switch (info->params[i].id) { + case SPA_PARAM_Latency: + idx = IDX_Latency; + break; + case SPA_PARAM_Tag: + idx = IDX_Tag; + break; + default: + continue; + } + + if (!this->add_listener && + this->convert_params_flags[idx] == info->params[i].flags) + continue; + + this->convert_params_flags[idx] = info->params[i].flags; + + if (this->add_listener) + continue; + + if (idx == IDX_Latency) { + this->in_recalc++; + res = recalc_latency(this, this->convert, direction, port_id, this->follower); + this->in_recalc--; + spa_log_debug(this->log, "latency: %d (%s)", res, + spa_strerror(res)); + } + if (idx == IDX_Tag) { + this->in_recalc++; + res = recalc_tag(this, this->convert, direction, port_id, this->follower); + this->in_recalc--; + spa_log_debug(this->log, "tag: %d (%s)", res, + spa_strerror(res)); + } + spa_log_debug(this->log, "param %d changed", info->params[i].id); + } + } +} + static void convert_port_info(void *data, enum spa_direction direction, uint32_t port_id, const struct spa_port_info *info) @@ -964,10 +1098,11 @@ static void convert_port_info(void *data, struct spa_port_info pi; if (direction != this->direction) { - /* skip the converter output port into the follower */ - if (port_id == 0) + if (port_id == 0) { + /* handle the converter output port into the follower separately */ + follower_convert_port_info(this, direction, port_id, info); return; - else + } else /* the monitor ports are exposed */ port_id--; } else if (info) { @@ -1075,78 +1210,6 @@ static void follower_info(void *data, const struct spa_node_info *info) spa_zero(this->info.props); this->info.change_mask &= ~SPA_NODE_CHANGE_MASK_PROPS; - -} - -static int recalc_latency(struct impl *this, enum spa_direction direction, uint32_t port_id) -{ - struct spa_pod_builder b = { 0 }; - uint8_t buffer[1024]; - struct spa_pod *param; - uint32_t index = 0; - struct spa_latency_info latency; - int res; - - spa_log_debug(this->log, "%p: ", this); - - if (this->target == this->follower) - return 0; - - while (true) { - spa_pod_builder_init(&b, buffer, sizeof(buffer)); - if ((res = spa_node_port_enum_params_sync(this->follower, - direction, port_id, SPA_PARAM_Latency, - &index, NULL, ¶m, &b)) != 1) { - param = NULL; - break; - } - if ((res = spa_latency_parse(param, &latency)) < 0) - return res; - if (latency.direction == direction) - break; - } - if ((res = spa_node_port_set_param(this->target, - SPA_DIRECTION_REVERSE(direction), 0, - SPA_PARAM_Latency, 0, param)) < 0) - return res; - - return 0; -} - -static int recalc_tag(struct impl *this, enum spa_direction direction, uint32_t port_id) -{ - struct spa_pod_builder b = { 0 }; - uint8_t buffer[1024]; - struct spa_pod *param; - uint32_t index = 0; - struct spa_tag_info info; - int res; - - spa_log_debug(this->log, "%p: ", this); - - if (this->target == this->follower) - return 0; - - while (true) { - void *state = NULL; - spa_pod_builder_init(&b, buffer, sizeof(buffer)); - if ((res = spa_node_port_enum_params_sync(this->follower, - direction, port_id, SPA_PARAM_Tag, - &index, NULL, ¶m, &b)) != 1) { - param = NULL; - break; - } - if ((res = spa_tag_parse(param, &info, &state)) < 0) - return res; - if (info.direction == direction) - break; - } - if ((res = spa_node_port_set_param(this->target, - SPA_DIRECTION_REVERSE(direction), 0, - SPA_PARAM_Tag, 0, param)) < 0) - return res; - - return 0; } static void follower_port_info(void *data, @@ -1167,9 +1230,10 @@ static void follower_port_info(void *data, SPA_PORT_FLAG_PHYSICAL | SPA_PORT_FLAG_TERMINAL); - spa_log_debug(this->log, "%p: follower port info %s %p %08"PRIx64, this, + spa_log_debug(this->log, "%p: follower port info %s %p %08"PRIx64" recalc:%u", this, this->direction == SPA_DIRECTION_INPUT ? - "Input" : "Output", info, info->change_mask); + "Input" : "Output", info, info->change_mask, + this->in_recalc); if (info->change_mask & SPA_PORT_CHANGE_MASK_PARAMS) { for (i = 0; i < info->n_params; i++) { @@ -1205,13 +1269,13 @@ static void follower_port_info(void *data, if (this->add_listener) continue; - if (idx == IDX_Latency) { - res = recalc_latency(this, direction, port_id); + if (idx == IDX_Latency && this->in_recalc == 0) { + res = recalc_latency(this, this->follower, direction, port_id, this->target); spa_log_debug(this->log, "latency: %d (%s)", res, spa_strerror(res)); } - if (idx == IDX_Tag) { - res = recalc_tag(this, direction, port_id); + if (idx == IDX_Tag && this->in_recalc == 0) { + res = recalc_tag(this, this->follower, direction, port_id, this->target); spa_log_debug(this->log, "tag: %d (%s)", res, spa_strerror(res)); } @@ -1439,7 +1503,6 @@ impl_node_port_set_param(void *object, const struct spa_pod *param) { struct impl *this = object; - int res; spa_return_val_if_fail(this != NULL, -EINVAL); @@ -1448,18 +1511,8 @@ impl_node_port_set_param(void *object, if (direction != this->direction) port_id++; - if ((res = spa_node_port_set_param(this->target, direction, port_id, id, - flags, param)) < 0) - return res; - - if ((id == SPA_PARAM_Latency || id == SPA_PARAM_Tag) && - direction == this->direction) { - if ((res = spa_node_port_set_param(this->follower, direction, 0, id, - flags, param)) < 0) - return res; - } - - return res; + return spa_node_port_set_param(this->target, direction, port_id, id, + flags, param); } static int diff --git a/spa/plugins/audioconvert/audioconvert.c b/spa/plugins/audioconvert/audioconvert.c index 4e81e6c0c..ee67ac9d1 100644 --- a/spa/plugins/audioconvert/audioconvert.c +++ b/spa/plugins/audioconvert/audioconvert.c @@ -147,6 +147,9 @@ struct port { struct buffer buffers[MAX_BUFFERS]; uint32_t n_buffers; + struct spa_latency_info latency[2]; + unsigned int have_latency:1; + struct spa_audio_info format; unsigned int have_format:1; unsigned int is_dsp:1; @@ -172,7 +175,6 @@ struct dir { struct spa_audio_info format; unsigned int have_format:1; unsigned int have_profile:1; - struct spa_latency_info latency; struct spa_pod *tag; uint32_t remap[MAX_PORTS]; @@ -326,6 +328,8 @@ static int init_port(struct impl *this, enum spa_direction direction, uint32_t p } port->direction = direction; port->id = port_id; + port->latency[SPA_DIRECTION_INPUT] = SPA_LATENCY_INFO(SPA_DIRECTION_INPUT); + port->latency[SPA_DIRECTION_OUTPUT] = SPA_LATENCY_INFO(SPA_DIRECTION_OUTPUT); name = spa_debug_type_find_short_name(spa_type_audio_channel, position); snprintf(port->position, sizeof(port->position), "%s", name ? name : "UNK"); @@ -1000,7 +1004,7 @@ static struct spa_pod *generate_ramp_up_seq(struct impl *this) spa_log_info(this->log, "generating ramp up sequence from %f to %f with a" " step value %f at scale %d", p->prev_volume, p->volume, volume_step, p->vrp.scale); do { - // spa_log_debug(this->log, "volume accum %f", get_volume_at_scale(this, volume_accum)); + spa_log_trace(this->log, "volume accum %f", get_volume_at_scale(this, volume_accum)); spa_pod_builder_control(&b.b, volume_offs, SPA_CONTROL_Properties); spa_pod_builder_add_object(&b.b, SPA_TYPE_OBJECT_Props, 0, @@ -1029,7 +1033,7 @@ static struct spa_pod *generate_ramp_down_seq(struct impl *this) spa_log_info(this->log, "generating ramp down sequence from %f to %f with a" " step value %f at scale %d", p->prev_volume, p->volume, volume_step, p->vrp.scale); do { - // spa_log_debug(this->log, "volume accum %f", get_volume_at_scale(this, volume_accum)); + spa_log_trace(this->log, "volume accum %f", get_volume_at_scale(this, volume_accum)); spa_pod_builder_control(&b.b, volume_offs, SPA_CONTROL_Properties); spa_pod_builder_add_object(&b.b, SPA_TYPE_OBJECT_Props, 0, @@ -2105,7 +2109,7 @@ impl_node_port_enum_params(void *object, int seq, uint32_t idx = result.index; if (port->is_monitor) idx = idx ^ 1; - param = spa_latency_build(&b, id, &this->dir[idx].latency); + param = spa_latency_build(&b, id, &port->latency[idx]); break; } default: @@ -2162,34 +2166,70 @@ static int port_set_latency(void *object, struct impl *this = object; struct port *port, *oport; enum spa_direction other = SPA_DIRECTION_REVERSE(direction); + struct spa_latency_info info; + bool have_latency, emit = false;; uint32_t i; - spa_log_debug(this->log, "%p: set latency direction:%d id:%d", - this, direction, port_id); + spa_log_debug(this->log, "%p: set latency direction:%d id:%d %p", + this, direction, port_id, latency); port = GET_PORT(this, direction, port_id); if (port->is_monitor) return 0; if (latency == NULL) { - this->dir[other].latency = SPA_LATENCY_INFO(other); + info = SPA_LATENCY_INFO(other); + have_latency = false; } else { - struct spa_latency_info info; if (spa_latency_parse(latency, &info) < 0 || info.direction != other) return -EINVAL; - this->dir[other].latency = info; + have_latency = true; } + emit = spa_latency_info_compare(&info, &port->latency[other]) != 0 || + port->have_latency == have_latency; + + port->latency[other] = info; + port->have_latency = have_latency; + + spa_log_debug(this->log, "%p: set %s latency %f-%f %d-%d %"PRIu64"-%"PRIu64, this, + info.direction == SPA_DIRECTION_INPUT ? "input" : "output", + info.min_quantum, info.max_quantum, + info.min_rate, info.max_rate, + info.min_ns, info.max_ns); + + spa_latency_info_combine_start(&info, other); + for (i = 0; i < this->dir[direction].n_ports; i++) { + oport = GET_PORT(this, direction, i); + if (oport->is_monitor || !oport->have_latency) + continue; + spa_log_debug(this->log, "%p: combine %d", this, i); + spa_latency_info_combine(&info, &oport->latency[other]); + } + spa_latency_info_combine_finish(&info); + + spa_log_debug(this->log, "%p: combined %s latency %f-%f %d-%d %"PRIu64"-%"PRIu64, this, + info.direction == SPA_DIRECTION_INPUT ? "input" : "output", + info.min_quantum, info.max_quantum, + info.min_rate, info.max_rate, + info.min_ns, info.max_ns); for (i = 0; i < this->dir[other].n_ports; i++) { oport = GET_PORT(this, other, i); - oport->info.change_mask |= SPA_PORT_CHANGE_MASK_PARAMS; - oport->params[IDX_Latency].user++; - emit_port_info(this, oport, false); + + spa_log_debug(this->log, "%p: change %d", this, i); + if (spa_latency_info_compare(&info, &oport->latency[other]) != 0) { + oport->latency[other] = info; + oport->info.change_mask |= SPA_PORT_CHANGE_MASK_PARAMS; + oport->params[IDX_Latency].user++; + emit_port_info(this, oport, false); + } + } + if (emit) { + port->info.change_mask |= SPA_PORT_CHANGE_MASK_PARAMS; + port->params[IDX_Latency].user++; + emit_port_info(this, port, false); } - port->info.change_mask |= SPA_PORT_CHANGE_MASK_PARAMS; - port->params[IDX_Latency].user++; - emit_port_info(this, port, false); return 0; } @@ -2211,23 +2251,23 @@ static int port_set_tag(void *object, if (port->is_monitor) return 0; - free(this->dir[other].tag); - this->dir[other].tag = NULL; - if (tag != NULL) { struct spa_tag_info info; void *state = NULL; if (spa_tag_parse(tag, &info, &state) < 0 || info.direction != other) return -EINVAL; - this->dir[other].tag = spa_pod_copy(tag); } + if (spa_tag_compare(tag, this->dir[other].tag) != 0) { + free(this->dir[other].tag); + this->dir[other].tag = tag ? spa_pod_copy(tag) : NULL; - for (i = 0; i < this->dir[other].n_ports; i++) { - oport = GET_PORT(this, other, i); - oport->info.change_mask |= SPA_PORT_CHANGE_MASK_PARAMS; - oport->params[IDX_Tag].user++; - emit_port_info(this, oport, false); + for (i = 0; i < this->dir[other].n_ports; i++) { + oport = GET_PORT(this, other, i); + oport->info.change_mask |= SPA_PORT_CHANGE_MASK_PARAMS; + oport->params[IDX_Tag].user++; + emit_port_info(this, oport, false); + } } port->info.change_mask |= SPA_PORT_CHANGE_MASK_PARAMS; port->params[IDX_Tag].user++; @@ -3326,9 +3366,7 @@ impl_init(const struct spa_handle_factory *factory, this->props.monitor.n_volumes = this->props.n_channels; this->dir[SPA_DIRECTION_INPUT].direction = SPA_DIRECTION_INPUT; - this->dir[SPA_DIRECTION_INPUT].latency = SPA_LATENCY_INFO(SPA_DIRECTION_INPUT); this->dir[SPA_DIRECTION_OUTPUT].direction = SPA_DIRECTION_OUTPUT; - this->dir[SPA_DIRECTION_OUTPUT].latency = SPA_LATENCY_INFO(SPA_DIRECTION_OUTPUT); this->node.iface = SPA_INTERFACE_INIT( SPA_TYPE_INTERFACE_Node, diff --git a/src/pipewire/impl-port.c b/src/pipewire/impl-port.c index add9e7aec..fbf4a4bf0 100644 --- a/src/pipewire/impl-port.c +++ b/src/pipewire/impl-port.c @@ -1549,6 +1549,7 @@ int pw_impl_port_recalc_latency(struct pw_impl_port *port) struct spa_pod_builder b = { 0 }; uint8_t buffer[1024]; bool changed; + int count = 0; if (port->destroying) return 0; @@ -1571,6 +1572,7 @@ int pw_impl_port_recalc_latency(struct pw_impl_port *port) latency.min_quantum, latency.max_quantum, latency.min_rate, latency.max_rate, latency.min_ns, latency.max_ns); + count++; } } else { spa_list_for_each(l, &port->links, input_link) { @@ -1586,13 +1588,16 @@ int pw_impl_port_recalc_latency(struct pw_impl_port *port) latency.min_quantum, latency.max_quantum, latency.min_rate, latency.max_rate, latency.min_ns, latency.max_ns); + count++; } } spa_latency_info_combine_finish(&latency); - current = &port->latency[latency.direction]; - - changed = spa_latency_info_compare(current, &latency) != 0; + current = port->have_latency ? &port->latency[latency.direction] : NULL; + if (current == NULL) + changed = count > 0; + else + changed = spa_latency_info_compare(current, &latency) != 0; pw_log_info("port %d: %s %s latency %f-%f %d-%d %"PRIu64"-%"PRIu64, port->info.id, changed ? "set" : "keep", @@ -1604,13 +1609,14 @@ int pw_impl_port_recalc_latency(struct pw_impl_port *port) if (!changed) return 0; - *current = latency; + port->latency[latency.direction] = latency; + port->have_latency = count > 0; if (!port->have_latency_param) return 0; spa_pod_builder_init(&b, buffer, sizeof(buffer)); - param = spa_latency_build(&b, SPA_PARAM_Latency, &latency); + param = port->have_latency ? spa_latency_build(&b, SPA_PARAM_Latency, &latency) : NULL; return pw_impl_port_set_param(port, SPA_PARAM_Latency, 0, param); } diff --git a/src/pipewire/private.h b/src/pipewire/private.h index cc9567b27..e841162c7 100644 --- a/src/pipewire/private.h +++ b/src/pipewire/private.h @@ -847,6 +847,7 @@ struct pw_impl_port { struct spa_latency_info latency[2]; /**< latencies */ unsigned int have_latency_param:1; unsigned int ignore_latency:1; + unsigned int have_latency:1; unsigned int have_tag_param:1; struct spa_pod *tag[2]; /**< tags */ diff --git a/src/tools/pw-cat.c b/src/tools/pw-cat.c index d5665093e..e56348444 100644 --- a/src/tools/pw-cat.c +++ b/src/tools/pw-cat.c @@ -20,6 +20,7 @@ #include #include #include +#include #include #include #include @@ -1569,7 +1570,8 @@ int main(int argc, char *argv[]) { struct data data = { 0, }; struct pw_loop *l; - const struct spa_pod *params[1]; + const struct spa_pod *params[2]; + uint32_t n_params = 0; uint8_t buffer[1024]; struct spa_pod_builder b = SPA_POD_BUILDER_INIT(buffer, sizeof(buffer)); const char *prog; @@ -1886,7 +1888,7 @@ int main(int argc, char *argv[]) ret = av_codec_params_to_audio_info(&data, data.encoded.audio_stream->codecpar, &info); if (ret < 0) goto error_bad_file; - params[0] = spa_format_audio_build(&b, SPA_PARAM_EnumFormat, &info); + params[n_params++] = spa_format_audio_build(&b, SPA_PARAM_EnumFormat, &info); break; } #endif @@ -1902,11 +1904,11 @@ int main(int argc, char *argv[]) if (data.channelmap.n_channels) memcpy(info.position, data.channelmap.channels, data.channels * sizeof(int)); - params[0] = spa_format_audio_raw_build(&b, SPA_PARAM_EnumFormat, &info); + params[n_params++] = spa_format_audio_raw_build(&b, SPA_PARAM_EnumFormat, &info); break; } case TYPE_MIDI: - params[0] = spa_pod_builder_add_object(&b, + params[n_params++] = spa_pod_builder_add_object(&b, SPA_TYPE_OBJECT_Format, SPA_PARAM_EnumFormat, SPA_FORMAT_mediaType, SPA_POD_Id(SPA_MEDIA_TYPE_application), SPA_FORMAT_mediaSubtype, SPA_POD_Id(SPA_MEDIA_SUBTYPE_control)); @@ -1928,10 +1930,25 @@ int main(int argc, char *argv[]) memcpy(info.position, i->info.position, info.channels * sizeof(uint32_t)); } - params[0] = spa_format_audio_dsd_build(&b, SPA_PARAM_EnumFormat, &info); + params[n_params++] = spa_format_audio_dsd_build(&b, SPA_PARAM_EnumFormat, &info); break; } } + if (data.mode == mode_playback) { + struct spa_dict_item items[64]; + uint32_t i, n_items = 0; + + for (i = 0; i < data.props->dict.n_items; i++) { + if (spa_strstartswith(data.props->dict.items[i].key, "media.")) + items[n_items++] = data.props->dict.items[i]; + } + if (n_items > 0) { + struct spa_pod_frame f; + spa_tag_build_start(&b, &f, SPA_PARAM_Tag, SPA_DIRECTION_OUTPUT); + spa_tag_build_add_dict(&b, &SPA_DICT_INIT(items, n_items)); + params[n_params++] = spa_tag_build_end(&b, &f); + } + } data.stream = pw_stream_new(data.core, prog, data.props); data.props = NULL; @@ -1955,7 +1972,7 @@ int main(int argc, char *argv[]) PW_ID_ANY, flags | PW_STREAM_FLAG_MAP_BUFFERS, - params, 1); + params, n_params); if (ret < 0) { fprintf(stderr, "error: failed connect: %s\n", spa_strerror(ret)); goto error_connect_fail;