diff --git a/src/modules/module-pipe-tunnel.c b/src/modules/module-pipe-tunnel.c index 48321145f..8597162a5 100644 --- a/src/modules/module-pipe-tunnel.c +++ b/src/modules/module-pipe-tunnel.c @@ -111,6 +111,9 @@ #define DEFAULT_CHANNELS 2 #define DEFAULT_POSITION "[ FL FR ]" +#define RINGBUFFER_SIZE (1u << 22) +#define RINGBUFFER_MASK (RINGBUFFER_SIZE-1) + PW_LOG_TOPIC_STATIC(mod_topic, "mod." NAME); #define PW_LOG_TOPIC_DEFAULT mod_topic @@ -137,6 +140,7 @@ static const struct spa_dict_item module_props[] = { struct impl { struct pw_context *context; + struct pw_loop *data_loop; #define MODE_PLAYBACK 0 #define MODE_CAPTURE 1 @@ -156,6 +160,7 @@ struct impl { char *filename; unsigned int unlink_fifo; int fd; + struct spa_source *socket; struct pw_properties *stream_props; enum pw_direction direction; @@ -165,8 +170,13 @@ struct impl { uint32_t frame_size; unsigned int do_disconnect:1; - uint32_t leftover_count; - uint8_t *leftover; + + struct spa_ringbuffer ring; + void *buffer; + uint32_t target_buffer; + struct spa_io_rate_match *rate_match; + struct spa_dll dll; + float max_error; }; static void stream_destroy(void *d) @@ -186,8 +196,12 @@ static void stream_state_changed(void *d, enum pw_stream_state old, pw_impl_module_schedule_destroy(impl->module); break; case PW_STREAM_STATE_PAUSED: + if (impl->direction == PW_DIRECTION_OUTPUT) + pw_loop_update_io(impl->data_loop, impl->socket, 0); break; case PW_STREAM_STATE_STREAMING: + if (impl->direction == PW_DIRECTION_OUTPUT) + pw_loop_update_io(impl->data_loop, impl->socket, SPA_IO_IN); break; default: break; @@ -221,9 +235,12 @@ static void playback_stream_process(void *data) continue; } else if (errno == EAGAIN || errno == EWOULDBLOCK) { /* Don't continue writing */ + pw_log_debug("pipe (%s) overrun: %m", + impl->filename); break; } else { - pw_log_warn("Failed to write to pipe sink"); + pw_log_warn("Failed to write to pipe (%s): %m", + impl->filename); } } offs += written; @@ -233,53 +250,85 @@ static void playback_stream_process(void *data) pw_stream_queue_buffer(impl->stream, buf); } +static void update_rate(struct impl *impl, uint32_t filled) +{ + float error, corr; + + if (impl->rate_match == NULL) + return; + + error = (float)impl->target_buffer - (float)(filled); + error = SPA_CLAMP(error, -impl->max_error, impl->max_error); + + corr = spa_dll_update(&impl->dll, error); + pw_log_info("error:%f corr:%f current:%u target:%u", + error, corr, filled, impl->target_buffer); + + SPA_FLAG_SET(impl->rate_match->flags, SPA_IO_RATE_MATCH_FLAG_ACTIVE); + impl->rate_match->rate = 1.0f / corr; +} + static void capture_stream_process(void *data) { struct impl *impl = data; struct pw_buffer *buf; - struct spa_data *d; - uint32_t req; - ssize_t nread; + struct spa_data *bd; + uint32_t req, index, size; + int32_t avail; if ((buf = pw_stream_dequeue_buffer(impl->stream)) == NULL) { - pw_log_debug("out of buffers: %m"); + pw_log_warn("out of buffers: %m"); return; } - d = &buf->buffer->datas[0]; + bd = &buf->buffer->datas[0]; if ((req = buf->requested * impl->frame_size) == 0) req = 4096 * impl->frame_size; - req = SPA_MIN(req, d->maxsize); + size = SPA_MIN(req, bd->maxsize); + size = SPA_ROUND_DOWN(size, impl->frame_size); - d->chunk->offset = 0; - d->chunk->stride = impl->frame_size; - d->chunk->size = SPA_MIN(req, impl->leftover_count); - memcpy(d->data, impl->leftover, d->chunk->size); - req -= d->chunk->size; + avail = spa_ringbuffer_get_read_index(&impl->ring, &index); - nread = read(impl->fd, SPA_PTROFF(d->data, d->chunk->size, void), req); - if (nread < 0) { - const bool important = !(errno == EINTR - || errno == EAGAIN - || errno == EWOULDBLOCK); + pw_log_debug("avail %d %u %u", avail, index, size); - if (important) - pw_log_warn("failed to read from pipe (%s): %s", - impl->filename, strerror(errno)); - } - else { - d->chunk->size += nread; + if (avail < (int32_t)size) + memset(bd->data, 0, size); + if (avail > (int32_t)RINGBUFFER_SIZE) { + avail = impl->target_buffer; + index += avail - impl->target_buffer; } + if (avail > 0) { + avail = SPA_ROUND_DOWN(avail, impl->frame_size); + update_rate(impl, avail); - impl->leftover_count = d->chunk->size % impl->frame_size; - d->chunk->size -= impl->leftover_count; - memcpy(impl->leftover, SPA_PTROFF(d->data, d->chunk->size, void), impl->leftover_count); + avail = SPA_MIN(size, (uint32_t)avail); + spa_ringbuffer_read_data(&impl->ring, + impl->buffer, RINGBUFFER_SIZE, + index & RINGBUFFER_MASK, + bd->data, avail); + + index += avail; + spa_ringbuffer_read_update(&impl->ring, index); + } + bd->chunk->offset = 0; + bd->chunk->size = size; + bd->chunk->stride = impl->frame_size; pw_stream_queue_buffer(impl->stream, buf); } +static void stream_io_changed(void *data, uint32_t id, void *area, uint32_t size) +{ + struct impl *impl = data; + switch (id) { + case SPA_IO_RateMatch: + impl->rate_match = area; + break; + } +} + static const struct pw_stream_events playback_stream_events = { PW_VERSION_STREAM_EVENTS, .destroy = stream_destroy, @@ -290,8 +339,9 @@ static const struct pw_stream_events playback_stream_events = { static const struct pw_stream_events capture_stream_events = { PW_VERSION_STREAM_EVENTS, .destroy = stream_destroy, + .io_changed = stream_io_changed, .state_changed = stream_state_changed, - .process = capture_stream_process + .process = capture_stream_process, }; static int create_stream(struct impl *impl) @@ -335,6 +385,78 @@ static int create_stream(struct impl *impl) return 0; } +static inline void +set_iovec(struct spa_ringbuffer *rbuf, void *buffer, uint32_t size, + uint32_t offset, struct iovec *iov, uint32_t len) +{ + iov[0].iov_len = SPA_MIN(len, size - offset); + iov[0].iov_base = SPA_PTROFF(buffer, offset, void); + iov[1].iov_len = len - iov[0].iov_len; + iov[1].iov_base = buffer; +} + +static int handle_pipe_read(struct impl *impl) +{ + ssize_t nread; + int32_t filled; + uint32_t index; + struct iovec iov[2]; + + filled = spa_ringbuffer_get_write_index(&impl->ring, &index); + if (filled < 0) { + pw_log_warn("%p: underrun write:%u filled:%d", + impl, index, filled); + } + + set_iovec(&impl->ring, + impl->buffer, RINGBUFFER_SIZE, + index & RINGBUFFER_MASK, + iov, RINGBUFFER_SIZE); + + nread = read(impl->fd, iov[0].iov_base, iov[0].iov_len); + if (nread > 0) { + index += nread; + filled += nread; + if (nread == (ssize_t)iov[0].iov_len) { + nread = read(impl->fd, iov[1].iov_base, iov[1].iov_len); + if (nread > 0) { + index += nread; + filled += nread; + } + } + } + spa_ringbuffer_write_update(&impl->ring, index); + + if (nread < 0) { + const bool important = !(errno == EINTR + || errno == EAGAIN + || errno == EWOULDBLOCK); + + if (important) + pw_log_warn("failed to read from pipe (%s): %m", + impl->filename); + else + pw_log_debug("pipe (%s) underrun: %m", impl->filename); + } + pw_log_debug("filled %d %u %d", filled, index, impl->target_buffer); + + return 0; +} + + +static void on_pipe_io(void *data, int fd, uint32_t mask) +{ + struct impl *impl = data; + + if (mask & (SPA_IO_ERR | SPA_IO_HUP)) { + pw_log_warn("error:%08x", mask); + pw_loop_update_io(impl->data_loop, impl->socket, 0); + return; + } + if (mask & SPA_IO_IN) + handle_pipe_read(impl); +} + static int create_fifo(struct impl *impl) { struct stat st; @@ -363,7 +485,6 @@ static int create_fifo(struct impl *impl) do_unlink_fifo = true; } - if ((fd = open(filename, O_RDWR | O_CLOEXEC | O_NONBLOCK, 0)) < 0) { res = -errno; pw_log_error("open('%s'): %s", filename, spa_strerror(res)); @@ -381,6 +502,14 @@ static int create_fifo(struct impl *impl) pw_log_error("'%s' is not a FIFO.", filename); goto error; } + impl->socket = pw_loop_add_io(impl->data_loop, fd, + 0, false, on_pipe_io, impl); + if (impl->socket == NULL) { + res = -errno; + pw_log_error("can't create socket"); + goto error; + } + pw_log_info("%s fifo '%s' with format:%s channels:%d rate:%d", impl->direction == PW_DIRECTION_OUTPUT ? "reading from" : "writing to", filename, @@ -390,6 +519,7 @@ static int create_fifo(struct impl *impl) impl->filename = strdup(filename); impl->unlink_fifo = do_unlink_fifo; impl->fd = fd; + return 0; error: @@ -440,13 +570,15 @@ static void impl_destroy(struct impl *impl) unlink(impl->filename); free(impl->filename); } + if (impl->socket) + pw_loop_destroy_source(impl->data_loop, impl->socket); if (impl->fd >= 0) close(impl->fd); pw_properties_free(impl->stream_props); pw_properties_free(impl->props); - free(impl->leftover); + free(impl->buffer); free(impl); } @@ -569,6 +701,7 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args) struct pw_properties *props = NULL; struct impl *impl; const char *str, *media_class = NULL; + struct pw_data_loop *data_loop; int res; PW_LOG_TOPIC_INIT(mod_topic); @@ -601,6 +734,8 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args) impl->module = module; impl->context = context; + data_loop = pw_context_get_data_loop(context); + impl->data_loop = pw_data_loop_get_loop(data_loop); if ((str = pw_properties_get(props, "tunnel.mode")) == NULL) str = "playback"; @@ -662,12 +797,17 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args) copy_props(impl, props, PW_KEY_NODE_RATE); - impl->leftover = calloc(1, impl->frame_size); - if (impl->leftover == NULL) { + impl->buffer = calloc(1, RINGBUFFER_SIZE); + if (impl->buffer == NULL) { res = -errno; - pw_log_error("can't alloc leftover buffer: %m"); + pw_log_error("can't alloc ringbuffer: %m"); goto error; } + spa_ringbuffer_init(&impl->ring); + impl->target_buffer = 8192 * impl->frame_size; + spa_dll_init(&impl->dll); + spa_dll_set_bw(&impl->dll, SPA_DLL_BW_MIN, 256.f, impl->info.rate); + impl->max_error = 256.0f * impl->frame_size; impl->core = pw_context_get_object(impl->context, PW_TYPE_INTERFACE_Core); if (impl->core == NULL) { diff --git a/src/modules/module-protocol-pulse/modules/module-pipe-source.c b/src/modules/module-protocol-pulse/modules/module-pipe-source.c index 8de79cace..ec077ab9d 100644 --- a/src/modules/module-protocol-pulse/modules/module-pipe-source.c +++ b/src/modules/module-protocol-pulse/modules/module-pipe-source.c @@ -150,6 +150,11 @@ static int module_pipe_source_prepare(struct module * const module) pw_properties_set(stream_props, PW_KEY_NODE_NAME, "fifo_input"); +// if ((str = pw_properties_get(stream_props, PW_KEY_NODE_DRIVER)) == NULL) +// pw_properties_set(stream_props, PW_KEY_NODE_DRIVER, "true"); +// if ((str = pw_properties_get(stream_props, PW_KEY_PRIORITY_DRIVER)) == NULL) +// pw_properties_set(stream_props, PW_KEY_PRIORITY_DRIVER, "50000"); + d->module = module; d->stream_props = stream_props; d->global_props = global_props;