pulse-server: improve under and overrun handling

Author: Wim Taymans
Date:   2020-11-09 12:06:44 +01:00
Parent: 8abb648ec3
Commit: bd57c1a457


@@ -1229,19 +1229,20 @@ do_process_done(struct spa_loop *loop,
 		send_command_request(stream);
 	} else {
+		struct message *msg;
 		stream->write_index = pd->write_index;
 		avail = spa_ringbuffer_get_read_index(&stream->ring, &index);
 		if (avail <= 0) {
-			/* underrun */
-			if (!stream->is_underrun) {
-				stream->is_underrun = true;
-				send_underflow(stream, index);
-			}
-		} else if (avail > (int32_t)stream->attr.maxlength) {
-			/* overrun */
-			send_overflow(stream);
+			/* underrun, can't really happen but if it does we
+			 * do nothing and wait for more data */
+			pw_log_warn(NAME" %p: underrun", stream);
 		} else {
-			struct message *msg;
+			if (avail > (int32_t)stream->attr.maxlength) {
+				/* overrun, catch up to latest fragment and send it */
+				pw_log_warn(NAME" %p: overrun", stream);
+				avail = stream->attr.fragsize;
+				index = stream->write_index - avail;
+			}
 
 			msg = message_alloc(client, stream->channel, avail);
 			if (msg == NULL)
@@ -1255,8 +1256,6 @@ do_process_done(struct spa_loop *loop,
 			stream->read_index = index + avail;
 			spa_ringbuffer_read_update(&stream->ring, stream->read_index);
-			stream->is_underrun = false;
-
 			send_message(client, msg);
 		}
 	}
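
The do_process_done() change above turns both error paths into local recovery: an underrun now just waits for more data, and an overrun skips ahead so that only the newest fragsize bytes are sent. A minimal standalone sketch of that index arithmetic (not part of the commit; MAXLENGTH, FRAGSIZE and the helper name are illustrative):

#include <stdint.h>
#include <stdio.h>

#define MAXLENGTH 4096u    /* ringbuffer capacity in bytes, assumed */
#define FRAGSIZE  1024u    /* one fragment in bytes, assumed */

/* Mirrors the logic above: given the free-running write index and the
 * number of readable bytes, decide which span to send. */
static void send_available(uint32_t write_index, int32_t avail)
{
    uint32_t index;

    if (avail <= 0) {
        /* underrun: do nothing and wait for more data */
        printf("underrun, waiting for data\n");
        return;
    }
    if (avail > (int32_t)MAXLENGTH) {
        /* overrun: catch up, send only the newest fragment */
        avail = FRAGSIZE;
    }
    index = write_index - (uint32_t)avail;
    printf("send %d bytes starting at index %u\n", avail, index);
}

int main(void)
{
    send_available(8192, 0);       /* underrun */
    send_available(8192, 2048);    /* normal: send everything queued */
    send_available(9000, 5000);    /* overrun: newest 1024 bytes only */
    return 0;
}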
@@ -1289,7 +1288,7 @@ static void stream_process(void *data)
 	if (stream->direction == PW_DIRECTION_OUTPUT) {
 		int32_t avail = spa_ringbuffer_get_read_index(&stream->ring, &pd.read_index);
 		if (avail <= 0) {
-			/* underrun */
+			/* underrun, produce a silence buffer */
 			size = buf->datas[0].maxsize;
 			memset(p, 0, size);
@@ -1299,12 +1298,14 @@ static void stream_process(void *data)
 				pd.underrun_for = size;
 				pd.underrun = true;
 			}
-		} else if (avail > (int32_t)stream->attr.maxlength) {
-			/* overrun, handled by other side */
-			pw_log_warn(NAME" %p: overrun", stream);
-			size = buf->datas[0].maxsize;
-			memset(p, 0, size);
 		} else {
+			if (avail > (int32_t)stream->attr.maxlength) {
+				/* overrun, reported by other side, here we skip
+				 * ahead to the oldest data. */
+				pw_log_warn(NAME" %p: overrun", stream);
+				pd.read_index += avail - stream->attr.maxlength;
+				avail = stream->attr.maxlength;
+			}
 			size = SPA_MIN(buf->datas[0].maxsize, (uint32_t)avail);
 			spa_ringbuffer_read_data(&stream->ring,
@@ -1324,26 +1325,25 @@ static void stream_process(void *data)
 		buffer->size = size / stream->frame_size;
 	} else {
 		int32_t filled = spa_ringbuffer_get_write_index(&stream->ring, &pd.write_index);
+		size = buf->datas[0].chunk->size;
 		if (filled < 0) {
-			/* underrun */
+			/* underrun, can't really happen because we never read more
+			 * than what's available on the other side */
 			pw_log_warn(NAME" %p: underrun", stream);
-		} else if (filled > (int32_t)stream->attr.maxlength) {
-			/* overrun */
+		} else if ((uint32_t)filled + size > stream->attr.maxlength) {
+			/* overrun, can happen when the other side is not
+			 * reading fast enough. We still write our data into the
+			 * ringbuffer and expect the other side to catch up. */
 			pw_log_warn(NAME" %p: overrun", stream);
-		} else {
-			uint32_t avail = stream->attr.maxlength - filled;
-			size = SPA_MIN(buf->datas[0].chunk->size, avail);
-			spa_ringbuffer_write_data(&stream->ring,
-					stream->buffer, stream->attr.maxlength,
-					pd.write_index % stream->attr.maxlength,
-					SPA_MEMBER(p, buf->datas[0].chunk->offset, void),
-					size);
-			pd.write_index += size;
-			spa_ringbuffer_write_update(&stream->ring,
-					pd.write_index);
 		}
+		spa_ringbuffer_write_data(&stream->ring,
+				stream->buffer, stream->attr.maxlength,
+				pd.write_index % stream->attr.maxlength,
+				SPA_MEMBER(p, buf->datas[0].chunk->offset, void),
+				size);
+		pd.write_index += size;
+		spa_ringbuffer_write_update(&stream->ring, pd.write_index);
 	}
 	pw_stream_queue_buffer(stream->stream, buffer);
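
Both stream_process() branches now follow one policy: the writer always commits its data and lets the indices run ahead, and a reader that finds more queued than maxlength skips forward to the oldest bytes still valid. A self-contained sketch of the reader-side recovery using the public spa_ringbuffer API (header-only, from libspa; the tiny sizes are invented to force an overrun):

#include <stdio.h>
#include <spa/utils/ringbuffer.h>

#define MAXLENGTH 8u    /* deliberately tiny to force an overrun */

int main(void)
{
    struct spa_ringbuffer ring;
    uint8_t buffer[MAXLENGTH], out[MAXLENGTH], data[12];
    uint32_t i, index, size;
    int32_t avail;

    for (i = 0; i < sizeof(data); i++)
        data[i] = (uint8_t)i;

    spa_ringbuffer_init(&ring);

    /* writer: 12 bytes into an 8-byte ring, committed anyway */
    spa_ringbuffer_write_data(&ring, buffer, MAXLENGTH,
            0, data, sizeof(data));
    spa_ringbuffer_write_update(&ring, sizeof(data));

    /* reader: detect the overrun, then skip ahead to the oldest
     * data that has not been overwritten */
    avail = spa_ringbuffer_get_read_index(&ring, &index);
    if (avail > (int32_t)MAXLENGTH) {
        printf("overrun, skipping %d bytes\n",
                avail - (int32_t)MAXLENGTH);
        index += avail - MAXLENGTH;
        avail = MAXLENGTH;
    }
    size = (uint32_t)avail;
    spa_ringbuffer_read_data(&ring, buffer, MAXLENGTH,
            index % MAXLENGTH, out, size);
    spa_ringbuffer_read_update(&ring, index + size);

    /* prints: read 8 bytes, oldest byte is 4 */
    printf("read %u bytes, oldest byte is %u\n", size, out[0]);
    return 0;
}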
@@ -4083,13 +4083,13 @@ static int handle_memblock(struct client *client, struct message *msg)
 	} else if (filled + msg->length > stream->attr.maxlength) {
 		/* overrun */
 		send_overflow(stream);
-	} else {
-		spa_ringbuffer_write_data(&stream->ring,
-				stream->buffer, stream->attr.maxlength,
-				index % stream->attr.maxlength,
-				msg->data, msg->length);
 	}
+	/* always write data to ringbuffer, we expect the other side
+	 * to recover */
+	spa_ringbuffer_write_data(&stream->ring,
+			stream->buffer, stream->attr.maxlength,
+			index % stream->attr.maxlength,
+			msg->data, msg->length);
 	stream->write_index = index + msg->length;
 	spa_ringbuffer_write_update(&stream->ring, stream->write_index);
 	stream->requested -= msg->length;
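
handle_memblock() keeps the send_overflow() notification to the client but, like the other paths, commits the data unconditionally: overwriting the oldest samples is recoverable by the reader-side skip-ahead above, while dropping the block would lose the newest audio for good. A hedged sketch of that writer policy (standalone; send_overflow() here is a printf stand-in for the real protocol reply, and MAXLENGTH is an assumed value):

#include <stdio.h>
#include <spa/utils/ringbuffer.h>

#define MAXLENGTH 8u    /* deliberately tiny, assumed value */

/* Stand-in for the real overflow notification to the client. */
static void send_overflow(void)
{
    printf("-> OVERFLOW notification\n");
}

/* Write one block: warn on overrun, but always commit the data and
 * expect the reader to catch up. */
static void write_memblock(struct spa_ringbuffer *ring, uint8_t *buffer,
        const uint8_t *data, uint32_t length)
{
    uint32_t index;
    int32_t filled = spa_ringbuffer_get_write_index(ring, &index);

    if (filled < 0)
        printf("underrun\n");
    else if ((uint32_t)filled + length > MAXLENGTH)
        send_overflow();

    /* always write data to the ringbuffer */
    spa_ringbuffer_write_data(ring, buffer, MAXLENGTH,
            index % MAXLENGTH, data, length);
    spa_ringbuffer_write_update(ring, index + length);
}

int main(void)
{
    struct spa_ringbuffer ring;
    uint8_t buffer[MAXLENGTH];
    const uint8_t block[6] = { 0 };
    int i;

    spa_ringbuffer_init(&ring);
    for (i = 0; i < 2; i++)    /* the second block overruns the ring */
        write_memblock(&ring, buffer, block, sizeof(block));
    return 0;
}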