From c62b27ec75db95f4aa80bd3cce9056f0682570f0 Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Fri, 23 Oct 2020 12:59:53 +0200 Subject: [PATCH] pulse-server: handle under/overrun Make sure we don't report underrun for the first time we pull data. Send STARTED message when we leave the underrun state Send UNDERFLOW when we enter the underrun state Count underrun when corked. Pass process stats back to the mainloop after process. Decrease the pending length when we get underrun so that we can request more data. --- .../module-protocol-pulse/pulse-server.c | 81 ++++++++++++++----- 1 file changed, 60 insertions(+), 21 deletions(-) diff --git a/src/modules/module-protocol-pulse/pulse-server.c b/src/modules/module-protocol-pulse/pulse-server.c index 624f0f985..cae82ca16 100644 --- a/src/modules/module-protocol-pulse/pulse-server.c +++ b/src/modules/module-protocol-pulse/pulse-server.c @@ -150,6 +150,7 @@ struct stream { unsigned int muted_set:1; unsigned int adjust_latency:1; unsigned int have_time:1; + unsigned int is_underrun:1; }; struct server { @@ -525,6 +526,7 @@ static void stream_flush(struct stream *stream) stream->playing_for = 0; stream->underrun_for = 0; stream->have_time = false; + stream->is_underrun = true; } static void stream_free(struct stream *stream) @@ -627,11 +629,11 @@ static int send_command_request(struct stream *stream) update_timing_info(stream); size = writable_size(stream, 0); + pw_log_debug(NAME" %p: REQUEST channel:%d %u", stream, stream->channel, size); + if (size == 0) return 0; - pw_log_trace(NAME" %p: REQUEST channel:%d %u", stream, stream->channel, size); - msg = message_alloc(client, -1, 0); message_put(msg, TAG_U32, COMMAND_REQUEST, @@ -870,8 +872,6 @@ static void stream_state_changed(void *data, enum pw_stream_state old, case PW_STREAM_STATE_PAUSED: break; case PW_STREAM_STATE_STREAMING: - stream->playing_for = 0; - send_stream_started(stream); break; } } @@ -957,21 +957,50 @@ static void stream_param_changed(void *data, uint32_t id, const struct spa_pod * pw_stream_update_params(stream->stream, params, n_params); } +struct process_data { + uint32_t underrun_for; + uint32_t playing_for; + uint32_t read_index; + uint32_t write_index; + unsigned int underrun:1; +}; + static int do_process_done(struct spa_loop *loop, bool async, uint32_t seq, const void *data, size_t size, void *user_data) { struct stream *stream = user_data; struct client *client = stream->client; + const struct process_data *pd = data; if (stream->direction == PW_DIRECTION_OUTPUT) { + if (stream->corked) { + stream->underrun_for += pd->underrun_for; + stream->playing_for = 0; + return 0; + } + if (pd->underrun != stream->is_underrun) { + stream->is_underrun = pd->underrun; + stream->underrun_for = 0; + stream->playing_for = 0; + if (pd->underrun) + send_underflow(stream, pd->read_index); + else + send_stream_started(stream); + } + stream->playing_for += pd->playing_for; + stream->underrun_for += pd->underrun_for; + stream->pending -= SPA_MIN(stream->pending, pd->underrun_for); send_command_request(stream); } else { uint32_t index; int32_t avail = spa_ringbuffer_get_read_index(&stream->ring, &index); if (avail <= 0) { /* underrun */ - send_underflow(stream, index); + if (!stream->is_underrun) { + stream->is_underrun = true; + send_underflow(stream, index); + } } else if (avail > MAXLENGTH) { /* overrun */ send_overflow(stream); @@ -988,6 +1017,7 @@ do_process_done(struct spa_loop *loop, index + avail); send_message(client, msg); } + stream->is_underrun = false; } } return 0; @@ -1000,7 +1030,8 @@ static void stream_process(void *data) void *p; struct pw_buffer *buffer; struct spa_buffer *buf; - uint32_t index, size; + uint32_t size; + struct process_data pd; pw_log_trace(NAME" %p: process", stream); @@ -1012,18 +1043,20 @@ static void stream_process(void *data) if ((p = buf->datas[0].data) == NULL) return; + spa_zero(pd); if (stream->direction == PW_DIRECTION_OUTPUT) { - int32_t avail = spa_ringbuffer_get_read_index(&stream->ring, &index); + int32_t avail = spa_ringbuffer_get_read_index(&stream->ring, &pd.read_index); if (avail <= 0) { /* underrun */ - if (stream->drain_tag) { + if (stream->drain_tag) pw_stream_flush(stream->stream, true); - } else - pw_log_warn(NAME" %p: underrun", stream); + size = buf->datas[0].maxsize; memset(p, 0, size); + pd.underrun_for = size; + pd.underrun = true; } else if (avail > MAXLENGTH) { - /* overrun */ + /* overrun, handled by other side */ pw_log_warn(NAME" %p: overrun", stream); size = buf->datas[0].maxsize; memset(p, 0, size); @@ -1032,18 +1065,19 @@ static void stream_process(void *data) spa_ringbuffer_read_data(&stream->ring, stream->buffer, MAXLENGTH, - index % MAXLENGTH, + pd.read_index % MAXLENGTH, p, size); spa_ringbuffer_read_update(&stream->ring, - index + size); + pd.read_index + size); - stream->playing_for += size; + pd.playing_for = size; + pd.underrun = false; } buf->datas[0].chunk->offset = 0; buf->datas[0].chunk->stride = stream->frame_size; buf->datas[0].chunk->size = size; } else { - int32_t filled = spa_ringbuffer_get_write_index(&stream->ring, &index); + int32_t filled = spa_ringbuffer_get_write_index(&stream->ring, &pd.write_index); if (filled < 0) { /* underrun */ pw_log_warn(NAME" %p: underrun", stream); @@ -1056,18 +1090,18 @@ static void stream_process(void *data) spa_ringbuffer_write_data(&stream->ring, stream->buffer, MAXLENGTH, - index % MAXLENGTH, + pd.write_index % MAXLENGTH, SPA_MEMBER(p, buf->datas[0].chunk->offset, void), size); spa_ringbuffer_write_update(&stream->ring, - index + size); + pd.write_index + size); } } pw_stream_queue_buffer(stream->stream, buffer); pw_loop_invoke(impl->loop, - do_process_done, 1, NULL, 0, false, stream); + do_process_done, 1, &pd, sizeof(pd), false, stream); } static void stream_drained(void *data) @@ -1349,6 +1383,7 @@ static int do_create_playback_stream(struct client *client, uint32_t command, ui stream->muted_set = muted_set; stream->attr = attr; stream->dev = dev; + stream->is_underrun = true; flags = 0; if (no_move) @@ -1745,6 +1780,9 @@ static int do_cork_stream(struct client *client, uint32_t command, uint32_t tag, pw_stream_set_active(stream->stream, !cork); stream->corked = cork; stream->playing_for = 0; + stream->underrun_for = 0; + if (cork) + stream->is_underrun = true; return reply_simple_ack(client, tag); } @@ -1770,8 +1808,8 @@ static int do_flush_trigger_prebuf_stream(struct client *client, uint32_t comman switch (command) { case COMMAND_FLUSH_PLAYBACK_STREAM: case COMMAND_FLUSH_RECORD_STREAM: - stream_flush(stream); pw_stream_flush(stream->stream, false); + stream_flush(stream); send_command_request(stream); break; case COMMAND_TRIGGER_PLAYBACK_STREAM: @@ -2707,11 +2745,12 @@ static int handle_memblock(struct client *client, struct message *msg) filled = spa_ringbuffer_get_write_index(&stream->ring, &index); if (filled < 0) { - /* underrun */ - send_underflow(stream, index); + /* underrun, reported on reader side */ } else if (filled + msg->length > MAXLENGTH) { /* overrun */ send_overflow(stream); + stream->write_index += msg->length; + stream->pending -= SPA_MIN(stream->pending, msg->length); } else { spa_ringbuffer_write_data(&stream->ring, stream->buffer, MAXLENGTH,