diff --git a/src/modules/module-protocol-pulse/pulse-server.c b/src/modules/module-protocol-pulse/pulse-server.c index d93ecc8cd..eb9619e9f 100644 --- a/src/modules/module-protocol-pulse/pulse-server.c +++ b/src/modules/module-protocol-pulse/pulse-server.c @@ -148,7 +148,9 @@ struct stream { uint64_t ticks_base; struct timeval timestamp; int64_t delay; - uint32_t pending; + + uint32_t missing; + uint32_t requested; struct sample_spec ss; struct channel_map map; @@ -165,6 +167,7 @@ struct stream { unsigned int adjust_latency:1; unsigned int have_time:1; unsigned int is_underrun:1; + unsigned int in_prebuf:1; }; struct server { @@ -723,8 +726,16 @@ static int do_subscribe(struct client *client, uint32_t command, uint32_t tag, s static void stream_flush(struct stream *stream) { - spa_ringbuffer_init(&stream->ring); - stream->write_index = stream->read_index = 0; + uint32_t old; + + old = stream->ring.writeindex; + stream->ring.writeindex = stream->ring.readindex; + stream->missing += (old - stream->ring.writeindex); + stream->write_index = stream->read_index = stream->ring.writeindex; + + if (stream->attr.prebuf > 0) + stream->in_prebuf = true; + stream->playing_for = 0; stream->underrun_for = 0; stream->have_time = false; @@ -746,80 +757,33 @@ static void stream_free(struct stream *stream) free(stream); } -static inline uint32_t queued_size(const struct stream *s, uint64_t elapsed) +static bool stream_prebuf_active(struct stream *stream) { - uint64_t queued; - queued = s->write_index - SPA_MIN(s->read_index, s->write_index); - queued -= SPA_MIN(queued, elapsed); - return queued; -} + uint32_t index; + int32_t avail; -static inline uint32_t target_queue(const struct stream *s) -{ - return s->attr.tlength; -} - -static inline uint32_t wanted_size(const struct stream *s, uint32_t queued, uint32_t target) -{ - return target - SPA_MIN(queued, target); -} - -static inline uint32_t required_size(const struct stream *s) -{ - return s->attr.minreq; -} - -static inline uint32_t writable_size(const struct stream *s, uint64_t elapsed) -{ - uint32_t queued, target, wanted, required; - - queued = queued_size(s, elapsed); - target = target_queue(s); - target -= SPA_MIN(target, s->pending); - wanted = wanted_size(s, queued, target); - required = required_size(s); - - pw_log_trace("stream %p, queued:%u target:%u wanted:%u required:%u", - s, queued, target, wanted, required); - - if (s->adjust_latency) - if (queued >= wanted) - wanted = 0; - if (wanted < required) - wanted = 0; - - return wanted; -} - -static void update_timing_info(struct stream *stream) -{ - struct pw_time pwt; - int64_t delay, pos; - - pw_stream_get_time(stream->stream, &pwt); - - stream->timestamp.tv_sec = pwt.now / SPA_NSEC_PER_SEC; - stream->timestamp.tv_usec = (pwt.now % SPA_NSEC_PER_SEC) / SPA_NSEC_PER_USEC; - - if (pwt.rate.denom > 0) { - uint64_t ticks = pwt.ticks; - if (!stream->have_time) - stream->ticks_base = ticks; - if (ticks > stream->ticks_base) - pos = ((ticks - stream->ticks_base) * stream->ss.rate / pwt.rate.denom) * stream->frame_size; - else - pos = 0; - delay = pwt.delay * SPA_USEC_PER_SEC / pwt.rate.denom; - stream->have_time = true; - } else { - pos = delay = 0; - stream->have_time = false; - } - if (stream->direction == PW_DIRECTION_OUTPUT) - stream->read_index = pos; + avail = spa_ringbuffer_get_write_index(&stream->ring, &index); + if (stream->in_prebuf) + return avail < (int32_t) stream->attr.prebuf; else - stream->write_index = pos; - stream->delay = delay; + return stream->attr.prebuf > 0 && avail >= 0; +} + +static uint32_t stream_pop_missing(struct stream *stream) +{ + uint32_t missing; + + if (stream->missing <= 0) + return 0; + + if (stream->missing < stream->attr.minreq && + !stream_prebuf_active(stream)) + return 0; + + missing = stream->missing; + stream->requested += missing; + stream->missing = 0; + return missing; } static int send_command_request(struct stream *stream) @@ -828,9 +792,7 @@ static int send_command_request(struct stream *stream) struct message *msg; uint32_t size; - update_timing_info(stream); - - size = writable_size(stream, 0); + size = stream_pop_missing(stream); pw_log_debug(NAME" %p: REQUEST channel:%d %u", stream, stream->channel, size); if (size == 0) @@ -844,8 +806,6 @@ static int send_command_request(struct stream *stream) TAG_U32, size, TAG_INVALID); - stream->pending += size; - return send_message(client, msg); } @@ -876,6 +836,8 @@ static void fix_playback_buffer_attr(struct stream *s, struct buffer_attr *attr) attr->tlength -= attr->tlength % frame_size; attr->tlength = SPA_MAX(attr->tlength, frame_size); + s->missing = attr->tlength; + if (attr->minreq == (uint32_t) -1) { uint32_t process = usec_to_bytes_round_up(DEFAULT_PROCESS_MSEC*1000, &s->ss); /* With low-latency, tlength/4 gives a decent default in all of traditional, @@ -917,22 +879,31 @@ static int reply_create_playback_stream(struct stream *stream) char latency[32]; struct pw_manager_object *peer; const char *peer_name; - bool peer_suspended; + struct spa_fraction lat; + uint64_t lat_usec; fix_playback_buffer_attr(stream, &stream->attr); + stream->buffer = calloc(1, stream->attr.maxlength); + if (stream->buffer == NULL) + return -errno; + + spa_ringbuffer_init(&stream->ring); + pw_log_info(NAME" %p: reply CREATE_PLAYBACK_STREAM tag:%u", stream, stream->create_tag); - snprintf(latency, sizeof(latency)-1, "%u/%u", - stream->attr.minreq * 2 / stream->frame_size, - stream->ss.rate); + lat.num = stream->attr.minreq * 2 / stream->frame_size; + lat.denom = stream->ss.rate; + lat_usec = lat.num * SPA_USEC_PER_SEC / lat.denom; + + snprintf(latency, sizeof(latency)-1, "%u/%u", lat.num, lat.denom); items[0] = SPA_DICT_ITEM_INIT(PW_KEY_NODE_LATENCY, latency); pw_stream_update_properties(stream->stream, &SPA_DICT_INIT(items, 1)); - size = writable_size(stream, 0); + size = stream_pop_missing(stream); reply = reply_new(client, stream->create_tag); message_put(reply, @@ -941,18 +912,13 @@ static int reply_create_playback_stream(struct stream *stream) TAG_U32, size, /* missing/requested bytes */ TAG_INVALID); - stream->pending = size; - peer = find_linked(manager, stream->id, stream->direction); if (peer && is_sink(peer)) { - struct pw_node_info *info = peer->info; peer_id = peer->id; peer_name = pw_properties_get(peer->props, PW_KEY_NODE_NAME); - peer_suspended = info->state == PW_NODE_STATE_SUSPENDED; } else { peer_id = SPA_ID_INVALID; peer_name = NULL; - peer_suspended = false; } if (client->version >= 9) { @@ -969,12 +935,12 @@ static int reply_create_playback_stream(struct stream *stream) TAG_CHANNEL_MAP, &stream->map, TAG_U32, peer_id, /* sink index */ TAG_STRING, peer_name, /* sink name */ - TAG_BOOLEAN, peer_suspended, /* sink suspended state */ + TAG_BOOLEAN, false, /* sink suspended state */ TAG_INVALID); } if (client->version >= 13) { message_put(reply, - TAG_USEC, 0ULL, /* sink configured latency */ + TAG_USEC, lat_usec, /* sink configured latency */ TAG_INVALID); } if (client->version >= 21) { @@ -1027,13 +993,22 @@ static int reply_create_record_stream(struct stream *stream) struct pw_manager_object *peer; const char *peer_name, *name; uint32_t peer_id; - bool peer_suspended; + struct spa_fraction lat; + uint64_t lat_usec; fix_record_buffer_attr(stream, &stream->attr); - snprintf(latency, sizeof(latency)-1, "%u/%u", - stream->attr.fragsize / stream->frame_size, - stream->ss.rate); + stream->buffer = calloc(1, stream->attr.maxlength); + if (stream->buffer == NULL) + return -errno; + + spa_ringbuffer_init(&stream->ring); + + lat.num = stream->attr.fragsize / stream->frame_size, + lat.denom = stream->ss.rate; + lat_usec = lat.num * SPA_USEC_PER_SEC / lat.denom; + + snprintf(latency, sizeof(latency)-1, "%u/%u", lat.num, lat.denom); items[0] = SPA_DICT_ITEM_INIT(PW_KEY_NODE_LATENCY, latency); pw_stream_update_properties(stream->stream, @@ -1049,7 +1024,6 @@ static int reply_create_record_stream(struct stream *stream) if (peer && is_sink_input(peer)) peer = find_linked(manager, peer->id, PW_DIRECTION_OUTPUT); if (peer && is_source_or_monitor(peer)) { - struct pw_node_info *info = peer->info; name = pw_properties_get(peer->props, PW_KEY_NODE_NAME); if (!is_source(peer)) { size_t len = (name ? strlen(name) : 5) + 10; @@ -1060,11 +1034,9 @@ static int reply_create_record_stream(struct stream *stream) peer_id = peer->id; peer_name = name; } - peer_suspended = info->state == PW_NODE_STATE_SUSPENDED; } else { peer_id = SPA_ID_INVALID; peer_name = NULL; - peer_suspended = false; } if (client->version >= 9) { @@ -1079,12 +1051,12 @@ static int reply_create_record_stream(struct stream *stream) TAG_CHANNEL_MAP, &stream->map, TAG_U32, peer_id, /* source index */ TAG_STRING, peer_name, /* source name */ - TAG_BOOLEAN, peer_suspended, /* source suspended state */ + TAG_BOOLEAN, false, /* source suspended state */ TAG_INVALID); } if (client->version >= 13) { message_put(reply, - TAG_USEC, 0ULL, /* source configured latency */ + TAG_USEC, lat_usec, /* source configured latency */ TAG_INVALID); } if (client->version >= 22) { @@ -1208,10 +1180,11 @@ static void stream_param_changed(void *data, uint32_t id, const struct spa_pod * } struct process_data { - uint32_t underrun_for; - uint32_t playing_for; + struct pw_time pwt; uint32_t read_index; uint32_t write_index; + uint32_t underrun_for; + uint32_t playing_for; unsigned int underrun:1; }; @@ -1222,8 +1195,20 @@ do_process_done(struct spa_loop *loop, struct stream *stream = user_data; struct client *client = stream->client; const struct process_data *pd = data; + uint32_t index; + int32_t avail; + + stream->timestamp.tv_sec = pd->pwt.now / SPA_NSEC_PER_SEC; + stream->timestamp.tv_usec = (pd->pwt.now % SPA_NSEC_PER_SEC) / SPA_NSEC_PER_USEC; + if (pd->pwt.rate.denom > 0) { + stream->delay = pd->pwt.delay * SPA_USEC_PER_SEC / pd->pwt.rate.denom; + stream->delay += pd->pwt.queued * SPA_USEC_PER_SEC / stream->ss.rate; + } else { + stream->delay = 0; + } if (stream->direction == PW_DIRECTION_OUTPUT) { + stream->read_index = pd->read_index; if (stream->corked) { stream->underrun_for += pd->underrun_for; stream->playing_for = 0; @@ -1238,41 +1223,47 @@ do_process_done(struct spa_loop *loop, else send_stream_started(stream); } + stream->missing += pd->playing_for; stream->playing_for += pd->playing_for; stream->underrun_for += pd->underrun_for; - stream->pending -= SPA_MIN(stream->pending, pd->underrun_for); + send_command_request(stream); } else { - uint32_t index; - int32_t avail = spa_ringbuffer_get_read_index(&stream->ring, &index); + stream->write_index = pd->write_index; + avail = spa_ringbuffer_get_read_index(&stream->ring, &index); if (avail <= 0) { /* underrun */ if (!stream->is_underrun) { stream->is_underrun = true; send_underflow(stream, index); } - } else if (avail > MAXLENGTH) { + } else if (avail > (int32_t)stream->attr.maxlength) { /* overrun */ send_overflow(stream); } else { struct message *msg; msg = message_alloc(client, stream->channel, avail); - if (msg != NULL) { - spa_ringbuffer_read_data(&stream->ring, - stream->buffer, MAXLENGTH, - index % MAXLENGTH, - msg->data, avail); - spa_ringbuffer_read_update(&stream->ring, - index + avail); - send_message(client, msg); - } + if (msg == NULL) + return -errno; + + spa_ringbuffer_read_data(&stream->ring, + stream->buffer, stream->attr.maxlength, + index % stream->attr.maxlength, + msg->data, avail); + + stream->read_index = index + avail; + spa_ringbuffer_read_update(&stream->ring, stream->read_index); + stream->is_underrun = false; + + send_message(client, msg); } } return 0; } + static void stream_process(void *data) { struct stream *stream = data; @@ -1294,6 +1285,7 @@ static void stream_process(void *data) return; spa_zero(pd); + if (stream->direction == PW_DIRECTION_OUTPUT) { int32_t avail = spa_ringbuffer_get_read_index(&stream->ring, &pd.read_index); if (avail <= 0) { @@ -1307,7 +1299,7 @@ static void stream_process(void *data) pd.underrun_for = size; pd.underrun = true; } - } else if (avail > MAXLENGTH) { + } else if (avail > (int32_t)stream->attr.maxlength) { /* overrun, handled by other side */ pw_log_warn(NAME" %p: overrun", stream); size = buf->datas[0].maxsize; @@ -1316,11 +1308,12 @@ static void stream_process(void *data) size = SPA_MIN(buf->datas[0].maxsize, (uint32_t)avail); spa_ringbuffer_read_data(&stream->ring, - stream->buffer, MAXLENGTH, - pd.read_index % MAXLENGTH, + stream->buffer, stream->attr.maxlength, + pd.read_index % stream->attr.maxlength, p, size); - spa_ringbuffer_read_update(&stream->ring, - pd.read_index + size); + + pd.read_index += size; + spa_ringbuffer_read_update(&stream->ring, pd.read_index); pd.playing_for = size; pd.underrun = false; @@ -1328,30 +1321,34 @@ static void stream_process(void *data) buf->datas[0].chunk->offset = 0; buf->datas[0].chunk->stride = stream->frame_size; buf->datas[0].chunk->size = size; + buffer->size = size / stream->frame_size; } else { int32_t filled = spa_ringbuffer_get_write_index(&stream->ring, &pd.write_index); if (filled < 0) { /* underrun */ pw_log_warn(NAME" %p: underrun", stream); - } else if (filled > MAXLENGTH) { + } else if (filled > (int32_t)stream->attr.maxlength) { /* overrun */ pw_log_warn(NAME" %p: overrun", stream); } else { - uint32_t avail = MAXLENGTH - filled; + uint32_t avail = stream->attr.maxlength - filled; size = SPA_MIN(buf->datas[0].chunk->size, avail); spa_ringbuffer_write_data(&stream->ring, - stream->buffer, MAXLENGTH, - pd.write_index % MAXLENGTH, + stream->buffer, stream->attr.maxlength, + pd.write_index % stream->attr.maxlength, SPA_MEMBER(p, buf->datas[0].chunk->offset, void), size); + + pd.write_index += size; spa_ringbuffer_write_update(&stream->ring, - pd.write_index + size); + pd.write_index); } } - pw_stream_queue_buffer(stream->stream, buffer); + pw_stream_get_time(stream->stream, &pd.pwt); + pw_loop_invoke(impl->loop, do_process_done, 1, &pd, sizeof(pd), false, stream); } @@ -1561,12 +1558,6 @@ static int do_create_playback_stream(struct client *client, uint32_t command, ui if (stream->channel == SPA_ID_INVALID) goto error_errno; - stream->buffer = calloc(1, MAXLENGTH); - if (stream->buffer == NULL) - goto error_errno; - - spa_ringbuffer_init(&stream->ring); - stream->direction = PW_DIRECTION_OUTPUT; stream->create_tag = tag; stream->ss = ss; @@ -1781,7 +1772,7 @@ static int do_create_record_stream(struct client *client, uint32_t command, uint if (stream->channel == SPA_ID_INVALID) goto error_errno; - stream->buffer = calloc(1, MAXLENGTH); + stream->buffer = calloc(1, stream->attr.maxlength); if (stream->buffer == NULL) goto error_errno; @@ -1906,8 +1897,6 @@ static int do_get_playback_latency(struct client *client, uint32_t command, uint if (stream == NULL) return -EINVAL; - update_timing_info(stream); - pw_log_debug("read:%"PRIi64" write:%"PRIi64" queued:%"PRIi64" delay:%"PRIi64, stream->read_index, stream->write_index, stream->write_index - stream->read_index, stream->delay); @@ -1953,8 +1942,6 @@ static int do_get_record_latency(struct client *client, uint32_t command, uint32 if (stream == NULL) return -EINVAL; - update_timing_info(stream); - reply = reply_new(client, tag); message_put(reply, TAG_USEC, 0, /* monitor latency */ @@ -4069,7 +4056,7 @@ static int handle_memblock(struct client *client, struct message *msg) uint32_t channel, flags, index; int64_t offset; int32_t filled; - int res; + int res = 0; channel = ntohl(client->desc.channel); offset = (int64_t) ( @@ -4087,29 +4074,25 @@ static int handle_memblock(struct client *client, struct message *msg) goto finish; } - filled = spa_ringbuffer_get_write_index(&stream->ring, &index); pw_log_debug("new block %p %p/%u filled:%d index:%d", msg, msg->data, msg->length, filled, index); if (filled < 0) { /* underrun, reported on reader side */ - } else if (filled + msg->length > MAXLENGTH) { + } else if (filled + msg->length > stream->attr.maxlength) { /* overrun */ send_overflow(stream); - stream->write_index += msg->length; - stream->pending -= SPA_MIN(stream->pending, msg->length); } else { spa_ringbuffer_write_data(&stream->ring, - stream->buffer, MAXLENGTH, - index % MAXLENGTH, + stream->buffer, stream->attr.maxlength, + index % stream->attr.maxlength, msg->data, msg->length); - spa_ringbuffer_write_update(&stream->ring, - index + msg->length); - stream->write_index += msg->length; - stream->pending -= SPA_MIN(stream->pending, msg->length); + } - res = 0; + stream->write_index = index + msg->length; + spa_ringbuffer_write_update(&stream->ring, stream->write_index); + stream->requested -= msg->length; finish: message_free(client, msg, false, false); return res;