pipe-tunnel: rework the source

Let the source write into a ringbuffer when there is data available.

We then read from the ringbuffer when scheduled and use a dll to keep
the delay constant. We can later make this a driver and use the rate
correction to tweak the timeouts instead of resampling.

See #3478
This commit is contained in:
Wim Taymans 2023-08-30 18:41:19 +02:00
parent bc15d0c766
commit 9f66c42d1f
2 changed files with 179 additions and 34 deletions

View file

@ -111,6 +111,9 @@
#define DEFAULT_CHANNELS 2
#define DEFAULT_POSITION "[ FL FR ]"
#define RINGBUFFER_SIZE (1u << 22)
#define RINGBUFFER_MASK (RINGBUFFER_SIZE-1)
PW_LOG_TOPIC_STATIC(mod_topic, "mod." NAME);
#define PW_LOG_TOPIC_DEFAULT mod_topic
@ -137,6 +140,7 @@ static const struct spa_dict_item module_props[] = {
struct impl {
struct pw_context *context;
struct pw_loop *data_loop;
#define MODE_PLAYBACK 0
#define MODE_CAPTURE 1
@ -156,6 +160,7 @@ struct impl {
char *filename;
unsigned int unlink_fifo;
int fd;
struct spa_source *socket;
struct pw_properties *stream_props;
enum pw_direction direction;
@ -165,8 +170,13 @@ struct impl {
uint32_t frame_size;
unsigned int do_disconnect:1;
uint32_t leftover_count;
uint8_t *leftover;
struct spa_ringbuffer ring;
void *buffer;
uint32_t target_buffer;
struct spa_io_rate_match *rate_match;
struct spa_dll dll;
float max_error;
};
static void stream_destroy(void *d)
@ -186,8 +196,12 @@ static void stream_state_changed(void *d, enum pw_stream_state old,
pw_impl_module_schedule_destroy(impl->module);
break;
case PW_STREAM_STATE_PAUSED:
if (impl->direction == PW_DIRECTION_OUTPUT)
pw_loop_update_io(impl->data_loop, impl->socket, 0);
break;
case PW_STREAM_STATE_STREAMING:
if (impl->direction == PW_DIRECTION_OUTPUT)
pw_loop_update_io(impl->data_loop, impl->socket, SPA_IO_IN);
break;
default:
break;
@ -221,9 +235,12 @@ static void playback_stream_process(void *data)
continue;
} else if (errno == EAGAIN || errno == EWOULDBLOCK) {
/* Don't continue writing */
pw_log_debug("pipe (%s) overrun: %m",
impl->filename);
break;
} else {
pw_log_warn("Failed to write to pipe sink");
pw_log_warn("Failed to write to pipe (%s): %m",
impl->filename);
}
}
offs += written;
@ -233,53 +250,85 @@ static void playback_stream_process(void *data)
pw_stream_queue_buffer(impl->stream, buf);
}
static void update_rate(struct impl *impl, uint32_t filled)
{
float error, corr;
if (impl->rate_match == NULL)
return;
error = (float)impl->target_buffer - (float)(filled);
error = SPA_CLAMP(error, -impl->max_error, impl->max_error);
corr = spa_dll_update(&impl->dll, error);
pw_log_info("error:%f corr:%f current:%u target:%u",
error, corr, filled, impl->target_buffer);
SPA_FLAG_SET(impl->rate_match->flags, SPA_IO_RATE_MATCH_FLAG_ACTIVE);
impl->rate_match->rate = 1.0f / corr;
}
static void capture_stream_process(void *data)
{
struct impl *impl = data;
struct pw_buffer *buf;
struct spa_data *d;
uint32_t req;
ssize_t nread;
struct spa_data *bd;
uint32_t req, index, size;
int32_t avail;
if ((buf = pw_stream_dequeue_buffer(impl->stream)) == NULL) {
pw_log_debug("out of buffers: %m");
pw_log_warn("out of buffers: %m");
return;
}
d = &buf->buffer->datas[0];
bd = &buf->buffer->datas[0];
if ((req = buf->requested * impl->frame_size) == 0)
req = 4096 * impl->frame_size;
req = SPA_MIN(req, d->maxsize);
size = SPA_MIN(req, bd->maxsize);
size = SPA_ROUND_DOWN(size, impl->frame_size);
d->chunk->offset = 0;
d->chunk->stride = impl->frame_size;
d->chunk->size = SPA_MIN(req, impl->leftover_count);
memcpy(d->data, impl->leftover, d->chunk->size);
req -= d->chunk->size;
avail = spa_ringbuffer_get_read_index(&impl->ring, &index);
nread = read(impl->fd, SPA_PTROFF(d->data, d->chunk->size, void), req);
if (nread < 0) {
const bool important = !(errno == EINTR
|| errno == EAGAIN
|| errno == EWOULDBLOCK);
pw_log_debug("avail %d %u %u", avail, index, size);
if (important)
pw_log_warn("failed to read from pipe (%s): %s",
impl->filename, strerror(errno));
}
else {
d->chunk->size += nread;
if (avail < (int32_t)size)
memset(bd->data, 0, size);
if (avail > (int32_t)RINGBUFFER_SIZE) {
avail = impl->target_buffer;
index += avail - impl->target_buffer;
}
if (avail > 0) {
avail = SPA_ROUND_DOWN(avail, impl->frame_size);
update_rate(impl, avail);
impl->leftover_count = d->chunk->size % impl->frame_size;
d->chunk->size -= impl->leftover_count;
memcpy(impl->leftover, SPA_PTROFF(d->data, d->chunk->size, void), impl->leftover_count);
avail = SPA_MIN(size, (uint32_t)avail);
spa_ringbuffer_read_data(&impl->ring,
impl->buffer, RINGBUFFER_SIZE,
index & RINGBUFFER_MASK,
bd->data, avail);
index += avail;
spa_ringbuffer_read_update(&impl->ring, index);
}
bd->chunk->offset = 0;
bd->chunk->size = size;
bd->chunk->stride = impl->frame_size;
pw_stream_queue_buffer(impl->stream, buf);
}
static void stream_io_changed(void *data, uint32_t id, void *area, uint32_t size)
{
struct impl *impl = data;
switch (id) {
case SPA_IO_RateMatch:
impl->rate_match = area;
break;
}
}
static const struct pw_stream_events playback_stream_events = {
PW_VERSION_STREAM_EVENTS,
.destroy = stream_destroy,
@ -290,8 +339,9 @@ static const struct pw_stream_events playback_stream_events = {
static const struct pw_stream_events capture_stream_events = {
PW_VERSION_STREAM_EVENTS,
.destroy = stream_destroy,
.io_changed = stream_io_changed,
.state_changed = stream_state_changed,
.process = capture_stream_process
.process = capture_stream_process,
};
static int create_stream(struct impl *impl)
@ -335,6 +385,78 @@ static int create_stream(struct impl *impl)
return 0;
}
static inline void
set_iovec(struct spa_ringbuffer *rbuf, void *buffer, uint32_t size,
uint32_t offset, struct iovec *iov, uint32_t len)
{
iov[0].iov_len = SPA_MIN(len, size - offset);
iov[0].iov_base = SPA_PTROFF(buffer, offset, void);
iov[1].iov_len = len - iov[0].iov_len;
iov[1].iov_base = buffer;
}
static int handle_pipe_read(struct impl *impl)
{
ssize_t nread;
int32_t filled;
uint32_t index;
struct iovec iov[2];
filled = spa_ringbuffer_get_write_index(&impl->ring, &index);
if (filled < 0) {
pw_log_warn("%p: underrun write:%u filled:%d",
impl, index, filled);
}
set_iovec(&impl->ring,
impl->buffer, RINGBUFFER_SIZE,
index & RINGBUFFER_MASK,
iov, RINGBUFFER_SIZE);
nread = read(impl->fd, iov[0].iov_base, iov[0].iov_len);
if (nread > 0) {
index += nread;
filled += nread;
if (nread == (ssize_t)iov[0].iov_len) {
nread = read(impl->fd, iov[1].iov_base, iov[1].iov_len);
if (nread > 0) {
index += nread;
filled += nread;
}
}
}
spa_ringbuffer_write_update(&impl->ring, index);
if (nread < 0) {
const bool important = !(errno == EINTR
|| errno == EAGAIN
|| errno == EWOULDBLOCK);
if (important)
pw_log_warn("failed to read from pipe (%s): %m",
impl->filename);
else
pw_log_debug("pipe (%s) underrun: %m", impl->filename);
}
pw_log_debug("filled %d %u %d", filled, index, impl->target_buffer);
return 0;
}
static void on_pipe_io(void *data, int fd, uint32_t mask)
{
struct impl *impl = data;
if (mask & (SPA_IO_ERR | SPA_IO_HUP)) {
pw_log_warn("error:%08x", mask);
pw_loop_update_io(impl->data_loop, impl->socket, 0);
return;
}
if (mask & SPA_IO_IN)
handle_pipe_read(impl);
}
static int create_fifo(struct impl *impl)
{
struct stat st;
@ -363,7 +485,6 @@ static int create_fifo(struct impl *impl)
do_unlink_fifo = true;
}
if ((fd = open(filename, O_RDWR | O_CLOEXEC | O_NONBLOCK, 0)) < 0) {
res = -errno;
pw_log_error("open('%s'): %s", filename, spa_strerror(res));
@ -381,6 +502,14 @@ static int create_fifo(struct impl *impl)
pw_log_error("'%s' is not a FIFO.", filename);
goto error;
}
impl->socket = pw_loop_add_io(impl->data_loop, fd,
0, false, on_pipe_io, impl);
if (impl->socket == NULL) {
res = -errno;
pw_log_error("can't create socket");
goto error;
}
pw_log_info("%s fifo '%s' with format:%s channels:%d rate:%d",
impl->direction == PW_DIRECTION_OUTPUT ? "reading from" : "writing to",
filename,
@ -390,6 +519,7 @@ static int create_fifo(struct impl *impl)
impl->filename = strdup(filename);
impl->unlink_fifo = do_unlink_fifo;
impl->fd = fd;
return 0;
error:
@ -440,13 +570,15 @@ static void impl_destroy(struct impl *impl)
unlink(impl->filename);
free(impl->filename);
}
if (impl->socket)
pw_loop_destroy_source(impl->data_loop, impl->socket);
if (impl->fd >= 0)
close(impl->fd);
pw_properties_free(impl->stream_props);
pw_properties_free(impl->props);
free(impl->leftover);
free(impl->buffer);
free(impl);
}
@ -569,6 +701,7 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args)
struct pw_properties *props = NULL;
struct impl *impl;
const char *str, *media_class = NULL;
struct pw_data_loop *data_loop;
int res;
PW_LOG_TOPIC_INIT(mod_topic);
@ -601,6 +734,8 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args)
impl->module = module;
impl->context = context;
data_loop = pw_context_get_data_loop(context);
impl->data_loop = pw_data_loop_get_loop(data_loop);
if ((str = pw_properties_get(props, "tunnel.mode")) == NULL)
str = "playback";
@ -662,12 +797,17 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args)
copy_props(impl, props, PW_KEY_NODE_RATE);
impl->leftover = calloc(1, impl->frame_size);
if (impl->leftover == NULL) {
impl->buffer = calloc(1, RINGBUFFER_SIZE);
if (impl->buffer == NULL) {
res = -errno;
pw_log_error("can't alloc leftover buffer: %m");
pw_log_error("can't alloc ringbuffer: %m");
goto error;
}
spa_ringbuffer_init(&impl->ring);
impl->target_buffer = 8192 * impl->frame_size;
spa_dll_init(&impl->dll);
spa_dll_set_bw(&impl->dll, SPA_DLL_BW_MIN, 256.f, impl->info.rate);
impl->max_error = 256.0f * impl->frame_size;
impl->core = pw_context_get_object(impl->context, PW_TYPE_INTERFACE_Core);
if (impl->core == NULL) {