mirror of
https://gitlab.freedesktop.org/pipewire/pipewire.git
synced 2025-11-01 22:58:50 -04:00
pulse-server: handle under/overrun
Make sure we don't report underrun for the first time we pull data. Send STARTED message when we leave the underrun state Send UNDERFLOW when we enter the underrun state Count underrun when corked. Pass process stats back to the mainloop after process. Decrease the pending length when we get underrun so that we can request more data.
This commit is contained in:
parent
2a0828e7a2
commit
c62b27ec75
1 changed files with 60 additions and 21 deletions
|
|
@ -150,6 +150,7 @@ struct stream {
|
|||
unsigned int muted_set:1;
|
||||
unsigned int adjust_latency:1;
|
||||
unsigned int have_time:1;
|
||||
unsigned int is_underrun:1;
|
||||
};
|
||||
|
||||
struct server {
|
||||
|
|
@ -525,6 +526,7 @@ static void stream_flush(struct stream *stream)
|
|||
stream->playing_for = 0;
|
||||
stream->underrun_for = 0;
|
||||
stream->have_time = false;
|
||||
stream->is_underrun = true;
|
||||
}
|
||||
|
||||
static void stream_free(struct stream *stream)
|
||||
|
|
@ -627,11 +629,11 @@ static int send_command_request(struct stream *stream)
|
|||
update_timing_info(stream);
|
||||
|
||||
size = writable_size(stream, 0);
|
||||
pw_log_debug(NAME" %p: REQUEST channel:%d %u", stream, stream->channel, size);
|
||||
|
||||
if (size == 0)
|
||||
return 0;
|
||||
|
||||
pw_log_trace(NAME" %p: REQUEST channel:%d %u", stream, stream->channel, size);
|
||||
|
||||
msg = message_alloc(client, -1, 0);
|
||||
message_put(msg,
|
||||
TAG_U32, COMMAND_REQUEST,
|
||||
|
|
@ -870,8 +872,6 @@ static void stream_state_changed(void *data, enum pw_stream_state old,
|
|||
case PW_STREAM_STATE_PAUSED:
|
||||
break;
|
||||
case PW_STREAM_STATE_STREAMING:
|
||||
stream->playing_for = 0;
|
||||
send_stream_started(stream);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
|
@ -957,21 +957,50 @@ static void stream_param_changed(void *data, uint32_t id, const struct spa_pod *
|
|||
pw_stream_update_params(stream->stream, params, n_params);
|
||||
}
|
||||
|
||||
struct process_data {
|
||||
uint32_t underrun_for;
|
||||
uint32_t playing_for;
|
||||
uint32_t read_index;
|
||||
uint32_t write_index;
|
||||
unsigned int underrun:1;
|
||||
};
|
||||
|
||||
static int
|
||||
do_process_done(struct spa_loop *loop,
|
||||
bool async, uint32_t seq, const void *data, size_t size, void *user_data)
|
||||
{
|
||||
struct stream *stream = user_data;
|
||||
struct client *client = stream->client;
|
||||
const struct process_data *pd = data;
|
||||
|
||||
if (stream->direction == PW_DIRECTION_OUTPUT) {
|
||||
if (stream->corked) {
|
||||
stream->underrun_for += pd->underrun_for;
|
||||
stream->playing_for = 0;
|
||||
return 0;
|
||||
}
|
||||
if (pd->underrun != stream->is_underrun) {
|
||||
stream->is_underrun = pd->underrun;
|
||||
stream->underrun_for = 0;
|
||||
stream->playing_for = 0;
|
||||
if (pd->underrun)
|
||||
send_underflow(stream, pd->read_index);
|
||||
else
|
||||
send_stream_started(stream);
|
||||
}
|
||||
stream->playing_for += pd->playing_for;
|
||||
stream->underrun_for += pd->underrun_for;
|
||||
stream->pending -= SPA_MIN(stream->pending, pd->underrun_for);
|
||||
send_command_request(stream);
|
||||
} else {
|
||||
uint32_t index;
|
||||
int32_t avail = spa_ringbuffer_get_read_index(&stream->ring, &index);
|
||||
if (avail <= 0) {
|
||||
/* underrun */
|
||||
send_underflow(stream, index);
|
||||
if (!stream->is_underrun) {
|
||||
stream->is_underrun = true;
|
||||
send_underflow(stream, index);
|
||||
}
|
||||
} else if (avail > MAXLENGTH) {
|
||||
/* overrun */
|
||||
send_overflow(stream);
|
||||
|
|
@ -988,6 +1017,7 @@ do_process_done(struct spa_loop *loop,
|
|||
index + avail);
|
||||
send_message(client, msg);
|
||||
}
|
||||
stream->is_underrun = false;
|
||||
}
|
||||
}
|
||||
return 0;
|
||||
|
|
@ -1000,7 +1030,8 @@ static void stream_process(void *data)
|
|||
void *p;
|
||||
struct pw_buffer *buffer;
|
||||
struct spa_buffer *buf;
|
||||
uint32_t index, size;
|
||||
uint32_t size;
|
||||
struct process_data pd;
|
||||
|
||||
pw_log_trace(NAME" %p: process", stream);
|
||||
|
||||
|
|
@ -1012,18 +1043,20 @@ static void stream_process(void *data)
|
|||
if ((p = buf->datas[0].data) == NULL)
|
||||
return;
|
||||
|
||||
spa_zero(pd);
|
||||
if (stream->direction == PW_DIRECTION_OUTPUT) {
|
||||
int32_t avail = spa_ringbuffer_get_read_index(&stream->ring, &index);
|
||||
int32_t avail = spa_ringbuffer_get_read_index(&stream->ring, &pd.read_index);
|
||||
if (avail <= 0) {
|
||||
/* underrun */
|
||||
if (stream->drain_tag) {
|
||||
if (stream->drain_tag)
|
||||
pw_stream_flush(stream->stream, true);
|
||||
} else
|
||||
pw_log_warn(NAME" %p: underrun", stream);
|
||||
|
||||
size = buf->datas[0].maxsize;
|
||||
memset(p, 0, size);
|
||||
pd.underrun_for = size;
|
||||
pd.underrun = true;
|
||||
} else if (avail > MAXLENGTH) {
|
||||
/* overrun */
|
||||
/* overrun, handled by other side */
|
||||
pw_log_warn(NAME" %p: overrun", stream);
|
||||
size = buf->datas[0].maxsize;
|
||||
memset(p, 0, size);
|
||||
|
|
@ -1032,18 +1065,19 @@ static void stream_process(void *data)
|
|||
|
||||
spa_ringbuffer_read_data(&stream->ring,
|
||||
stream->buffer, MAXLENGTH,
|
||||
index % MAXLENGTH,
|
||||
pd.read_index % MAXLENGTH,
|
||||
p, size);
|
||||
spa_ringbuffer_read_update(&stream->ring,
|
||||
index + size);
|
||||
pd.read_index + size);
|
||||
|
||||
stream->playing_for += size;
|
||||
pd.playing_for = size;
|
||||
pd.underrun = false;
|
||||
}
|
||||
buf->datas[0].chunk->offset = 0;
|
||||
buf->datas[0].chunk->stride = stream->frame_size;
|
||||
buf->datas[0].chunk->size = size;
|
||||
} else {
|
||||
int32_t filled = spa_ringbuffer_get_write_index(&stream->ring, &index);
|
||||
int32_t filled = spa_ringbuffer_get_write_index(&stream->ring, &pd.write_index);
|
||||
if (filled < 0) {
|
||||
/* underrun */
|
||||
pw_log_warn(NAME" %p: underrun", stream);
|
||||
|
|
@ -1056,18 +1090,18 @@ static void stream_process(void *data)
|
|||
|
||||
spa_ringbuffer_write_data(&stream->ring,
|
||||
stream->buffer, MAXLENGTH,
|
||||
index % MAXLENGTH,
|
||||
pd.write_index % MAXLENGTH,
|
||||
SPA_MEMBER(p, buf->datas[0].chunk->offset, void),
|
||||
size);
|
||||
spa_ringbuffer_write_update(&stream->ring,
|
||||
index + size);
|
||||
pd.write_index + size);
|
||||
}
|
||||
}
|
||||
|
||||
pw_stream_queue_buffer(stream->stream, buffer);
|
||||
|
||||
pw_loop_invoke(impl->loop,
|
||||
do_process_done, 1, NULL, 0, false, stream);
|
||||
do_process_done, 1, &pd, sizeof(pd), false, stream);
|
||||
}
|
||||
|
||||
static void stream_drained(void *data)
|
||||
|
|
@ -1349,6 +1383,7 @@ static int do_create_playback_stream(struct client *client, uint32_t command, ui
|
|||
stream->muted_set = muted_set;
|
||||
stream->attr = attr;
|
||||
stream->dev = dev;
|
||||
stream->is_underrun = true;
|
||||
|
||||
flags = 0;
|
||||
if (no_move)
|
||||
|
|
@ -1745,6 +1780,9 @@ static int do_cork_stream(struct client *client, uint32_t command, uint32_t tag,
|
|||
pw_stream_set_active(stream->stream, !cork);
|
||||
stream->corked = cork;
|
||||
stream->playing_for = 0;
|
||||
stream->underrun_for = 0;
|
||||
if (cork)
|
||||
stream->is_underrun = true;
|
||||
|
||||
return reply_simple_ack(client, tag);
|
||||
}
|
||||
|
|
@ -1770,8 +1808,8 @@ static int do_flush_trigger_prebuf_stream(struct client *client, uint32_t comman
|
|||
switch (command) {
|
||||
case COMMAND_FLUSH_PLAYBACK_STREAM:
|
||||
case COMMAND_FLUSH_RECORD_STREAM:
|
||||
stream_flush(stream);
|
||||
pw_stream_flush(stream->stream, false);
|
||||
stream_flush(stream);
|
||||
send_command_request(stream);
|
||||
break;
|
||||
case COMMAND_TRIGGER_PLAYBACK_STREAM:
|
||||
|
|
@ -2707,11 +2745,12 @@ static int handle_memblock(struct client *client, struct message *msg)
|
|||
|
||||
filled = spa_ringbuffer_get_write_index(&stream->ring, &index);
|
||||
if (filled < 0) {
|
||||
/* underrun */
|
||||
send_underflow(stream, index);
|
||||
/* underrun, reported on reader side */
|
||||
} else if (filled + msg->length > MAXLENGTH) {
|
||||
/* overrun */
|
||||
send_overflow(stream);
|
||||
stream->write_index += msg->length;
|
||||
stream->pending -= SPA_MIN(stream->pending, msg->length);
|
||||
} else {
|
||||
spa_ringbuffer_write_data(&stream->ring,
|
||||
stream->buffer, MAXLENGTH,
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue