diff --git a/src/modules/module-rtp-source.c b/src/modules/module-rtp-source.c index 35ee410f1..8a93697e4 100644 --- a/src/modules/module-rtp-source.c +++ b/src/modules/module-rtp-source.c @@ -239,8 +239,10 @@ struct session { struct spa_io_rate_match *rate_match; struct spa_dll dll; uint32_t target_buffer; + uint32_t last_packet_size; float max_error; unsigned buffering:1; + unsigned first:1; }; static void stream_destroy(void *d) @@ -255,8 +257,8 @@ static void stream_process(void *data) struct session *sess = data; struct pw_buffer *buf; struct spa_data *d; - uint32_t index; - int32_t avail, wanted; + uint32_t index, target_buffer; + int32_t avail, wanted; if ((buf = pw_stream_dequeue_buffer(sess->stream)) == NULL) { pw_log_debug("Out of stream buffers: %m"); @@ -270,27 +272,39 @@ static void stream_process(void *data) avail = spa_ringbuffer_get_read_index(&sess->ring, &index); - if (avail < wanted || sess->buffering) { - memset(d[0].data, 0, wanted); + target_buffer = sess->target_buffer + sess->last_packet_size / 2; + + if (avail < wanted || sess->buffering) { + memset(d[0].data, 0, wanted); if (!sess->buffering && sess->have_sync) { pw_log_debug("underrun %u/%u < %u, buffering...", - avail, sess->target_buffer, wanted); + avail, target_buffer, wanted); sess->buffering = true; } - } else { + } else { float error, corr; - if (avail > (int32_t)BUFFER_SIZE) { - index += avail - sess->target_buffer; - avail = sess->target_buffer; - pw_log_warn("overrun %u > %u", avail, BUFFER_SIZE); + if (avail > (int32_t)SPA_MIN(target_buffer * 3, BUFFER_SIZE)) { + index += avail - target_buffer; + avail = target_buffer; + pw_log_warn("overrun %u > %u", avail, target_buffer * 3); } else { - error = (float)sess->target_buffer - (float)avail; + if (sess->first) { + if ((uint32_t)avail > target_buffer) { + uint32_t skip = avail - target_buffer; + pw_log_debug("first: avail:%d skip:%u target:%u", + avail, skip, target_buffer); + index += skip; + avail = target_buffer; + } + sess->first = false; + } + error = (float)target_buffer - (float)avail; error = SPA_CLAMP(error, -sess->max_error, sess->max_error); corr = spa_dll_update(&sess->dll, error); pw_log_debug("avail:%u target:%u error:%f corr:%f", avail, - sess->target_buffer, error, corr); + target_buffer, error, corr); if (sess->rate_match) { SPA_FLAG_SET(sess->rate_match->flags, SPA_IO_RATE_MATCH_FLAG_ACTIVE); @@ -300,16 +314,16 @@ static void stream_process(void *data) spa_ringbuffer_read_data(&sess->ring, sess->buffer, BUFFER_SIZE, - index & BUFFER_MASK, - d[0].data, wanted); + index & BUFFER_MASK, + d[0].data, wanted); - index += wanted; - spa_ringbuffer_read_update(&sess->ring, index); - } - d[0].chunk->size = wanted; - d[0].chunk->stride = sess->info.stride; - d[0].chunk->offset = 0; - buf->size = wanted / sess->info.stride; + index += wanted; + spa_ringbuffer_read_update(&sess->ring, index); + } + d[0].chunk->size = wanted; + d[0].chunk->stride = sess->info.stride; + d[0].chunk->offset = 0; + buf->size = wanted / sess->info.stride; pw_stream_queue_buffer(sess->stream, buf); } @@ -423,7 +437,9 @@ on_rtp_io(void *data, int fd, uint32_t mask) pw_log_debug("got rtp, capture overrun %u %zd", filled, len); sess->have_sync = false; } else { - pw_log_trace("got rtp, buffering"); + uint32_t target_buffer; + + pw_log_trace("got rtp packet len:%zd", len); spa_ringbuffer_write_data(&sess->ring, sess->buffer, BUFFER_SIZE, @@ -433,10 +449,13 @@ on_rtp_io(void *data, int fd, uint32_t mask) filled += len; spa_ringbuffer_write_update(&sess->ring, index); - if (sess->buffering && (uint32_t)filled > sess->target_buffer) { + sess->last_packet_size = len; + target_buffer = sess->target_buffer + len/2; + + if (sess->buffering && (uint32_t)filled > target_buffer) { sess->buffering = false; pw_log_debug("buffering done %u > %u", - filled, sess->target_buffer); + filled, target_buffer); } } } @@ -603,6 +622,7 @@ static int session_new(struct impl *impl, struct sdp_info *info) return -errno; session->info = *info; + session->first = true; props = pw_properties_copy(impl->stream_props); if (props == NULL) {