diff --git a/src/modules/module-combine-stream.c b/src/modules/module-combine-stream.c index 247818c6d..d1c8b3491 100644 --- a/src/modules/module-combine-stream.c +++ b/src/modules/module-combine-stream.c @@ -45,6 +45,7 @@ * - `node.name`: a unique name for the stream * - `node.description`: a human readable name for the stream * - `combine.mode` = capture | playback | sink | source, default sink + * - `combine.latency-compensate`: use delay buffers to match stream latencies * - `combine.props = {}`: properties to be passed to the sink/source * - `stream.props = {}`: properties to be passed to the streams * - `stream.rules = {}`: rules for matching streams, use create-stream actions @@ -80,6 +81,7 @@ * combine.mode = sink * node.name = "combine_sink" * node.description = "My Combine Sink" + * combine.latency-compensate = false * combine.props = { * audio.position = [ FL FR ] * } @@ -119,6 +121,7 @@ * combine.mode = sink * node.name = "combine_sink_5_1" * node.description = "My 5.1 Combine Sink" + * combine.latency-compensate = false * combine.props = { * audio.position = [ FL FR FC LFE SL SR ] * } @@ -214,6 +217,8 @@ PW_LOG_TOPIC_STATIC(mod_topic, "mod." NAME); "( stream.props= ) " \ "( stream.rules= ) " +#define DELAYBUF_MAX_SIZE (20 * sizeof(float) * 96000) + static const struct spa_dict_item module_props[] = { { PW_KEY_MODULE_AUTHOR, "Wim Taymans " }, @@ -224,6 +229,7 @@ static const struct spa_dict_item module_props[] = { struct impl { struct pw_context *context; + struct pw_loop *main_loop; struct pw_data_loop *data_loop; struct pw_properties *props; @@ -244,6 +250,8 @@ struct impl { struct pw_registry *registry; struct spa_hook registry_listener; + struct spa_source *update_delay_event; + struct pw_properties *combine_props; struct pw_stream *combine; struct spa_hook combine_listener; @@ -259,11 +267,18 @@ struct impl { struct spa_audio_info_raw info; unsigned int do_disconnect:1; + unsigned int latency_compensate:1; struct spa_list streams; uint32_t n_streams; }; +struct ringbuffer { + void *buf; + uint32_t idx; + uint32_t size; +}; + struct stream { uint32_t id; @@ -278,6 +293,13 @@ struct stream { struct spa_audio_info_raw info; uint32_t remap[SPA_AUDIO_MAX_CHANNELS]; + uint32_t rate; + + void *delaybuf; + struct ringbuffer delay[SPA_AUDIO_MAX_CHANNELS]; + + int64_t delay_nsec; /* for main loop */ + int64_t data_delay_nsec; /* for data loop */ unsigned int ready:1; unsigned int added:1; @@ -324,6 +346,53 @@ static void parse_audio_info(const struct pw_properties *props, struct spa_audio parse_position(info, DEFAULT_POSITION, strlen(DEFAULT_POSITION)); } +static void ringbuffer_init(struct ringbuffer *r, void *buf, uint32_t size) +{ + r->buf = buf; + r->idx = 0; + r->size = size; +} + +static void ringbuffer_memcpy(struct ringbuffer *r, void *dst, void *src, uint32_t size) +{ + uint32_t avail; + + avail = SPA_MIN(size, r->size); + + /* buf to dst */ + if (dst && avail > 0) { + spa_ringbuffer_read_data(NULL, r->buf, r->size, r->idx, dst, avail); + dst = SPA_PTROFF(dst, avail, void); + } + + /* src to dst */ + if (size > avail) { + if (dst) + memcpy(dst, src, size - avail); + src = SPA_PTROFF(src, size - avail, void); + } + + /* src to buf */ + if (avail > 0) { + spa_ringbuffer_write_data(NULL, r->buf, r->size, r->idx, src, avail); + r->idx = (r->idx + avail) % r->size; + } +} + +static void ringbuffer_copy(struct ringbuffer *dst, struct ringbuffer *src) +{ + uint32_t l0, l1; + + if (dst->size == 0 || src->size == 0) + return; + + l0 = src->size - src->idx; + l1 = src->idx; + + ringbuffer_memcpy(dst, NULL, SPA_PTROFF(src->buf, src->idx, void), l0); + ringbuffer_memcpy(dst, NULL, src->buf, l1); +} + static struct stream *find_stream(struct impl *impl, uint32_t id) { struct stream *s; @@ -347,6 +416,17 @@ static void apply_latency_offset(struct spa_latency_info *latency, int64_t offse latency->max_ns += SPA_MAX(offset, -(int64_t)latency->max_ns); } +static int64_t get_stream_delay(struct stream *s) +{ + struct pw_time t; + + if (pw_stream_get_time_n(s->stream, &t, sizeof(t)) < 0 || + t.rate.denom == 0) + return INT64_MIN; + + return t.delay * SPA_NSEC_PER_SEC * t.rate.num / t.rate.denom; +} + static void update_latency(struct impl *impl) { struct spa_latency_info latency; @@ -355,13 +435,28 @@ static void update_latency(struct impl *impl) if (impl->combine == NULL) return; - spa_latency_info_combine_start(&latency, get_combine_direction(impl)); + if (!impl->latency_compensate) { + spa_latency_info_combine_start(&latency, get_combine_direction(impl)); - spa_list_for_each(s, &impl->streams, link) - if (s->have_latency) - spa_latency_info_combine(&latency, &s->latency); + spa_list_for_each(s, &impl->streams, link) + if (s->have_latency) + spa_latency_info_combine(&latency, &s->latency); - spa_latency_info_combine_finish(&latency); + spa_latency_info_combine_finish(&latency); + } else { + int64_t max_delay = INT64_MIN; + + latency = SPA_LATENCY_INFO(get_combine_direction(impl)); + + spa_list_for_each(s, &impl->streams, link) { + int64_t delay = get_stream_delay(s); + + if (delay > max_delay && s->have_latency) { + latency = s->latency; + max_delay = delay; + } + } + } apply_latency_offset(&latency, impl->latency_offset); @@ -378,6 +473,121 @@ static void update_latency(struct impl *impl) } } +struct replace_delay_info { + struct stream *stream; + void *buf; + struct ringbuffer delay[SPA_AUDIO_MAX_CHANNELS]; +}; + +static int do_replace_delay(struct spa_loop *loop, bool async, uint32_t seq, + const void *data, size_t size, void *user_data) +{ + struct replace_delay_info *info = user_data; + unsigned int i; + + for (i = 0; i < SPA_N_ELEMENTS(info->stream->delay); ++i) { + ringbuffer_copy(&info->delay[i], &info->stream->delay[i]); + info->stream->delay[i] = info->delay[i]; + } + + SPA_SWAP(info->stream->delaybuf, info->buf); + return 0; +} + +static void resize_delay(struct stream *stream, uint32_t size) +{ + struct replace_delay_info info; + uint32_t channels = stream->info.channels; + unsigned int i; + + size = SPA_MIN(size, DELAYBUF_MAX_SIZE); + + for (i = 0; i < channels; ++i) + if (stream->delay[i].size != size) + break; + if (i == channels) + return; + + pw_log_info("stream %d latency compensation samples:%u", stream->id, + (unsigned int)(size / sizeof(float))); + + spa_zero(info); + info.stream = stream; + if (size > 0) + info.buf = calloc(channels, size); + if (!info.buf) + size = 0; + + for (i = 0; i < channels; ++i) + ringbuffer_init(&info.delay[i], SPA_PTROFF(info.buf, i*size, void), size); + + pw_data_loop_invoke(stream->impl->data_loop, do_replace_delay, 0, NULL, 0, true, &info); + + free(info.buf); +} + +static void update_delay(struct impl *impl) +{ + struct stream *s; + int64_t max_delay = INT64_MIN; + + if (!impl->latency_compensate) + return; + + spa_list_for_each(s, &impl->streams, link) { + int64_t delay = get_stream_delay(s); + + if (delay != s->delay_nsec && delay != INT64_MIN) + pw_log_debug("stream %d delay:%"PRIi64" ns", s->id, delay); + + max_delay = SPA_MAX(max_delay, delay); + s->delay_nsec = delay; + } + + spa_list_for_each(s, &impl->streams, link) { + uint32_t size = 0; + + if (s->delay_nsec != INT64_MIN) { + int64_t delay = max_delay - s->delay_nsec; + size = delay * s->rate / SPA_NSEC_PER_SEC; + size *= sizeof(float); + } + + resize_delay(s, size); + } + + update_latency(impl); +} + +static void update_delay_event(void *data, uint64_t count) +{ + struct impl *impl = data; + + /* in main loop */ + update_delay(impl); +} + +static int do_clear_delaybuf(struct spa_loop *loop, bool async, uint32_t seq, + const void *data, size_t size, void *user_data) +{ + struct impl *impl = user_data; + struct stream *s; + unsigned int i; + + spa_list_for_each(s, &impl->streams, link) { + for (i = 0; i < SPA_N_ELEMENTS(s->delay); ++i) + if (s->delay[i].size) + memset(s->delay[i].buf, 0, s->delay[i].size); + } + + return 0; +} + +static void clear_delaybuf(struct impl *impl) +{ + pw_data_loop_invoke(impl->data_loop, do_clear_delaybuf, 0, NULL, 0, true, impl); +} + static int do_add_stream(struct spa_loop *loop, bool async, uint32_t seq, const void *data, size_t size, void *user_data) { @@ -413,6 +623,8 @@ static void destroy_stream(struct stream *s) spa_hook_remove(&s->stream_listener); pw_stream_destroy(s->stream); } + + free(s->delaybuf); free(s); } @@ -462,8 +674,24 @@ static void stream_param_changed(void *d, uint32_t id, const struct spa_pod *par { struct stream *s = d; struct spa_latency_info latency; + struct spa_audio_info format = { 0 }; switch (id) { + case SPA_PARAM_Format: + if (!param) { + s->rate = 0; + } else { + if (spa_format_parse(param, &format.media_type, &format.media_subtype) < 0) + break; + if (format.media_type != SPA_MEDIA_TYPE_audio || + format.media_subtype != SPA_MEDIA_SUBTYPE_raw) + break; + if (spa_format_audio_raw_parse(param, &format.info.raw) < 0) + break; + s->rate = format.info.raw.rate; + } + update_delay(s->impl); + break; case SPA_PARAM_Latency: if (!param) { s->have_latency = false; @@ -602,6 +830,7 @@ static int create_stream(struct stream_info *info) goto error; pw_data_loop_invoke(impl->data_loop, do_add_stream, 0, NULL, 0, true, s); + update_delay(impl); return 0; error_errno: @@ -673,6 +902,7 @@ static void registry_event_global_remove(void *data, uint32_t id) return; destroy_stream(s); + update_delay(impl); } static const struct pw_registry_events registry_events = { @@ -698,6 +928,7 @@ static void combine_state_changed(void *d, enum pw_stream_state old, pw_impl_module_schedule_destroy(impl->module); break; case PW_STREAM_STATE_PAUSED: + clear_delaybuf(impl); impl->combine_id = pw_stream_get_node_id(impl->combine); pw_log_info("got combine id %d", impl->combine_id); break; @@ -708,11 +939,27 @@ static void combine_state_changed(void *d, enum pw_stream_state old, } } +static bool check_stream_delay(struct stream *s) +{ + int64_t delay; + + if (!s->impl->latency_compensate) + return false; + + delay = get_stream_delay(s); + if (delay == INT64_MIN || delay == s->data_delay_nsec) + return false; + + s->data_delay_nsec = delay; + return true; +} + static void combine_input_process(void *d) { struct impl *impl = d; struct pw_buffer *in, *out; struct stream *s; + bool delay_changed = false; if ((in = pw_stream_dequeue_buffer(impl->combine)) == NULL) { pw_log_debug("out of buffers: %m"); @@ -725,6 +972,9 @@ static void combine_input_process(void *d) if (s->stream == NULL) continue; + if (check_stream_delay(s)) + delay_changed = true; + if ((out = pw_stream_dequeue_buffer(s->stream)) == NULL) { pw_log_warn("out of playback buffers: %m"); goto do_trigger; @@ -746,8 +996,8 @@ static void combine_input_process(void *d) offs = SPA_MIN(ds->chunk->offset, ds->maxsize); size = SPA_MIN(ds->chunk->size, ds->maxsize - offs); - memcpy(dd->data, - SPA_PTROFF(ds->data, offs, void), size); + ringbuffer_memcpy(&s->delay[j], + dd->data, SPA_PTROFF(ds->data, offs, void), size); outsize = SPA_MAX(outsize, size); stride = SPA_MAX(stride, ds->chunk->stride); @@ -763,6 +1013,12 @@ do_trigger: pw_stream_trigger_process(s->stream); } pw_stream_queue_buffer(impl->combine, in); + + /* Update delay if quantum etc. has changed. + * This should be rare enough so that doing it via main loop doesn't matter. + */ + if (impl->latency_compensate && delay_changed) + pw_loop_signal_event(impl->main_loop, impl->update_delay_event); } static void combine_output_process(void *d) @@ -770,6 +1026,7 @@ static void combine_output_process(void *d) struct impl *impl = d; struct pw_buffer *in, *out; struct stream *s; + bool delay_changed = false; if ((out = pw_stream_dequeue_buffer(impl->combine)) == NULL) { pw_log_debug("out of buffers: %m"); @@ -782,6 +1039,9 @@ static void combine_output_process(void *d) if (s->stream == NULL) continue; + if (check_stream_delay(s)) + delay_changed = true; + if ((in = pw_stream_dequeue_buffer(s->stream)) == NULL) { pw_log_warn("%p: out of capture buffers: %m", s); continue; @@ -806,8 +1066,8 @@ static void combine_output_process(void *d) size = SPA_MIN(ds->chunk->size, ds->maxsize - offs); size = SPA_MIN(size, dd->maxsize); - memcpy(dd->data, - SPA_PTROFF(ds->data, offs, void), size); + ringbuffer_memcpy(&s->delay[j], + dd->data, SPA_PTROFF(ds->data, offs, void), size); outsize = SPA_MAX(outsize, size); stride = SPA_MAX(stride, ds->chunk->stride); @@ -820,6 +1080,9 @@ static void combine_output_process(void *d) pw_stream_queue_buffer(s->stream, in); } pw_stream_queue_buffer(impl->combine, out); + + if (impl->latency_compensate && delay_changed) + pw_loop_signal_event(impl->main_loop, impl->update_delay_event); } static void combine_param_changed(void *d, uint32_t id, const struct spa_pod *param) @@ -967,6 +1230,9 @@ static void impl_destroy(struct impl *impl) if (impl->combine) pw_stream_destroy(impl->combine); + if (impl->update_delay_event) + pw_loop_destroy_source(impl->main_loop, impl->update_delay_event); + if (impl->registry) { spa_hook_remove(&impl->registry_listener); pw_proxy_destroy((struct pw_proxy*)impl->registry); @@ -1026,6 +1292,7 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args) return -errno; pw_log_debug("module %p: new %s", impl, args); + impl->main_loop = pw_context_get_main_loop(context); impl->data_loop = pw_context_get_data_loop(context); spa_list_init(&impl->streams); @@ -1062,6 +1329,9 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args) prefix = "sink"; } + if ((str = pw_properties_get(props, "combine.latency-compensate")) != NULL) + impl->latency_compensate = spa_atob(str); + impl->combine_props = pw_properties_new(NULL, NULL); impl->stream_props = pw_properties_new(NULL, NULL); if (impl->combine_props == NULL || impl->stream_props == NULL) { @@ -1128,6 +1398,16 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args) if (pw_properties_get(impl->stream_props, PW_KEY_NODE_DONT_RECONNECT) == NULL) pw_properties_set(impl->stream_props, PW_KEY_NODE_DONT_RECONNECT, "true"); + if (impl->latency_compensate) { + impl->update_delay_event = pw_loop_add_event(impl->main_loop, + update_delay_event, impl); + if (impl->update_delay_event == NULL) { + res = -errno; + pw_log_error("can't create event source: %m"); + goto error; + } + } + impl->core = pw_context_get_object(impl->context, PW_TYPE_INTERFACE_Core); if (impl->core == NULL) { str = pw_properties_get(props, PW_KEY_REMOTE_NAME);