mirror of
https://gitlab.freedesktop.org/pipewire/pipewire.git
synced 2025-11-02 09:01:50 -05:00
pulse-server: Avoid overflow in stream read/write index
Keep separate counters for the ringbuffer and stream read/write indexes. The ringbuffer has 32 bits indexes while the pulse server is required to keep 64 bit read/write indexes. Also handle invalid seek flags. Fixes #1331
This commit is contained in:
parent
0a5ae1cf47
commit
f85b0bfd16
1 changed files with 56 additions and 49 deletions
|
|
@ -1500,8 +1500,8 @@ static void stream_io_changed(void *data, uint32_t id, void *area, uint32_t size
|
||||||
|
|
||||||
struct process_data {
|
struct process_data {
|
||||||
struct pw_time pwt;
|
struct pw_time pwt;
|
||||||
uint32_t read_index;
|
uint32_t read_inc;
|
||||||
uint32_t write_index;
|
uint32_t write_inc;
|
||||||
uint32_t underrun_for;
|
uint32_t underrun_for;
|
||||||
uint32_t playing_for;
|
uint32_t playing_for;
|
||||||
uint32_t missing;
|
uint32_t missing;
|
||||||
|
|
@ -1526,7 +1526,7 @@ do_process_done(struct spa_loop *loop,
|
||||||
stream->delay = 0;
|
stream->delay = 0;
|
||||||
|
|
||||||
if (stream->direction == PW_DIRECTION_OUTPUT) {
|
if (stream->direction == PW_DIRECTION_OUTPUT) {
|
||||||
stream->read_index = pd->read_index;
|
stream->read_index += pd->read_inc;
|
||||||
if (stream->corked) {
|
if (stream->corked) {
|
||||||
if (stream->underrun_for != (uint64_t)-1)
|
if (stream->underrun_for != (uint64_t)-1)
|
||||||
stream->underrun_for += pd->underrun_for;
|
stream->underrun_for += pd->underrun_for;
|
||||||
|
|
@ -1538,7 +1538,7 @@ do_process_done(struct spa_loop *loop,
|
||||||
stream->underrun_for = 0;
|
stream->underrun_for = 0;
|
||||||
stream->playing_for = 0;
|
stream->playing_for = 0;
|
||||||
if (pd->underrun)
|
if (pd->underrun)
|
||||||
send_underflow(stream, pd->read_index, pd->underrun_for);
|
send_underflow(stream, stream->read_index, pd->underrun_for);
|
||||||
else
|
else
|
||||||
send_stream_started(stream);
|
send_stream_started(stream);
|
||||||
}
|
}
|
||||||
|
|
@ -1551,7 +1551,7 @@ do_process_done(struct spa_loop *loop,
|
||||||
send_command_request(stream);
|
send_command_request(stream);
|
||||||
} else {
|
} else {
|
||||||
struct message *msg;
|
struct message *msg;
|
||||||
stream->write_index = pd->write_index;
|
stream->write_index += pd->write_inc;
|
||||||
|
|
||||||
avail = spa_ringbuffer_get_read_index(&stream->ring, &index);
|
avail = spa_ringbuffer_get_read_index(&stream->ring, &index);
|
||||||
|
|
||||||
|
|
@ -1568,11 +1568,13 @@ do_process_done(struct spa_loop *loop,
|
||||||
stream, client->name, index, avail);
|
stream, client->name, index, avail);
|
||||||
} else {
|
} else {
|
||||||
if (avail > (int32_t)stream->attr.maxlength) {
|
if (avail > (int32_t)stream->attr.maxlength) {
|
||||||
|
uint32_t skip = avail - stream->attr.fragsize;
|
||||||
/* overrun, catch up to latest fragment and send it */
|
/* overrun, catch up to latest fragment and send it */
|
||||||
pw_log_warn(NAME" %p: [%s] overrun recover read:%u avail:%d max:%u",
|
pw_log_warn(NAME" %p: [%s] overrun recover read:%u avail:%d max:%u skip:%u",
|
||||||
stream, client->name, index, avail, stream->attr.maxlength);
|
stream, client->name, index, avail, stream->attr.maxlength, skip);
|
||||||
|
index += skip;
|
||||||
|
stream->read_index += skip;
|
||||||
avail = stream->attr.fragsize;
|
avail = stream->attr.fragsize;
|
||||||
index = stream->write_index - avail;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
while (avail > 0) {
|
while (avail > 0) {
|
||||||
|
|
@ -1593,9 +1595,9 @@ do_process_done(struct spa_loop *loop,
|
||||||
|
|
||||||
index += towrite;
|
index += towrite;
|
||||||
avail -= towrite;
|
avail -= towrite;
|
||||||
|
stream->read_index += towrite;
|
||||||
}
|
}
|
||||||
stream->read_index = index;
|
spa_ringbuffer_read_update(&stream->ring, index);
|
||||||
spa_ringbuffer_read_update(&stream->ring, stream->read_index);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return 0;
|
return 0;
|
||||||
|
|
@ -1610,7 +1612,7 @@ static void stream_process(void *data)
|
||||||
void *p;
|
void *p;
|
||||||
struct pw_buffer *buffer;
|
struct pw_buffer *buffer;
|
||||||
struct spa_buffer *buf;
|
struct spa_buffer *buf;
|
||||||
uint32_t size, minreq;
|
uint32_t size, minreq, index;
|
||||||
struct process_data pd;
|
struct process_data pd;
|
||||||
|
|
||||||
pw_log_trace_fp(NAME" %p: process", stream);
|
pw_log_trace_fp(NAME" %p: process", stream);
|
||||||
|
|
@ -1626,7 +1628,7 @@ static void stream_process(void *data)
|
||||||
spa_zero(pd);
|
spa_zero(pd);
|
||||||
|
|
||||||
if (stream->direction == PW_DIRECTION_OUTPUT) {
|
if (stream->direction == PW_DIRECTION_OUTPUT) {
|
||||||
int32_t avail = spa_ringbuffer_get_read_index(&stream->ring, &pd.read_index);
|
int32_t avail = spa_ringbuffer_get_read_index(&stream->ring, &index);
|
||||||
|
|
||||||
if (stream->rate_match)
|
if (stream->rate_match)
|
||||||
minreq = stream->rate_match->size * stream->frame_size;
|
minreq = stream->rate_match->size * stream->frame_size;
|
||||||
|
|
@ -1648,17 +1650,20 @@ static void stream_process(void *data)
|
||||||
if (stream->attr.prebuf == 0 && !stream->corked) {
|
if (stream->attr.prebuf == 0 && !stream->corked) {
|
||||||
pd.missing = size;
|
pd.missing = size;
|
||||||
pd.playing_for = size;
|
pd.playing_for = size;
|
||||||
pd.read_index += size;
|
index += size;
|
||||||
spa_ringbuffer_read_update(&stream->ring, pd.read_index);
|
pd.read_inc = size;
|
||||||
|
spa_ringbuffer_read_update(&stream->ring, index);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
if (avail > (int32_t)stream->attr.maxlength) {
|
if (avail > (int32_t)stream->attr.maxlength) {
|
||||||
|
uint32_t skip = avail - stream->attr.maxlength;
|
||||||
/* overrun, reported by other side, here we skip
|
/* overrun, reported by other side, here we skip
|
||||||
* ahead to the oldest data. */
|
* ahead to the oldest data. */
|
||||||
pw_log_debug(NAME" %p: [%s] overrun read:%u avail:%d max:%u",
|
pw_log_debug(NAME" %p: [%s] overrun read:%u avail:%d max:%u skip:%u",
|
||||||
stream, client->name, pd.read_index, avail,
|
stream, client->name, index, avail,
|
||||||
stream->attr.maxlength);
|
stream->attr.maxlength, skip);
|
||||||
pd.read_index += avail - stream->attr.maxlength;
|
index += skip;
|
||||||
|
pd.read_inc = skip;
|
||||||
avail = stream->attr.maxlength;
|
avail = stream->attr.maxlength;
|
||||||
}
|
}
|
||||||
size = SPA_MIN(buf->datas[0].maxsize, (uint32_t)avail);
|
size = SPA_MIN(buf->datas[0].maxsize, (uint32_t)avail);
|
||||||
|
|
@ -1666,46 +1671,47 @@ static void stream_process(void *data)
|
||||||
|
|
||||||
spa_ringbuffer_read_data(&stream->ring,
|
spa_ringbuffer_read_data(&stream->ring,
|
||||||
stream->buffer, stream->attr.maxlength,
|
stream->buffer, stream->attr.maxlength,
|
||||||
pd.read_index % stream->attr.maxlength,
|
index % stream->attr.maxlength,
|
||||||
p, size);
|
p, size);
|
||||||
|
|
||||||
pd.read_index += size;
|
index += size;
|
||||||
spa_ringbuffer_read_update(&stream->ring, pd.read_index);
|
pd.read_inc += size;
|
||||||
|
spa_ringbuffer_read_update(&stream->ring, index);
|
||||||
|
|
||||||
pd.playing_for = size;
|
pd.playing_for = size;
|
||||||
pd.missing = size;
|
pd.missing = size;
|
||||||
pd.underrun = false;
|
pd.underrun = false;
|
||||||
}
|
}
|
||||||
pw_log_debug("produce %d", size);
|
|
||||||
buf->datas[0].chunk->offset = 0;
|
buf->datas[0].chunk->offset = 0;
|
||||||
buf->datas[0].chunk->stride = stream->frame_size;
|
buf->datas[0].chunk->stride = stream->frame_size;
|
||||||
buf->datas[0].chunk->size = size;
|
buf->datas[0].chunk->size = size;
|
||||||
buffer->size = size / stream->frame_size;
|
buffer->size = size / stream->frame_size;
|
||||||
} else {
|
} else {
|
||||||
int32_t filled = spa_ringbuffer_get_write_index(&stream->ring, &pd.write_index);
|
int32_t filled = spa_ringbuffer_get_write_index(&stream->ring, &index);
|
||||||
size = buf->datas[0].chunk->size;
|
size = buf->datas[0].chunk->size;
|
||||||
if (filled < 0) {
|
if (filled < 0) {
|
||||||
/* underrun, can't really happen because we never read more
|
/* underrun, can't really happen because we never read more
|
||||||
* than what's available on the other side */
|
* than what's available on the other side */
|
||||||
pw_log_warn(NAME" %p: [%s] underrun write:%u filled:%d",
|
pw_log_warn(NAME" %p: [%s] underrun write:%u filled:%d",
|
||||||
stream, client->name, pd.write_index, filled);
|
stream, client->name, index, filled);
|
||||||
} else if ((uint32_t)filled + size > stream->attr.maxlength) {
|
} else if ((uint32_t)filled + size > stream->attr.maxlength) {
|
||||||
/* overrun, can happen when the other side is not
|
/* overrun, can happen when the other side is not
|
||||||
* reading fast enough. We still write our data into the
|
* reading fast enough. We still write our data into the
|
||||||
* ringbuffer and expect the other side to warn and catch up. */
|
* ringbuffer and expect the other side to warn and catch up. */
|
||||||
pw_log_debug(NAME" %p: [%s] overrun write:%u filled:%d size:%u max:%u",
|
pw_log_debug(NAME" %p: [%s] overrun write:%u filled:%d size:%u max:%u",
|
||||||
stream, client->name, pd.write_index, filled,
|
stream, client->name, index, filled,
|
||||||
size, stream->attr.maxlength);
|
size, stream->attr.maxlength);
|
||||||
}
|
}
|
||||||
|
|
||||||
spa_ringbuffer_write_data(&stream->ring,
|
spa_ringbuffer_write_data(&stream->ring,
|
||||||
stream->buffer, stream->attr.maxlength,
|
stream->buffer, stream->attr.maxlength,
|
||||||
pd.write_index % stream->attr.maxlength,
|
index % stream->attr.maxlength,
|
||||||
SPA_PTROFF(p, buf->datas[0].chunk->offset, void),
|
SPA_PTROFF(p, buf->datas[0].chunk->offset, void),
|
||||||
SPA_MIN(size, stream->attr.maxlength));
|
SPA_MIN(size, stream->attr.maxlength));
|
||||||
|
|
||||||
pd.write_index += size;
|
index += size;
|
||||||
spa_ringbuffer_write_update(&stream->ring, pd.write_index);
|
pd.write_inc = size;
|
||||||
|
spa_ringbuffer_write_update(&stream->ring, index);
|
||||||
}
|
}
|
||||||
pw_stream_queue_buffer(stream->stream, buffer);
|
pw_stream_queue_buffer(stream->stream, buffer);
|
||||||
|
|
||||||
|
|
@ -2293,7 +2299,7 @@ static int do_get_playback_latency(struct client *client, uint32_t command, uint
|
||||||
if (stream == NULL || stream->type != STREAM_TYPE_PLAYBACK)
|
if (stream == NULL || stream->type != STREAM_TYPE_PLAYBACK)
|
||||||
return -ENOENT;
|
return -ENOENT;
|
||||||
|
|
||||||
pw_log_debug("read:%"PRIi64" write:%"PRIi64" queued:%"PRIi64" delay:%"PRIi64
|
pw_log_debug("read:%"PRIx64" write:%"PRIx64" queued:%"PRIi64" delay:%"PRIi64
|
||||||
" playing:%"PRIu64,
|
" playing:%"PRIu64,
|
||||||
stream->read_index, stream->write_index,
|
stream->read_index, stream->write_index,
|
||||||
stream->write_index - stream->read_index, stream->delay,
|
stream->write_index - stream->read_index, stream->delay,
|
||||||
|
|
@ -2850,8 +2856,8 @@ static void stream_flush(struct stream *stream)
|
||||||
pw_stream_flush(stream->stream, false);
|
pw_stream_flush(stream->stream, false);
|
||||||
|
|
||||||
if (stream->type == STREAM_TYPE_PLAYBACK) {
|
if (stream->type == STREAM_TYPE_PLAYBACK) {
|
||||||
stream->write_index = stream->read_index =
|
stream->ring.writeindex = stream->ring.readindex;
|
||||||
stream->ring.writeindex = stream->ring.readindex;
|
stream->write_index = stream->read_index;
|
||||||
|
|
||||||
stream->missing = stream->attr.tlength -
|
stream->missing = stream->attr.tlength -
|
||||||
SPA_MIN(stream->requested, stream->attr.tlength);
|
SPA_MIN(stream->requested, stream->attr.tlength);
|
||||||
|
|
@ -2865,8 +2871,8 @@ static void stream_flush(struct stream *stream)
|
||||||
|
|
||||||
send_command_request(stream);
|
send_command_request(stream);
|
||||||
} else {
|
} else {
|
||||||
stream->read_index = stream->write_index =
|
stream->ring.readindex = stream->ring.writeindex;
|
||||||
stream->ring.readindex = stream->ring.writeindex;
|
stream->read_index = stream->write_index;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -5482,8 +5488,8 @@ static int handle_memblock(struct client *client, struct message *msg)
|
||||||
struct impl *impl = client->impl;
|
struct impl *impl = client->impl;
|
||||||
struct stream *stream;
|
struct stream *stream;
|
||||||
uint32_t channel, flags, index;
|
uint32_t channel, flags, index;
|
||||||
int64_t offset;
|
int64_t offset, diff;
|
||||||
int32_t filled, diff;
|
int32_t filled;
|
||||||
int res = 0;
|
int res = 0;
|
||||||
|
|
||||||
channel = ntohl(client->desc.channel);
|
channel = ntohl(client->desc.channel);
|
||||||
|
|
@ -5506,27 +5512,27 @@ static int handle_memblock(struct client *client, struct message *msg)
|
||||||
pw_log_debug("new block %p %p/%u filled:%d index:%d flags:%02x offset:%"PRIu64,
|
pw_log_debug("new block %p %p/%u filled:%d index:%d flags:%02x offset:%"PRIu64,
|
||||||
msg, msg->data, msg->length, filled, index, flags, offset);
|
msg, msg->data, msg->length, filled, index, flags, offset);
|
||||||
|
|
||||||
|
|
||||||
switch (flags & FLAG_SEEKMASK) {
|
switch (flags & FLAG_SEEKMASK) {
|
||||||
case SEEK_RELATIVE:
|
case SEEK_RELATIVE:
|
||||||
index += offset;
|
diff = offset;
|
||||||
filled += offset;
|
|
||||||
stream->missing -= offset;
|
|
||||||
break;
|
break;
|
||||||
case SEEK_ABSOLUTE:
|
case SEEK_ABSOLUTE:
|
||||||
diff = (int32_t)(offset - (uint64_t)index);
|
diff = offset - (int64_t)stream->write_index;
|
||||||
index += diff;
|
|
||||||
filled += diff;
|
|
||||||
stream->missing -= diff;
|
|
||||||
break;
|
break;
|
||||||
case SEEK_RELATIVE_ON_READ:
|
case SEEK_RELATIVE_ON_READ:
|
||||||
case SEEK_RELATIVE_END:
|
case SEEK_RELATIVE_END:
|
||||||
diff = (int32_t)(offset - (uint64_t)filled);
|
diff = offset - (int64_t)filled;
|
||||||
index += diff;
|
|
||||||
filled += diff;
|
|
||||||
stream->missing -= diff;
|
|
||||||
break;
|
break;
|
||||||
|
default:
|
||||||
|
pw_log_warn(NAME" %p: Received memblock frame with invalid seek mode",
|
||||||
|
impl);
|
||||||
|
res = -EPROTO;
|
||||||
|
goto finish;
|
||||||
}
|
}
|
||||||
|
index += diff;
|
||||||
|
filled += diff;
|
||||||
|
stream->write_index += diff;
|
||||||
|
stream->missing -= diff;
|
||||||
|
|
||||||
if (filled < 0) {
|
if (filled < 0) {
|
||||||
/* underrun, reported on reader side */
|
/* underrun, reported on reader side */
|
||||||
|
|
@ -5542,8 +5548,9 @@ static int handle_memblock(struct client *client, struct message *msg)
|
||||||
index % stream->attr.maxlength,
|
index % stream->attr.maxlength,
|
||||||
msg->data,
|
msg->data,
|
||||||
SPA_MIN(msg->length, stream->attr.maxlength));
|
SPA_MIN(msg->length, stream->attr.maxlength));
|
||||||
stream->write_index = index + msg->length;
|
index += msg->length;
|
||||||
spa_ringbuffer_write_update(&stream->ring, stream->write_index);
|
stream->write_index += msg->length;
|
||||||
|
spa_ringbuffer_write_update(&stream->ring, index);
|
||||||
stream->requested -= SPA_MIN(msg->length, stream->requested);
|
stream->requested -= SPA_MIN(msg->length, stream->requested);
|
||||||
finish:
|
finish:
|
||||||
message_free(impl, msg, false, false);
|
message_free(impl, msg, false, false);
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue