pipe-tunnel: support pipe-source as a driver

Use a timer to periodically schedule the pipe-source when configured
as a driver. Adjust the timeout period based on buffer fill levels.

Fixes #3478
This commit is contained in:
Wim Taymans 2023-08-31 11:41:00 +02:00
parent f25da522a4
commit 9f30e58ef2
2 changed files with 119 additions and 14 deletions

View file

@ -161,6 +161,7 @@ struct impl {
unsigned int unlink_fifo;
int fd;
struct spa_source *socket;
struct spa_source *timer;
struct pw_properties *stream_props;
enum pw_direction direction;
@ -170,15 +171,78 @@ struct impl {
uint32_t frame_size;
unsigned int do_disconnect:1;
unsigned int driving:1;
unsigned int have_sync:1;
struct spa_ringbuffer ring;
void *buffer;
uint32_t target_buffer;
struct spa_io_rate_match *rate_match;
struct spa_io_position *position;
struct spa_dll dll;
float max_error;
float corr;
uint64_t next_time;
};
static uint64_t get_time_ns(struct impl *impl)
{
struct timespec now;
if (spa_system_clock_gettime(impl->data_loop->system, CLOCK_MONOTONIC, &now) < 0)
return 0;
return SPA_TIMESPEC_TO_NSEC(&now);
}
static int set_timeout(struct impl *impl, uint64_t time)
{
struct timespec timeout, interval;
timeout.tv_sec = time / SPA_NSEC_PER_SEC;
timeout.tv_nsec = time % SPA_NSEC_PER_SEC;
interval.tv_sec = 0;
interval.tv_nsec = 0;
pw_loop_update_timer(impl->data_loop,
impl->timer, &timeout, &interval, true);
return 0;
}
static void on_timeout(void *d, uint64_t expirations)
{
struct impl *impl = d;
uint64_t duration, current_time;
uint32_t rate, index;
int32_t avail;
struct spa_io_position *pos = impl->position;
if (SPA_LIKELY(pos)) {
duration = pos->clock.target_duration;
rate = pos->clock.target_rate.denom;
} else {
duration = 1024;
rate = 48000;
}
pw_log_debug("timeout %"PRIu64, duration);
current_time = impl->next_time;
impl->next_time += duration / impl->corr * 1e9 / rate;
avail = spa_ringbuffer_get_read_index(&impl->ring, &index);
if (SPA_LIKELY(pos)) {
pos->clock.nsec = current_time;
pos->clock.rate = pos->clock.target_rate;
pos->clock.position += pos->clock.duration;
pos->clock.duration = pos->clock.target_duration;
pos->clock.delay = SPA_SCALE32_UP(avail, rate, impl->info.rate);
pos->clock.rate_diff = impl->corr;
pos->clock.next_nsec = impl->next_time;
}
set_timeout(impl, impl->next_time);
pw_stream_trigger_process(impl->stream);
}
static void stream_destroy(void *d)
{
struct impl *impl = d;
@ -196,12 +260,20 @@ static void stream_state_changed(void *d, enum pw_stream_state old,
pw_impl_module_schedule_destroy(impl->module);
break;
case PW_STREAM_STATE_PAUSED:
if (impl->direction == PW_DIRECTION_OUTPUT)
if (impl->direction == PW_DIRECTION_OUTPUT) {
pw_loop_update_io(impl->data_loop, impl->socket, 0);
set_timeout(impl, 0);
}
break;
case PW_STREAM_STATE_STREAMING:
if (impl->direction == PW_DIRECTION_OUTPUT)
if (impl->direction == PW_DIRECTION_OUTPUT) {
pw_loop_update_io(impl->data_loop, impl->socket, SPA_IO_IN);
impl->driving = pw_stream_is_driving(impl->stream);
if (impl->driving) {
impl->next_time = get_time_ns(impl);
set_timeout(impl, impl->next_time);
}
}
break;
default:
break;
@ -252,7 +324,7 @@ static void playback_stream_process(void *data)
static void update_rate(struct impl *impl, uint32_t filled)
{
float error, corr;
float error;
if (impl->rate_match == NULL)
return;
@ -260,12 +332,14 @@ static void update_rate(struct impl *impl, uint32_t filled)
error = (float)impl->target_buffer - (float)(filled);
error = SPA_CLAMP(error, -impl->max_error, impl->max_error);
corr = spa_dll_update(&impl->dll, error);
pw_log_info("error:%f corr:%f current:%u target:%u",
error, corr, filled, impl->target_buffer);
impl->corr = spa_dll_update(&impl->dll, error);
pw_log_debug("error:%f corr:%f current:%u target:%u",
error, impl->corr, filled, impl->target_buffer);
if (!impl->driving) {
SPA_FLAG_SET(impl->rate_match->flags, SPA_IO_RATE_MATCH_FLAG_ACTIVE);
impl->rate_match->rate = 1.0f / corr;
impl->rate_match->rate = 1.0f / impl->corr;
}
}
static void capture_stream_process(void *data)
@ -293,11 +367,16 @@ static void capture_stream_process(void *data)
pw_log_debug("avail %d %u %u", avail, index, size);
if (avail < (int32_t)size)
if (avail < (int32_t)size) {
memset(bd->data, 0, size);
if (avail > 0)
pw_log_warn("underrun %d < %u", avail, size);
impl->have_sync = false;
}
if (avail > (int32_t)RINGBUFFER_SIZE) {
avail = impl->target_buffer;
index += avail - impl->target_buffer;
avail = impl->target_buffer;
pw_log_warn("overrun %d > %u", avail, RINGBUFFER_SIZE);
}
if (avail > 0) {
avail = SPA_ROUND_DOWN(avail, impl->frame_size);
@ -326,6 +405,9 @@ static void stream_io_changed(void *data, uint32_t id, void *area, uint32_t size
case SPA_IO_RateMatch:
impl->rate_match = area;
break;
case SPA_IO_Position:
impl->position = area;
break;
}
}
@ -403,6 +485,10 @@ static int handle_pipe_read(struct impl *impl)
struct iovec iov[2];
filled = spa_ringbuffer_get_write_index(&impl->ring, &index);
if (!impl->have_sync) {
memset(impl->buffer, 0, RINGBUFFER_SIZE);
}
if (filled < 0) {
pw_log_warn("%p: underrun write:%u filled:%d",
impl, index, filled);
@ -425,6 +511,16 @@ static int handle_pipe_read(struct impl *impl)
}
}
}
if (!impl->have_sync) {
impl->ring.readindex = index - impl->target_buffer;
spa_dll_init(&impl->dll);
spa_dll_set_bw(&impl->dll, SPA_DLL_BW_MIN, 256.f, impl->info.rate);
impl->corr = 1.0f;
pw_log_info("resync");
impl->have_sync = true;
}
spa_ringbuffer_write_update(&impl->ring, index);
if (nread < 0) {
@ -509,6 +605,12 @@ static int create_fifo(struct impl *impl)
pw_log_error("can't create socket");
goto error;
}
impl->timer = pw_loop_add_timer(impl->data_loop, on_timeout, impl);
if (impl->timer == NULL) {
res = -errno;
pw_log_error("can't create timer");
goto error;
}
pw_log_info("%s fifo '%s' with format:%s channels:%d rate:%d",
impl->direction == PW_DIRECTION_OUTPUT ? "reading from" : "writing to",
@ -572,6 +674,8 @@ static void impl_destroy(struct impl *impl)
}
if (impl->socket)
pw_loop_destroy_source(impl->data_loop, impl->socket);
if (impl->timer)
pw_loop_destroy_source(impl->data_loop, impl->timer);
if (impl->fd >= 0)
close(impl->fd);
@ -808,6 +912,7 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args)
spa_dll_init(&impl->dll);
spa_dll_set_bw(&impl->dll, SPA_DLL_BW_MIN, 256.f, impl->info.rate);
impl->max_error = 256.0f * impl->frame_size;
impl->corr = 1.0f;
impl->core = pw_context_get_object(impl->context, PW_TYPE_INTERFACE_Core);
if (impl->core == NULL) {

View file

@ -150,10 +150,10 @@ static int module_pipe_source_prepare(struct module * const module)
pw_properties_set(stream_props, PW_KEY_NODE_NAME,
"fifo_input");
// if ((str = pw_properties_get(stream_props, PW_KEY_NODE_DRIVER)) == NULL)
// pw_properties_set(stream_props, PW_KEY_NODE_DRIVER, "true");
// if ((str = pw_properties_get(stream_props, PW_KEY_PRIORITY_DRIVER)) == NULL)
// pw_properties_set(stream_props, PW_KEY_PRIORITY_DRIVER, "50000");
if ((str = pw_properties_get(stream_props, PW_KEY_NODE_DRIVER)) == NULL)
pw_properties_set(stream_props, PW_KEY_NODE_DRIVER, "true");
if ((str = pw_properties_get(stream_props, PW_KEY_PRIORITY_DRIVER)) == NULL)
pw_properties_set(stream_props, PW_KEY_PRIORITY_DRIVER, "50000");
d->module = module;
d->stream_props = stream_props;