diff --git a/src/modules/module-protocol-pulse/pulse-server.c b/src/modules/module-protocol-pulse/pulse-server.c index 9aa774a6c..fc8c7f7b4 100644 --- a/src/modules/module-protocol-pulse/pulse-server.c +++ b/src/modules/module-protocol-pulse/pulse-server.c @@ -1500,8 +1500,8 @@ static void stream_io_changed(void *data, uint32_t id, void *area, uint32_t size struct process_data { struct pw_time pwt; - uint32_t read_index; - uint32_t write_index; + uint32_t read_inc; + uint32_t write_inc; uint32_t underrun_for; uint32_t playing_for; uint32_t missing; @@ -1526,7 +1526,7 @@ do_process_done(struct spa_loop *loop, stream->delay = 0; if (stream->direction == PW_DIRECTION_OUTPUT) { - stream->read_index = pd->read_index; + stream->read_index += pd->read_inc; if (stream->corked) { if (stream->underrun_for != (uint64_t)-1) stream->underrun_for += pd->underrun_for; @@ -1538,7 +1538,7 @@ do_process_done(struct spa_loop *loop, stream->underrun_for = 0; stream->playing_for = 0; if (pd->underrun) - send_underflow(stream, pd->read_index, pd->underrun_for); + send_underflow(stream, stream->read_index, pd->underrun_for); else send_stream_started(stream); } @@ -1551,7 +1551,7 @@ do_process_done(struct spa_loop *loop, send_command_request(stream); } else { struct message *msg; - stream->write_index = pd->write_index; + stream->write_index += pd->write_inc; avail = spa_ringbuffer_get_read_index(&stream->ring, &index); @@ -1568,11 +1568,13 @@ do_process_done(struct spa_loop *loop, stream, client->name, index, avail); } else { if (avail > (int32_t)stream->attr.maxlength) { + uint32_t skip = avail - stream->attr.fragsize; /* overrun, catch up to latest fragment and send it */ - pw_log_warn(NAME" %p: [%s] overrun recover read:%u avail:%d max:%u", - stream, client->name, index, avail, stream->attr.maxlength); + pw_log_warn(NAME" %p: [%s] overrun recover read:%u avail:%d max:%u skip:%u", + stream, client->name, index, avail, stream->attr.maxlength, skip); + index += skip; + stream->read_index += skip; avail = stream->attr.fragsize; - index = stream->write_index - avail; } while (avail > 0) { @@ -1593,9 +1595,9 @@ do_process_done(struct spa_loop *loop, index += towrite; avail -= towrite; + stream->read_index += towrite; } - stream->read_index = index; - spa_ringbuffer_read_update(&stream->ring, stream->read_index); + spa_ringbuffer_read_update(&stream->ring, index); } } return 0; @@ -1610,7 +1612,7 @@ static void stream_process(void *data) void *p; struct pw_buffer *buffer; struct spa_buffer *buf; - uint32_t size, minreq; + uint32_t size, minreq, index; struct process_data pd; pw_log_trace_fp(NAME" %p: process", stream); @@ -1626,7 +1628,7 @@ static void stream_process(void *data) spa_zero(pd); if (stream->direction == PW_DIRECTION_OUTPUT) { - int32_t avail = spa_ringbuffer_get_read_index(&stream->ring, &pd.read_index); + int32_t avail = spa_ringbuffer_get_read_index(&stream->ring, &index); if (stream->rate_match) minreq = stream->rate_match->size * stream->frame_size; @@ -1648,17 +1650,20 @@ static void stream_process(void *data) if (stream->attr.prebuf == 0 && !stream->corked) { pd.missing = size; pd.playing_for = size; - pd.read_index += size; - spa_ringbuffer_read_update(&stream->ring, pd.read_index); + index += size; + pd.read_inc = size; + spa_ringbuffer_read_update(&stream->ring, index); } } else { if (avail > (int32_t)stream->attr.maxlength) { + uint32_t skip = avail - stream->attr.maxlength; /* overrun, reported by other side, here we skip * ahead to the oldest data. */ - pw_log_debug(NAME" %p: [%s] overrun read:%u avail:%d max:%u", - stream, client->name, pd.read_index, avail, - stream->attr.maxlength); - pd.read_index += avail - stream->attr.maxlength; + pw_log_debug(NAME" %p: [%s] overrun read:%u avail:%d max:%u skip:%u", + stream, client->name, index, avail, + stream->attr.maxlength, skip); + index += skip; + pd.read_inc = skip; avail = stream->attr.maxlength; } size = SPA_MIN(buf->datas[0].maxsize, (uint32_t)avail); @@ -1666,46 +1671,47 @@ static void stream_process(void *data) spa_ringbuffer_read_data(&stream->ring, stream->buffer, stream->attr.maxlength, - pd.read_index % stream->attr.maxlength, + index % stream->attr.maxlength, p, size); - pd.read_index += size; - spa_ringbuffer_read_update(&stream->ring, pd.read_index); + index += size; + pd.read_inc += size; + spa_ringbuffer_read_update(&stream->ring, index); pd.playing_for = size; pd.missing = size; pd.underrun = false; } - pw_log_debug("produce %d", size); buf->datas[0].chunk->offset = 0; buf->datas[0].chunk->stride = stream->frame_size; buf->datas[0].chunk->size = size; buffer->size = size / stream->frame_size; } else { - int32_t filled = spa_ringbuffer_get_write_index(&stream->ring, &pd.write_index); + int32_t filled = spa_ringbuffer_get_write_index(&stream->ring, &index); size = buf->datas[0].chunk->size; if (filled < 0) { /* underrun, can't really happen because we never read more * than what's available on the other side */ pw_log_warn(NAME" %p: [%s] underrun write:%u filled:%d", - stream, client->name, pd.write_index, filled); + stream, client->name, index, filled); } else if ((uint32_t)filled + size > stream->attr.maxlength) { /* overrun, can happen when the other side is not * reading fast enough. We still write our data into the * ringbuffer and expect the other side to warn and catch up. */ pw_log_debug(NAME" %p: [%s] overrun write:%u filled:%d size:%u max:%u", - stream, client->name, pd.write_index, filled, + stream, client->name, index, filled, size, stream->attr.maxlength); } spa_ringbuffer_write_data(&stream->ring, stream->buffer, stream->attr.maxlength, - pd.write_index % stream->attr.maxlength, + index % stream->attr.maxlength, SPA_PTROFF(p, buf->datas[0].chunk->offset, void), SPA_MIN(size, stream->attr.maxlength)); - pd.write_index += size; - spa_ringbuffer_write_update(&stream->ring, pd.write_index); + index += size; + pd.write_inc = size; + spa_ringbuffer_write_update(&stream->ring, index); } pw_stream_queue_buffer(stream->stream, buffer); @@ -2293,7 +2299,7 @@ static int do_get_playback_latency(struct client *client, uint32_t command, uint if (stream == NULL || stream->type != STREAM_TYPE_PLAYBACK) return -ENOENT; - pw_log_debug("read:%"PRIi64" write:%"PRIi64" queued:%"PRIi64" delay:%"PRIi64 + pw_log_debug("read:%"PRIx64" write:%"PRIx64" queued:%"PRIi64" delay:%"PRIi64 " playing:%"PRIu64, stream->read_index, stream->write_index, stream->write_index - stream->read_index, stream->delay, @@ -2850,8 +2856,8 @@ static void stream_flush(struct stream *stream) pw_stream_flush(stream->stream, false); if (stream->type == STREAM_TYPE_PLAYBACK) { - stream->write_index = stream->read_index = - stream->ring.writeindex = stream->ring.readindex; + stream->ring.writeindex = stream->ring.readindex; + stream->write_index = stream->read_index; stream->missing = stream->attr.tlength - SPA_MIN(stream->requested, stream->attr.tlength); @@ -2865,8 +2871,8 @@ static void stream_flush(struct stream *stream) send_command_request(stream); } else { - stream->read_index = stream->write_index = - stream->ring.readindex = stream->ring.writeindex; + stream->ring.readindex = stream->ring.writeindex; + stream->read_index = stream->write_index; } } @@ -5482,8 +5488,8 @@ static int handle_memblock(struct client *client, struct message *msg) struct impl *impl = client->impl; struct stream *stream; uint32_t channel, flags, index; - int64_t offset; - int32_t filled, diff; + int64_t offset, diff; + int32_t filled; int res = 0; channel = ntohl(client->desc.channel); @@ -5506,27 +5512,27 @@ static int handle_memblock(struct client *client, struct message *msg) pw_log_debug("new block %p %p/%u filled:%d index:%d flags:%02x offset:%"PRIu64, msg, msg->data, msg->length, filled, index, flags, offset); - switch (flags & FLAG_SEEKMASK) { case SEEK_RELATIVE: - index += offset; - filled += offset; - stream->missing -= offset; + diff = offset; break; case SEEK_ABSOLUTE: - diff = (int32_t)(offset - (uint64_t)index); - index += diff; - filled += diff; - stream->missing -= diff; + diff = offset - (int64_t)stream->write_index; break; case SEEK_RELATIVE_ON_READ: case SEEK_RELATIVE_END: - diff = (int32_t)(offset - (uint64_t)filled); - index += diff; - filled += diff; - stream->missing -= diff; + diff = offset - (int64_t)filled; break; + default: + pw_log_warn(NAME" %p: Received memblock frame with invalid seek mode", + impl); + res = -EPROTO; + goto finish; } + index += diff; + filled += diff; + stream->write_index += diff; + stream->missing -= diff; if (filled < 0) { /* underrun, reported on reader side */ @@ -5542,8 +5548,9 @@ static int handle_memblock(struct client *client, struct message *msg) index % stream->attr.maxlength, msg->data, SPA_MIN(msg->length, stream->attr.maxlength)); - stream->write_index = index + msg->length; - spa_ringbuffer_write_update(&stream->ring, stream->write_index); + index += msg->length; + stream->write_index += msg->length; + spa_ringbuffer_write_update(&stream->ring, index); stream->requested -= SPA_MIN(msg->length, stream->requested); finish: message_free(impl, msg, false, false);