diff --git a/src/modules/module-pipe-tunnel.c b/src/modules/module-pipe-tunnel.c index 8597162a5..eaf6aca11 100644 --- a/src/modules/module-pipe-tunnel.c +++ b/src/modules/module-pipe-tunnel.c @@ -161,6 +161,7 @@ struct impl { unsigned int unlink_fifo; int fd; struct spa_source *socket; + struct spa_source *timer; struct pw_properties *stream_props; enum pw_direction direction; @@ -170,15 +171,78 @@ struct impl { uint32_t frame_size; unsigned int do_disconnect:1; + unsigned int driving:1; + unsigned int have_sync:1; struct spa_ringbuffer ring; void *buffer; uint32_t target_buffer; + struct spa_io_rate_match *rate_match; + struct spa_io_position *position; + struct spa_dll dll; float max_error; + float corr; + + uint64_t next_time; }; +static uint64_t get_time_ns(struct impl *impl) +{ + struct timespec now; + if (spa_system_clock_gettime(impl->data_loop->system, CLOCK_MONOTONIC, &now) < 0) + return 0; + return SPA_TIMESPEC_TO_NSEC(&now); +} + +static int set_timeout(struct impl *impl, uint64_t time) +{ + struct timespec timeout, interval; + timeout.tv_sec = time / SPA_NSEC_PER_SEC; + timeout.tv_nsec = time % SPA_NSEC_PER_SEC; + interval.tv_sec = 0; + interval.tv_nsec = 0; + pw_loop_update_timer(impl->data_loop, + impl->timer, &timeout, &interval, true); + return 0; +} + +static void on_timeout(void *d, uint64_t expirations) +{ + struct impl *impl = d; + uint64_t duration, current_time; + uint32_t rate, index; + int32_t avail; + struct spa_io_position *pos = impl->position; + + if (SPA_LIKELY(pos)) { + duration = pos->clock.target_duration; + rate = pos->clock.target_rate.denom; + } else { + duration = 1024; + rate = 48000; + } + pw_log_debug("timeout %"PRIu64, duration); + + current_time = impl->next_time; + impl->next_time += duration / impl->corr * 1e9 / rate; + avail = spa_ringbuffer_get_read_index(&impl->ring, &index); + + if (SPA_LIKELY(pos)) { + pos->clock.nsec = current_time; + pos->clock.rate = pos->clock.target_rate; + pos->clock.position += pos->clock.duration; + pos->clock.duration = pos->clock.target_duration; + pos->clock.delay = SPA_SCALE32_UP(avail, rate, impl->info.rate); + pos->clock.rate_diff = impl->corr; + pos->clock.next_nsec = impl->next_time; + } + set_timeout(impl, impl->next_time); + + pw_stream_trigger_process(impl->stream); +} + static void stream_destroy(void *d) { struct impl *impl = d; @@ -196,12 +260,20 @@ static void stream_state_changed(void *d, enum pw_stream_state old, pw_impl_module_schedule_destroy(impl->module); break; case PW_STREAM_STATE_PAUSED: - if (impl->direction == PW_DIRECTION_OUTPUT) + if (impl->direction == PW_DIRECTION_OUTPUT) { pw_loop_update_io(impl->data_loop, impl->socket, 0); + set_timeout(impl, 0); + } break; case PW_STREAM_STATE_STREAMING: - if (impl->direction == PW_DIRECTION_OUTPUT) + if (impl->direction == PW_DIRECTION_OUTPUT) { pw_loop_update_io(impl->data_loop, impl->socket, SPA_IO_IN); + impl->driving = pw_stream_is_driving(impl->stream); + if (impl->driving) { + impl->next_time = get_time_ns(impl); + set_timeout(impl, impl->next_time); + } + } break; default: break; @@ -252,7 +324,7 @@ static void playback_stream_process(void *data) static void update_rate(struct impl *impl, uint32_t filled) { - float error, corr; + float error; if (impl->rate_match == NULL) return; @@ -260,12 +332,14 @@ static void update_rate(struct impl *impl, uint32_t filled) error = (float)impl->target_buffer - (float)(filled); error = SPA_CLAMP(error, -impl->max_error, impl->max_error); - corr = spa_dll_update(&impl->dll, error); - pw_log_info("error:%f corr:%f current:%u target:%u", - error, corr, filled, impl->target_buffer); + impl->corr = spa_dll_update(&impl->dll, error); + pw_log_debug("error:%f corr:%f current:%u target:%u", + error, impl->corr, filled, impl->target_buffer); - SPA_FLAG_SET(impl->rate_match->flags, SPA_IO_RATE_MATCH_FLAG_ACTIVE); - impl->rate_match->rate = 1.0f / corr; + if (!impl->driving) { + SPA_FLAG_SET(impl->rate_match->flags, SPA_IO_RATE_MATCH_FLAG_ACTIVE); + impl->rate_match->rate = 1.0f / impl->corr; + } } static void capture_stream_process(void *data) @@ -293,11 +367,16 @@ static void capture_stream_process(void *data) pw_log_debug("avail %d %u %u", avail, index, size); - if (avail < (int32_t)size) + if (avail < (int32_t)size) { memset(bd->data, 0, size); + if (avail > 0) + pw_log_warn("underrun %d < %u", avail, size); + impl->have_sync = false; + } if (avail > (int32_t)RINGBUFFER_SIZE) { - avail = impl->target_buffer; index += avail - impl->target_buffer; + avail = impl->target_buffer; + pw_log_warn("overrun %d > %u", avail, RINGBUFFER_SIZE); } if (avail > 0) { avail = SPA_ROUND_DOWN(avail, impl->frame_size); @@ -326,6 +405,9 @@ static void stream_io_changed(void *data, uint32_t id, void *area, uint32_t size case SPA_IO_RateMatch: impl->rate_match = area; break; + case SPA_IO_Position: + impl->position = area; + break; } } @@ -403,6 +485,10 @@ static int handle_pipe_read(struct impl *impl) struct iovec iov[2]; filled = spa_ringbuffer_get_write_index(&impl->ring, &index); + if (!impl->have_sync) { + memset(impl->buffer, 0, RINGBUFFER_SIZE); + } + if (filled < 0) { pw_log_warn("%p: underrun write:%u filled:%d", impl, index, filled); @@ -425,6 +511,16 @@ static int handle_pipe_read(struct impl *impl) } } } + if (!impl->have_sync) { + impl->ring.readindex = index - impl->target_buffer; + + spa_dll_init(&impl->dll); + spa_dll_set_bw(&impl->dll, SPA_DLL_BW_MIN, 256.f, impl->info.rate); + impl->corr = 1.0f; + + pw_log_info("resync"); + impl->have_sync = true; + } spa_ringbuffer_write_update(&impl->ring, index); if (nread < 0) { @@ -509,6 +605,12 @@ static int create_fifo(struct impl *impl) pw_log_error("can't create socket"); goto error; } + impl->timer = pw_loop_add_timer(impl->data_loop, on_timeout, impl); + if (impl->timer == NULL) { + res = -errno; + pw_log_error("can't create timer"); + goto error; + } pw_log_info("%s fifo '%s' with format:%s channels:%d rate:%d", impl->direction == PW_DIRECTION_OUTPUT ? "reading from" : "writing to", @@ -572,6 +674,8 @@ static void impl_destroy(struct impl *impl) } if (impl->socket) pw_loop_destroy_source(impl->data_loop, impl->socket); + if (impl->timer) + pw_loop_destroy_source(impl->data_loop, impl->timer); if (impl->fd >= 0) close(impl->fd); @@ -808,6 +912,7 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args) spa_dll_init(&impl->dll); spa_dll_set_bw(&impl->dll, SPA_DLL_BW_MIN, 256.f, impl->info.rate); impl->max_error = 256.0f * impl->frame_size; + impl->corr = 1.0f; impl->core = pw_context_get_object(impl->context, PW_TYPE_INTERFACE_Core); if (impl->core == NULL) { diff --git a/src/modules/module-protocol-pulse/modules/module-pipe-source.c b/src/modules/module-protocol-pulse/modules/module-pipe-source.c index ec077ab9d..1261c4ec5 100644 --- a/src/modules/module-protocol-pulse/modules/module-pipe-source.c +++ b/src/modules/module-protocol-pulse/modules/module-pipe-source.c @@ -150,10 +150,10 @@ static int module_pipe_source_prepare(struct module * const module) pw_properties_set(stream_props, PW_KEY_NODE_NAME, "fifo_input"); -// if ((str = pw_properties_get(stream_props, PW_KEY_NODE_DRIVER)) == NULL) -// pw_properties_set(stream_props, PW_KEY_NODE_DRIVER, "true"); -// if ((str = pw_properties_get(stream_props, PW_KEY_PRIORITY_DRIVER)) == NULL) -// pw_properties_set(stream_props, PW_KEY_PRIORITY_DRIVER, "50000"); + if ((str = pw_properties_get(stream_props, PW_KEY_NODE_DRIVER)) == NULL) + pw_properties_set(stream_props, PW_KEY_NODE_DRIVER, "true"); + if ((str = pw_properties_get(stream_props, PW_KEY_PRIORITY_DRIVER)) == NULL) + pw_properties_set(stream_props, PW_KEY_PRIORITY_DRIVER, "50000"); d->module = module; d->stream_props = stream_props;