diff --git a/src/modules/module-protocol-pulse/pulse-server.c b/src/modules/module-protocol-pulse/pulse-server.c index fe4e2830c..389eef084 100644 --- a/src/modules/module-protocol-pulse/pulse-server.c +++ b/src/modules/module-protocol-pulse/pulse-server.c @@ -47,6 +47,7 @@ #include #include #include +#include #include "pipewire/pipewire.h" @@ -114,7 +115,9 @@ struct stream { struct pw_stream *stream; struct spa_hook stream_listener; - struct spa_list messages; + struct spa_ringbuffer ring; + void *buffer; + int64_t read_index; int64_t write_index; uint64_t underrun_for; @@ -295,6 +298,44 @@ static int reply_error(struct client *client, uint32_t tag, uint32_t error) return send_message(client, reply); } +static int send_underflow(struct stream *stream, int64_t offset) +{ + struct client *client = stream->client; + struct message *reply; + + pw_log_warn(NAME" %p: UNDERFLOW channel:%u offset:%"PRIi64, + client, stream->channel, offset); + + reply = message_alloc(client, -1, 0); + message_put(reply, + TAG_U32, COMMAND_UNDERFLOW, + TAG_U32, -1, + TAG_U32, stream->channel, + TAG_INVALID); + if (client->version >= 23) { + message_put(reply, + TAG_S64, offset, + TAG_INVALID); + } + return send_message(client, reply); +} + +static int send_overflow(struct stream *stream) +{ + struct client *client = stream->client; + struct message *reply; + + pw_log_warn(NAME" %p: OVERFLOW channel:%u", client, stream->channel); + + reply = message_alloc(client, -1, 0); + message_put(reply, + TAG_U32, COMMAND_OVERFLOW, + TAG_U32, -1, + TAG_U32, stream->channel, + TAG_INVALID); + return send_message(client, reply); +} + static int do_command_auth(struct client *client, uint32_t command, uint32_t tag, struct message *m) { struct impl *impl = client->impl; @@ -385,9 +426,7 @@ static int do_subscribe(struct client *client, uint32_t command, uint32_t tag, s static void stream_flush(struct stream *stream) { - struct message *msg; - spa_list_consume(msg, &stream->messages, link) - message_free(stream->client, msg, false); + spa_ringbuffer_init(&stream->ring); stream->write_index = stream->read_index = 0; stream->playing_for = 0; stream->underrun_for = 0; @@ -403,6 +442,8 @@ static void stream_free(struct stream *stream) spa_hook_remove(&stream->stream_listener); pw_stream_destroy(stream->stream); } + if (stream->buffer) + free(stream->buffer); free(stream); } @@ -450,12 +491,45 @@ static inline uint32_t writable_size(const struct stream *s, uint64_t elapsed) return wanted; } +static void update_timing_info(struct stream *stream) +{ + struct pw_time pwt; + int64_t delay, pos; + + pw_stream_get_time(stream->stream, &pwt); + + stream->timestamp.tv_sec = pwt.now / SPA_NSEC_PER_SEC; + stream->timestamp.tv_usec = (pwt.now % SPA_NSEC_PER_SEC) / SPA_NSEC_PER_USEC; + + if (pwt.rate.denom > 0) { + uint64_t ticks = pwt.ticks; + if (!stream->have_time) + stream->ticks_base = ticks; + if (ticks > stream->ticks_base) + pos = ((ticks - stream->ticks_base) * stream->ss.rate / pwt.rate.denom) * stream->frame_size; + else + pos = 0; + delay = pwt.delay * SPA_USEC_PER_SEC / pwt.rate.denom; + stream->have_time = true; + } else { + pos = delay = 0; + stream->have_time = false; + } + if (stream->direction == PW_DIRECTION_OUTPUT) + stream->read_index = pos; + else + stream->write_index = pos; + stream->delay = delay; +} + static int send_command_request(struct stream *stream) { struct client *client = stream->client; struct message *msg; uint32_t size; + update_timing_info(stream); + size = writable_size(stream, 0); if (size == 0) return 0; @@ -473,6 +547,7 @@ static int send_command_request(struct stream *stream) return send_message(client, msg); } + static int reply_create_playback_stream(struct stream *stream) { struct client *client = stream->client; @@ -670,47 +745,52 @@ static void stream_param_changed(void *data, uint32_t id, const struct spa_pod * pw_stream_update_params(stream->stream, params, n_params); } -static void update_timing_info(struct stream *stream) +static int +do_process_done(struct spa_loop *loop, + bool async, uint32_t seq, const void *data, size_t size, void *user_data) { - struct pw_time pwt; - int64_t delay, pos; + struct stream *stream = user_data; + struct client *client = stream->client; - pw_stream_get_time(stream->stream, &pwt); + if (stream->direction == PW_DIRECTION_OUTPUT) { + send_command_request(stream); + } else { + uint32_t index; + int32_t avail = spa_ringbuffer_get_read_index(&stream->ring, &index); + if (avail < 0) { + /* underrun */ + send_underflow(stream, index); + } else if (avail > MAXLENGTH) { + /* overrun */ + send_overflow(stream); + } else { + struct message *msg; - stream->timestamp.tv_sec = pwt.now / SPA_NSEC_PER_SEC; - stream->timestamp.tv_usec = (pwt.now % SPA_NSEC_PER_SEC) / SPA_NSEC_PER_USEC; - - if (pwt.rate.denom > 0) { - uint64_t ticks = pwt.ticks; - if (!stream->have_time) - stream->ticks_base = ticks; - if (ticks > stream->ticks_base) - pos = ((ticks - stream->ticks_base) * stream->ss.rate / pwt.rate.denom) * stream->frame_size; - else - pos = 0; - delay = pwt.delay * SPA_USEC_PER_SEC / pwt.rate.denom; - stream->have_time = true; - } else { - pos = delay = 0; - stream->have_time = false; - } - if (stream->direction == PW_DIRECTION_OUTPUT) - stream->read_index = pos; - else - stream->write_index = pos; - stream->delay = delay; + msg = message_alloc(client, stream->channel, avail); + if (msg != NULL) { + spa_ringbuffer_read_data(&stream->ring, + stream->buffer, MAXLENGTH, + index % MAXLENGTH, + msg->data, avail); + spa_ringbuffer_read_update(&stream->ring, + index + avail); + send_message(client, msg); + } + } + } + return 0; } -static void stream_process_record(struct stream *stream) +static void stream_process(void *data) { - struct client *client = stream->client; + struct stream *stream = data; + struct impl *impl = stream->impl; + void *p; struct pw_buffer *buffer; struct spa_buffer *buf; - uint32_t size; - struct message *msg; - void *p; + uint32_t index, size; - update_timing_info(stream); + pw_log_trace(NAME" %p: process", stream); buffer = pw_stream_dequeue_buffer(stream->stream); if (buffer == NULL) @@ -720,72 +800,59 @@ static void stream_process_record(struct stream *stream) if ((p = buf->datas[0].data) == NULL) return; - size = buf->datas[0].chunk->size; + if (stream->direction == PW_DIRECTION_OUTPUT) { + int32_t avail = spa_ringbuffer_get_read_index(&stream->ring, &index); + if (avail < 0) { + /* underrun */ + pw_log_warn(NAME" %p: underrun", stream); + size = buf->datas[0].maxsize; + memset(p, 0, size); + } else if (avail > MAXLENGTH) { + /* overrun */ + pw_log_warn(NAME" %p: overrun", stream); + size = buf->datas[0].maxsize; + memset(p, 0, size); + } else { + size = SPA_MIN(buf->datas[0].maxsize, (uint32_t)avail); - msg = message_alloc(client, stream->channel, size); - if (msg != NULL) { - memcpy(msg->data, - SPA_MEMBER(p, buf->datas[0].chunk->offset, void), - size); - send_message(client, msg); - } - stream->write_index += size; - pw_stream_queue_buffer(stream->stream, buffer); -} - -static void stream_process_playback(struct stream *stream) -{ - struct client *client = stream->client; - - pw_log_trace(NAME" %p: process", stream); - - update_timing_info(stream); - - while (!spa_list_is_empty(&stream->messages)) { - struct pw_buffer *buffer; - struct spa_buffer *buf; - struct message *msg; - uint32_t size, maxsize; - void *p; - - buffer = pw_stream_dequeue_buffer(stream->stream); - if (buffer == NULL) - break; - - buf = buffer->buffer; - if ((p = buf->datas[0].data) == NULL) - break; - - msg = spa_list_first(&stream->messages, struct message, link); - maxsize = buf->datas[0].maxsize; - size = SPA_MIN(msg->length - msg->offset, maxsize); - memcpy(p, SPA_MEMBER(msg->data, msg->offset, void), size); - - pw_log_trace(NAME" %p: process message %p %d-%d/%d", - stream, msg, msg->offset, size, msg->length); - - stream->read_index += size; - stream->playing_for += size; - msg->offset += size; - if (msg->offset >= msg->length) - message_free(client, msg, false); + spa_ringbuffer_read_data(&stream->ring, + stream->buffer, MAXLENGTH, + index % MAXLENGTH, + p, size); + spa_ringbuffer_read_update(&stream->ring, + index + size); + stream->playing_for += size; + } buf->datas[0].chunk->offset = 0; buf->datas[0].chunk->stride = stream->frame_size; buf->datas[0].chunk->size = size; + } else { + int32_t filled = spa_ringbuffer_get_write_index(&stream->ring, &index); + if (filled < 0) { + /* underrun */ + pw_log_warn(NAME" %p: underrun", stream); + } else if (filled > MAXLENGTH) { + /* overrun */ + pw_log_warn(NAME" %p: overrun", stream); + } else { + uint32_t avail = MAXLENGTH - filled; + size = SPA_MIN(buf->datas[0].chunk->size, avail); - pw_stream_queue_buffer(stream->stream, buffer); + spa_ringbuffer_write_data(&stream->ring, + stream->buffer, MAXLENGTH, + index % MAXLENGTH, + SPA_MEMBER(p, buf->datas[0].chunk->offset, void), + size); + spa_ringbuffer_write_update(&stream->ring, + index + size); + } } - send_command_request(stream); -} -static void stream_process(void *data) -{ - struct stream *stream = data; - if (stream->direction == PW_DIRECTION_OUTPUT) - stream_process_playback(stream); - else - stream_process_record(stream); + pw_stream_queue_buffer(stream->stream, buffer); + + pw_loop_invoke(impl->loop, + do_process_done, 1, NULL, 0, false, stream); } static void stream_drained(void *data) @@ -889,7 +956,7 @@ static int do_create_playback_stream(struct client *client, uint32_t command, ui struct pw_properties *props = NULL; uint8_t n_formats = 0; struct format_info *formats = NULL; - struct stream *stream; + struct stream *stream = NULL; struct spa_audio_info_raw info; uint32_t n_params; const struct spa_pod *params[1]; @@ -1005,7 +1072,13 @@ static int do_create_playback_stream(struct client *client, uint32_t command, ui stream->corked = corked; stream->adjust_latency = adjust_latency; stream->channel = pw_map_insert_new(&client->streams, stream); - spa_list_init(&stream->messages); + + stream->buffer = calloc(1, MAXLENGTH); + if (stream->buffer == NULL) { + res = -errno; + goto error; + } + spa_ringbuffer_init(&stream->ring); stream->direction = PW_DIRECTION_OUTPUT; stream->create_tag = tag; @@ -1045,6 +1118,7 @@ static int do_create_playback_stream(struct client *client, uint32_t command, ui PW_DIRECTION_OUTPUT, SPA_ID_INVALID, PW_STREAM_FLAG_AUTOCONNECT | + PW_STREAM_FLAG_RT_PROCESS | PW_STREAM_FLAG_MAP_BUFFERS, params, n_params); @@ -1113,7 +1187,7 @@ static int do_create_record_stream(struct client *client, uint32_t command, uint struct pw_properties *props = NULL; uint8_t n_formats = 0; struct format_info *formats = NULL; - struct stream *stream; + struct stream *stream = NULL; struct spa_audio_info_raw info; uint32_t n_params; const struct spa_pod *params[1]; @@ -1222,7 +1296,14 @@ static int do_create_record_stream(struct client *client, uint32_t command, uint stream->corked = corked; stream->adjust_latency = adjust_latency; stream->channel = pw_map_insert_new(&client->streams, stream); - spa_list_init(&stream->messages); + + stream->buffer = calloc(1, MAXLENGTH); + if (stream->buffer == NULL) { + res = -errno; + goto error; + } + spa_ringbuffer_init(&stream->ring); + stream->create_tag = tag; stream->ss = ss; stream->map = map; @@ -1257,6 +1338,7 @@ static int do_create_record_stream(struct client *client, uint32_t command, uint PW_DIRECTION_INPUT, SPA_ID_INVALID, PW_STREAM_FLAG_AUTOCONNECT | + PW_STREAM_FLAG_RT_PROCESS | PW_STREAM_FLAG_MAP_BUFFERS, params, n_params); @@ -1312,6 +1394,7 @@ static int do_get_playback_latency(struct client *client, uint32_t command, uint if (stream == NULL) return -EINVAL; + update_timing_info(stream); pw_log_debug("read:%"PRIi64" write:%"PRIi64" queued:%"PRIi64" delay:%"PRIi64, stream->read_index, stream->write_index, @@ -1343,7 +1426,7 @@ static int do_get_record_latency(struct client *client, uint32_t command, uint32 struct impl *impl = client->impl; struct message *reply; uint32_t channel; - struct timeval tv, now; + struct timeval tv; struct stream *stream; int res; @@ -1358,7 +1441,7 @@ static int do_get_record_latency(struct client *client, uint32_t command, uint32 if (stream == NULL) return -EINVAL; - gettimeofday(&now, NULL); + update_timing_info(stream); reply = reply_new(client, tag); message_put(reply, @@ -2284,8 +2367,9 @@ static int handle_memblock(struct client *client, struct message *msg) { struct impl *impl = client->impl; struct stream *stream; - uint32_t channel, flags; + uint32_t channel, flags, index; int64_t offset; + int32_t filled; channel = ntohl(client->desc.channel); offset = (int64_t) ( @@ -2301,9 +2385,25 @@ static int handle_memblock(struct client *client, struct message *msg) if (stream == NULL) return -EINVAL; - pw_log_debug("new block %p %p", msg, msg->data); - spa_list_append(&stream->messages, &msg->link); - stream->write_index += msg->length; + pw_log_debug("new block %p %p/%u", msg, msg->data, msg->length); + + filled = spa_ringbuffer_get_write_index(&stream->ring, &index); + if (filled < 0) { + /* underrun */ + send_underflow(stream, index); + } else if (filled + msg->length > MAXLENGTH) { + /* overrun */ + send_overflow(stream); + } else { + spa_ringbuffer_write_data(&stream->ring, + stream->buffer, MAXLENGTH, + index % MAXLENGTH, + msg->data, msg->length); + spa_ringbuffer_write_update(&stream->ring, + index + msg->length); + stream->write_index += msg->length; + } + message_free(client, msg, false); return 0; }