diff --git a/src/modules/module-rtp-sink.c b/src/modules/module-rtp-sink.c index 523f821eb..637c1f721 100644 --- a/src/modules/module-rtp-sink.c +++ b/src/modules/module-rtp-sink.c @@ -227,7 +227,7 @@ struct impl { bool mcast_loop; float min_ptime; float max_ptime; - uint32_t pbytes; + uint32_t psamples; struct sockaddr_storage src_addr; socklen_t src_len; @@ -245,7 +245,7 @@ struct impl { struct spa_audio_info_raw info; const struct format_info *format_info; - uint32_t frame_size; + uint32_t stride; int payload; uint16_t seq; uint32_t ssrc; @@ -282,20 +282,21 @@ set_iovec(struct spa_ringbuffer *rbuf, void *buffer, uint32_t size, static void flush_packets(struct impl *impl) { int32_t avail; - uint32_t index; + uint32_t stride, timestamp; struct iovec iov[3]; struct msghdr msg; ssize_t n; struct rtp_header header; int32_t tosend; - avail = spa_ringbuffer_get_read_index(&impl->ring, &index); - - tosend = impl->pbytes; + avail = spa_ringbuffer_get_read_index(&impl->ring, ×tamp); + tosend = impl->psamples; if (avail < tosend) return; + stride = impl->stride; + spa_zero(header); header.v = 2; header.pt = impl->payload; @@ -314,14 +315,14 @@ static void flush_packets(struct impl *impl) while (avail >= tosend) { header.sequence_number = htons(impl->seq); - header.timestamp = htonl(impl->ts_offset + index / impl->frame_size); + header.timestamp = htonl(impl->ts_offset + timestamp); set_iovec(&impl->ring, impl->buffer, BUFFER_SIZE, - index & BUFFER_MASK, - &iov[1], tosend); + (timestamp * stride) & BUFFER_MASK, + &iov[1], tosend * stride); - pw_log_trace("sending %d index:%d", tosend, index); + pw_log_trace("sending %d timestamp:%d", tosend, timestamp); n = sendmsg(impl->rtp_fd, &msg, MSG_NOSIGNAL); if (n < 0) { switch (errno) { @@ -338,10 +339,10 @@ static void flush_packets(struct impl *impl) impl->seq++; - index += tosend; + timestamp += tosend; avail -= tosend; } - spa_ringbuffer_read_update(&impl->ring, index); + spa_ringbuffer_read_update(&impl->ring, timestamp); } static void stream_process(void *data) @@ -349,7 +350,7 @@ static void stream_process(void *data) struct impl *impl = data; struct pw_buffer *buf; struct spa_data *d; - uint32_t index, expected_index; + uint32_t timestamp, expected_timestamp, stride; int32_t filled, wanted; if ((buf = pw_stream_dequeue_buffer(impl->stream)) == NULL) { @@ -358,33 +359,37 @@ static void stream_process(void *data) } d = buf->buffer->datas; - wanted = d[0].chunk->size; + stride = impl->stride; + wanted = d[0].chunk->size / stride; - filled = spa_ringbuffer_get_write_index(&impl->ring, &expected_index); - if (impl->io_position) - index = impl->io_position->clock.position * impl->frame_size; + filled = spa_ringbuffer_get_write_index(&impl->ring, &expected_timestamp); + if (SPA_LIKELY(impl->io_position)) + timestamp = impl->io_position->clock.position; else - index = expected_index; + timestamp = expected_timestamp; + if (impl->sync) { + if (expected_timestamp != timestamp) { + pw_log_warn("expected %u != timestamp %u", expected_timestamp, timestamp); + impl->sync = false; + } else if ((filled + wanted) * stride > (int32_t)BUFFER_SIZE) { + pw_log_warn("overrun %u + %u > %u", filled, wanted, BUFFER_SIZE / stride); + impl->sync = false; + } + } if (!impl->sync) { - pw_log_info("sync %u", index); - impl->ring.readindex = impl->ring.writeindex = index; + pw_log_info("sync to timestamp %u", timestamp); + impl->ring.readindex = impl->ring.writeindex = timestamp; impl->sync = true; - } else if (expected_index != index) { - pw_log_warn("expected %u != index %u", expected_index, index); - impl->ring.readindex = impl->ring.writeindex = index; - } else if (filled + wanted > (int32_t)BUFFER_SIZE) { - pw_log_warn("overrun %u + %u > %u", filled, wanted, BUFFER_SIZE); - impl->ring.readindex = impl->ring.writeindex = index; } spa_ringbuffer_write_data(&impl->ring, impl->buffer, BUFFER_SIZE, - index & BUFFER_MASK, - d[0].data, wanted); - index += wanted; - spa_ringbuffer_write_update(&impl->ring, index); + (timestamp * stride) & BUFFER_MASK, + d[0].data, wanted * stride); + timestamp += wanted; + spa_ringbuffer_write_update(&impl->ring, timestamp); pw_stream_queue_buffer(impl->stream, buf); @@ -521,8 +526,7 @@ static int setup_stream(struct impl *impl) if (pw_properties_get(props, PW_KEY_NODE_LATENCY) == NULL) { pw_properties_setf(props, PW_KEY_NODE_LATENCY, - "%d/%d", impl->pbytes / impl->frame_size, - impl->info.rate); + "%d/%d", impl->psamples, impl->info.rate); } pw_properties_setf(props, PW_KEY_NODE_RATE, "1/%d", impl->info.rate); @@ -634,7 +638,7 @@ static void send_sap(struct impl *impl, bool bye) impl->port, impl->payload, impl->payload, impl->format_info->mime, impl->info.rate, impl->info.channels, - (impl->pbytes / impl->frame_size) * 1000 / impl->info.rate); + impl->psamples * 1000 / impl->info.rate); if (impl->ts_refclk[0] != '\0') { spa_strbuf_append(&buf, @@ -837,7 +841,7 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args) struct impl *impl; struct pw_properties *props = NULL, *stream_props = NULL; uint32_t id = pw_global_get_id(pw_impl_module_get_global(module)); - uint32_t pid = getpid(), port, min_bytes, max_bytes; + uint32_t pid = getpid(), port, min_samples, max_samples; int64_t ts_offset; char addr[64]; const char *str; @@ -912,7 +916,7 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args) res = -EINVAL; goto out; } - impl->frame_size = impl->format_info->size * impl->info.channels; + impl->stride = impl->format_info->size * impl->info.channels; impl->msg_id_hash = rand(); impl->ntp = (uint32_t) time(NULL) + 2208988800U; @@ -965,11 +969,11 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args) if (!spa_atof(str, &impl->max_ptime)) impl->max_ptime = DEFAULT_MAX_PTIME; - min_bytes = (impl->min_ptime * impl->info.rate / 1000) * impl->frame_size; - max_bytes = (impl->max_ptime * impl->info.rate / 1000) * impl->frame_size; + min_samples = impl->min_ptime * impl->info.rate / 1000; + max_samples = impl->max_ptime * impl->info.rate / 1000; - impl->pbytes = SPA_ROUND_DOWN(impl->mtu, impl->frame_size); - impl->pbytes = SPA_CLAMP(impl->pbytes, min_bytes, max_bytes); + impl->psamples = impl->mtu / impl->stride; + impl->psamples = SPA_CLAMP(impl->psamples, min_samples, max_samples); if ((str = pw_properties_get(props, "sess.name")) == NULL) pw_properties_setf(props, "sess.name", "PipeWire RTP Stream on %s", @@ -986,7 +990,7 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args) pw_properties_setf(stream_props, "rtp.mtu", "%u", impl->mtu); pw_properties_setf(stream_props, "rtp.ttl", "%u", impl->ttl); pw_properties_setf(stream_props, "rtp.ptime", "%u", - (impl->pbytes / impl->frame_size) * 1000 / impl->info.rate); + impl->psamples * 1000 / impl->info.rate); impl->core = pw_context_get_object(impl->module_context, PW_TYPE_INTERFACE_Core); if (impl->core == NULL) { diff --git a/src/modules/module-rtp-source.c b/src/modules/module-rtp-source.c index f0141b9f4..8fb082e86 100644 --- a/src/modules/module-rtp-source.c +++ b/src/modules/module-rtp-source.c @@ -250,9 +250,7 @@ struct session { struct spa_io_position *position; struct spa_dll dll; uint32_t target_buffer; - uint32_t last_packet_size; float max_error; - unsigned buffering:1; unsigned first:1; unsigned receiving:1; unsigned direct_timestamp:1; @@ -270,8 +268,8 @@ static void stream_process(void *data) struct session *sess = data; struct pw_buffer *buf; struct spa_data *d; - uint32_t index, target_buffer; - int32_t avail, wanted; + uint32_t wanted, timestamp, target_buffer, stride, maxsize; + int32_t avail; if ((buf = pw_stream_dequeue_buffer(sess->stream)) == NULL) { pw_log_debug("Out of stream buffers: %m"); @@ -279,42 +277,53 @@ static void stream_process(void *data) } d = buf->buffer->datas; - wanted = buf->requested ? - SPA_MIN(buf->requested * sess->info.stride, d[0].maxsize) - : d[0].maxsize; + stride = sess->info.stride; + + maxsize = d[0].maxsize / stride; + wanted = buf->requested ? SPA_MIN(buf->requested, maxsize) : maxsize; if (sess->position && sess->direct_timestamp) { + /* in direct mode, read directly from the timestamp index, + * because sender and receiver are in sync, this would keep + * target_buffer of bytes available. */ spa_ringbuffer_read_update(&sess->ring, - sess->position->clock.position * sess->info.stride); + sess->position->clock.position); } - avail = spa_ringbuffer_get_read_index(&sess->ring, &index); + avail = spa_ringbuffer_get_read_index(&sess->ring, ×tamp); - target_buffer = sess->target_buffer + sess->last_packet_size / 2; + target_buffer = sess->target_buffer; - if (avail < wanted || sess->buffering) { - memset(d[0].data, 0, wanted); - if (!sess->buffering && sess->have_sync) { - pw_log_debug("underrun %u/%u < %u, buffering...", - avail, target_buffer, wanted); - sess->buffering = true; + if (avail < (int32_t)wanted) { + enum spa_log_level level; + memset(d[0].data, 0, wanted * stride); + if (sess->have_sync) { + sess->have_sync = false; + level = SPA_LOG_LEVEL_WARN; + } else { + level = SPA_LOG_LEVEL_DEBUG; } + pw_log(level, "underrun %d/%u < %u", + avail, target_buffer, wanted); } else { float error, corr; - if (avail > (int32_t)SPA_MIN(target_buffer * 8, BUFFER_SIZE)) { + if (avail > (int32_t)SPA_MIN(target_buffer * 8, BUFFER_SIZE / stride)) { pw_log_warn("overrun %u > %u", avail, target_buffer * 8); - index += avail - target_buffer; + timestamp += avail - target_buffer; avail = target_buffer; - } else { - if (sess->first) { - if ((uint32_t)avail > target_buffer) { - uint32_t skip = avail - target_buffer; - pw_log_debug("first: avail:%d skip:%u target:%u", + } else if (sess->first) { + if ((uint32_t)avail > target_buffer) { + uint32_t skip = avail - target_buffer; + pw_log_debug("first: avail:%d skip:%u target:%u", avail, skip, target_buffer); - index += skip; - avail = target_buffer; - } - sess->first = false; + timestamp += skip; + avail = target_buffer; } + sess->first = false; + } + if (!sess->direct_timestamp) { + /* when not using direct timestamp and clocks are not + * in sync, try to adjust our playback rate to keep the + * requested target_buffer bytes in the ringbuffer */ error = (float)target_buffer - (float)avail; error = SPA_CLAMP(error, -sess->max_error, sess->max_error); @@ -323,24 +332,25 @@ static void stream_process(void *data) pw_log_debug("avail:%u target:%u error:%f corr:%f", avail, target_buffer, error, corr); - if (sess->rate_match && !sess->direct_timestamp) { - SPA_FLAG_SET(sess->rate_match->flags, SPA_IO_RATE_MATCH_FLAG_ACTIVE); + if (sess->rate_match) { + SPA_FLAG_SET(sess->rate_match->flags, + SPA_IO_RATE_MATCH_FLAG_ACTIVE); sess->rate_match->rate = 1.0f / corr; } } spa_ringbuffer_read_data(&sess->ring, sess->buffer, BUFFER_SIZE, - index & BUFFER_MASK, - d[0].data, wanted); + (timestamp * stride) & BUFFER_MASK, + d[0].data, wanted * stride); - index += wanted; - spa_ringbuffer_read_update(&sess->ring, index); + timestamp += wanted; + spa_ringbuffer_read_update(&sess->ring, timestamp); } - d[0].chunk->size = wanted; - d[0].chunk->stride = sess->info.stride; + d[0].chunk->size = wanted * stride; + d[0].chunk->stride = stride; d[0].chunk->offset = 0; - buf->size = wanted / sess->info.stride; + buf->size = wanted; pw_stream_queue_buffer(sess->stream, buf); } @@ -374,7 +384,7 @@ on_rtp_io(void *data, int fd, uint32_t mask) uint8_t buffer[2048], *payload; if (mask & SPA_IO_IN) { - uint32_t index, expected_index, timestamp; + uint32_t stride, read, timestamp, expected_timestamp, samples; uint16_t seq; int32_t filled; @@ -405,60 +415,45 @@ on_rtp_io(void *data, int fd, uint32_t mask) sess->expected_seq = seq + 1; sess->have_seq = true; - len = SPA_ROUND_DOWN(len - hlen, sess->info.stride); + stride = sess->info.stride; + samples = (len - hlen) / stride; payload = &buffer[hlen]; - filled = spa_ringbuffer_get_write_index(&sess->ring, &index); + filled = spa_ringbuffer_get_write_index(&sess->ring, &expected_timestamp); - timestamp = ntohl(hdr->timestamp) - sess->info.ts_offset; - expected_index = timestamp * sess->info.stride; - - if (sess->direct_timestamp) - expected_index += sess->target_buffer; + read = ntohl(hdr->timestamp) - sess->info.ts_offset; + /* we always write to timestamp + delay */ + timestamp = read + sess->target_buffer; if (!sess->have_sync) { - sess->ring.readindex = sess->ring.writeindex = - index = expected_index; - filled = 0; - sess->have_sync = true; - sess->buffering = true; - pw_log_debug("sync to timestamp %u", index); + pw_log_info("sync to timestamp %u", read); + /* we read from timestamp, keeping target_buffer of data + * in the ringbuffer. */ + sess->ring.readindex = read; + sess->ring.writeindex = timestamp; + filled = sess->target_buffer; spa_dll_init(&sess->dll); spa_dll_set_bw(&sess->dll, SPA_DLL_BW_MIN, 128, sess->info.info.rate); - - } else if (expected_index != index) { + sess->have_sync = true; + } else if (expected_timestamp != timestamp) { pw_log_debug("unexpected timestamp (%u != %u)", - index / sess->info.stride, - expected_index / sess->info.stride); - index = expected_index; - filled = 0; + timestamp, expected_timestamp); } - if (filled + len > BUFFER_SIZE) { - pw_log_debug("got rtp, capture overrun %u %zd", filled, len); + if (filled + samples > BUFFER_SIZE / stride) { + pw_log_debug("capture overrun %u + %u > %u", filled, samples, + BUFFER_SIZE / stride); sess->have_sync = false; } else { - uint32_t target_buffer; - - pw_log_trace("got rtp packet len:%zd", len); + pw_log_trace("got samples:%u", samples); spa_ringbuffer_write_data(&sess->ring, sess->buffer, BUFFER_SIZE, - index & BUFFER_MASK, - payload, len); - index += len; - filled += len; - spa_ringbuffer_write_update(&sess->ring, index); - - sess->last_packet_size = len; - target_buffer = sess->target_buffer + len/2; - - if (sess->buffering && (uint32_t)filled > target_buffer) { - sess->buffering = false; - pw_log_debug("buffering done %u > %u", - filled, target_buffer); - } + (timestamp * stride) & BUFFER_MASK, + payload, (samples * stride)); + timestamp += samples; + spa_ringbuffer_write_update(&sess->ring, timestamp); } sess->receiving = true; } @@ -559,9 +554,9 @@ error: return res; } -static uint32_t msec_to_bytes(struct sdp_info *info, uint32_t msec) +static uint32_t msec_to_samples(struct sdp_info *info, uint32_t msec) { - return msec * info->stride * info->info.rate / 1000; + return msec * info->info.rate / 1000; } static void session_free(struct session *sess) @@ -736,12 +731,12 @@ static int session_new(struct impl *impl, struct sdp_info *info) sess_latency_msec = pw_properties_get_uint32(props, "sess.latency.msec", impl->sess_latency_msec); - session->target_buffer = msec_to_bytes(info, sess_latency_msec); - session->max_error = msec_to_bytes(info, ERROR_MSEC); + session->target_buffer = msec_to_samples(info, sess_latency_msec); + session->max_error = msec_to_samples(info, ERROR_MSEC); pw_properties_setf(props, PW_KEY_NODE_RATE, "1/%d", info->info.rate); pw_properties_setf(props, PW_KEY_NODE_LATENCY, "%d/%d", - session->target_buffer / (2 * info->stride), info->info.rate); + session->target_buffer / 2, info->info.rate); spa_dll_init(&session->dll); spa_dll_set_bw(&session->dll, SPA_DLL_BW_MIN, 128, session->info.info.rate);