pulse-server: Only allow one memblock in the queue

Allow only one memblock in the output queue. Otherwise, if the
client is for some reason not reading from the socket, we would queue
an unlimited amount of messages until we get killed by OOM.

Change some warn to debug in the process function for things we
handle from the non-rt thread.

Fixes #488
This commit is contained in:
Wim Taymans 2021-01-20 10:41:34 +01:00
parent 8f80a2bc15
commit 384b8a4593

View file

@ -1407,7 +1407,15 @@ do_process_done(struct spa_loop *loop,
} else { } else {
struct message *msg; struct message *msg;
stream->write_index = pd->write_index; stream->write_index = pd->write_index;
avail = spa_ringbuffer_get_read_index(&stream->ring, &index); avail = spa_ringbuffer_get_read_index(&stream->ring, &index);
if (!spa_list_is_empty(&client->out_messages)) {
pw_log_debug(NAME" %p: [%s] pending read:%u avail:%d",
stream, client->name, index, avail);
return 0;
}
if (avail <= 0) { if (avail <= 0) {
/* underrun, can't really happen but if it does we /* underrun, can't really happen but if it does we
* do nothing and wait for more data */ * do nothing and wait for more data */
@ -1416,7 +1424,7 @@ do_process_done(struct spa_loop *loop,
} else { } else {
if (avail > (int32_t)stream->attr.maxlength) { if (avail > (int32_t)stream->attr.maxlength) {
/* overrun, catch up to latest fragment and send it */ /* overrun, catch up to latest fragment and send it */
pw_log_warn(NAME" %p: [%s] overrun read:%u avail:%d max:%u", pw_log_warn(NAME" %p: [%s] overrun recover read:%u avail:%d max:%u",
stream, client->name, index, avail, stream->attr.maxlength); stream, client->name, index, avail, stream->attr.maxlength);
avail = stream->attr.fragsize; avail = stream->attr.fragsize;
index = stream->write_index - avail; index = stream->write_index - avail;
@ -1452,7 +1460,7 @@ static void stream_process(void *data)
uint32_t size, minreq; uint32_t size, minreq;
struct process_data pd; struct process_data pd;
pw_log_trace(NAME" %p: process", stream); pw_log_trace_fp(NAME" %p: process", stream);
buffer = pw_stream_dequeue_buffer(stream->stream); buffer = pw_stream_dequeue_buffer(stream->stream);
if (buffer == NULL) if (buffer == NULL)
@ -1491,7 +1499,7 @@ static void stream_process(void *data)
if (avail > (int32_t)stream->attr.maxlength) { if (avail > (int32_t)stream->attr.maxlength) {
/* overrun, reported by other side, here we skip /* overrun, reported by other side, here we skip
* ahead to the oldest data. */ * ahead to the oldest data. */
pw_log_warn(NAME" %p: [%s] overrun read:%u avail:%d max:%u", pw_log_debug(NAME" %p: [%s] overrun read:%u avail:%d max:%u",
stream, client->name, pd.read_index, avail, stream, client->name, pd.read_index, avail,
stream->attr.maxlength); stream->attr.maxlength);
pd.read_index += avail - stream->attr.maxlength; pd.read_index += avail - stream->attr.maxlength;
@ -1526,8 +1534,8 @@ static void stream_process(void *data)
} else if ((uint32_t)filled + size > stream->attr.maxlength) { } else if ((uint32_t)filled + size > stream->attr.maxlength) {
/* overrun, can happen when the other side is not /* overrun, can happen when the other side is not
* reading fast enough. We still write our data into the * reading fast enough. We still write our data into the
* ringbuffer and expect the other side to catch up. */ * ringbuffer and expect the other side to warn and catch up. */
pw_log_warn(NAME" %p: [%s] overrun write:%u filled:%d size:%u max:%u", pw_log_debug(NAME" %p: [%s] overrun write:%u filled:%d size:%u max:%u",
stream, client->name, pd.write_index, filled, stream, client->name, pd.write_index, filled,
size, stream->attr.maxlength); size, stream->attr.maxlength);
} }