diff --git a/src/modules/module-rtp/audio.c b/src/modules/module-rtp/audio.c index a7807fa0b..123b8b9a0 100644 --- a/src/modules/module-rtp/audio.c +++ b/src/modules/module-rtp/audio.c @@ -252,7 +252,7 @@ static void rtp_audio_process_playback(void *data) avail = target_buffer; } impl->first = false; - } else if (avail > (int32_t)SPA_MIN(target_buffer * 8, BUFFER_SIZE / stride)) { + } else if (avail > (int32_t)SPA_MIN(target_buffer * 8, impl->buffer_size / stride)) { pw_log_warn("receiver read overrun %u > %u", avail, target_buffer * 8); timestamp += avail - target_buffer; avail = target_buffer; @@ -392,7 +392,7 @@ static int rtp_audio_receive(struct impl *impl, uint8_t *buffer, ssize_t len, spa_dll_init(&impl->dll); spa_dll_set_bw(&impl->dll, SPA_DLL_BW_MIN, 128, impl->rate); - memset(impl->buffer, 0, BUFFER_SIZE); + memset(impl->buffer, 0, impl->buffer_size); impl->have_sync = true; } else if (expected_write != write) { pw_log_debug("unexpected write (%u != %u)", @@ -402,9 +402,9 @@ static int rtp_audio_receive(struct impl *impl, uint8_t *buffer, ssize_t len, /* Write overrun only makes sense in constant delay mode. See the * RTP source module documentation and the rtp_audio_process_playback() * code for an explanation why. */ - if (!impl->direct_timestamp && (filled + samples > BUFFER_SIZE / stride)) { + if (!impl->direct_timestamp && (filled + samples > impl->buffer_size / stride)) { pw_log_debug("receiver write overrun %u + %u > %u", filled, samples, - BUFFER_SIZE / stride); + impl->buffer_size / stride); impl->have_sync = false; } else { pw_log_trace("got samples:%u", samples); @@ -736,9 +736,9 @@ static void rtp_audio_process_capture(void *data) * "Driver architecture and workflow" for an explanation why not. */ pw_log_warn("timestamp: expected %u != actual %u", expected_timestamp, actual_timestamp); impl->have_sync = false; - } else if (filled + wanted > (int32_t)SPA_MIN(impl->target_buffer * 8, BUFFER_SIZE / stride)) { + } else if (filled + wanted > (int32_t)SPA_MIN(impl->target_buffer * 8, impl->buffer_size / stride)) { pw_log_warn("sender write overrun %u + %u > %u/%u", filled, wanted, - impl->target_buffer * 8, BUFFER_SIZE / stride); + impl->target_buffer * 8, impl->buffer_size / stride); impl->have_sync = false; filled = 0; } @@ -754,7 +754,7 @@ static void rtp_audio_process_capture(void *data) actual_timestamp, impl->seq, impl->ts_offset, impl->ts_align, impl->ssrc); spa_ringbuffer_read_update(&impl->ring, actual_timestamp); spa_ringbuffer_write_update(&impl->ring, actual_timestamp); - memset(impl->buffer, 0, BUFFER_SIZE); + memset(impl->buffer, 0, impl->buffer_size); impl->have_sync = true; expected_timestamp = actual_timestamp; filled = 0; diff --git a/src/modules/module-rtp/midi.c b/src/modules/module-rtp/midi.c index 43e629acc..3ada8e091 100644 --- a/src/modules/module-rtp/midi.c +++ b/src/modules/module-rtp/midi.c @@ -49,7 +49,7 @@ static void rtp_midi_process_playback(void *data) if (avail <= 0) break; - ptr = SPA_PTROFF(impl->buffer, read & BUFFER_MASK2, void); + ptr = SPA_PTROFF(impl->buffer, read & impl->buffer_mask2, void); if ((pod = spa_pod_from_data(ptr, avail, 0, avail)) == NULL) goto done; @@ -235,7 +235,7 @@ static int rtp_midi_receive_midi(struct impl *impl, uint8_t *packet, uint32_t ti } filled = spa_ringbuffer_get_write_index(&impl->ring, &write); - if (filled > (int32_t)BUFFER_SIZE2) { + if (filled > (int32_t)impl->buffer_size2) { pw_log_warn("overflow"); return -ENOSPC; } @@ -261,11 +261,11 @@ static int rtp_midi_receive_midi(struct impl *impl, uint8_t *packet, uint32_t ti if (hdr.j) parse_journal(impl, &packet[end], seq, plen - end); - ptr = SPA_PTROFF(impl->buffer, write & BUFFER_MASK2, void); + ptr = SPA_PTROFF(impl->buffer, write & impl->buffer_mask2, void); /* each packet is written as a sequence of events. The offset is * the RTP timestamp */ - spa_pod_builder_init(&b, ptr, BUFFER_SIZE2 - filled); + spa_pod_builder_init(&b, ptr, impl->buffer_size2 - filled); spa_pod_builder_push_sequence(&b, &f[0], 0); while (offs < end) { @@ -472,7 +472,7 @@ static void rtp_midi_flush_packets(struct impl *impl, impl->seq++; len = 0; } - if ((unsigned int)size > BUFFER_SIZE || len > BUFFER_SIZE - size) { + if ((unsigned int)size > impl->buffer_size || len > impl->buffer_size - size) { pw_log_error("Buffer overflow prevented!"); return; // FIXME: what to do instead? } @@ -488,7 +488,7 @@ static void rtp_midi_flush_packets(struct impl *impl, int res; delta = offset - prev_offset; prev_offset = offset; - res = write_event(&impl->buffer[len], BUFFER_SIZE - len, delta, data, size); + res = write_event(&impl->buffer[len], impl->buffer_size - len, delta, data, size); if (res < 0) { pw_log_warn("write_event error: %d", res); return; diff --git a/src/modules/module-rtp/opus.c b/src/modules/module-rtp/opus.c index d534e953f..2ab65e573 100644 --- a/src/modules/module-rtp/opus.c +++ b/src/modules/module-rtp/opus.c @@ -62,7 +62,7 @@ static void rtp_opus_process_playback(void *data) avail = target_buffer; } impl->first = false; - } else if (avail > (int32_t)SPA_MIN(target_buffer * 8, BUFFER_SIZE2 / stride)) { + } else if (avail > (int32_t)SPA_MIN(target_buffer * 8, impl->buffer_size2 / stride)) { pw_log_warn("overrun %u > %u", avail, target_buffer * 8); timestamp += avail - target_buffer; avail = target_buffer; @@ -83,8 +83,8 @@ static void rtp_opus_process_playback(void *data) } spa_ringbuffer_read_data(&impl->ring, impl->buffer, - BUFFER_SIZE2, - (timestamp * stride) & BUFFER_MASK2, + impl->buffer_size2, + (timestamp * stride) & impl->buffer_mask2, d[0].data, wanted * stride); timestamp += wanted; @@ -165,19 +165,19 @@ static int rtp_opus_receive(struct impl *impl, uint8_t *buffer, ssize_t len, spa_dll_init(&impl->dll); spa_dll_set_bw(&impl->dll, SPA_DLL_BW_MIN, 128, impl->rate); - memset(impl->buffer, 0, BUFFER_SIZE); + memset(impl->buffer, 0, impl->buffer_size); impl->have_sync = true; } else if (expected_write != write) { pw_log_debug("unexpected write (%u != %u)", write, expected_write); } - if (filled + 2880 > (int32_t)(BUFFER_SIZE2 / stride)) { + if (filled + 2880 > (int32_t)(impl->buffer_size2 / stride)) { pw_log_debug("capture overrun %u + %d > %u", filled, 2880, - BUFFER_SIZE2 / stride); + impl->buffer_size2 / stride); impl->have_sync = false; } else { - uint32_t index = (write * stride) & BUFFER_MASK2, end; + uint32_t index = (write * stride) & impl->buffer_mask2, end; res = opus_multistream_decode_float(dec, &buffer[hlen], plen, @@ -186,8 +186,8 @@ static int rtp_opus_receive(struct impl *impl, uint8_t *buffer, ssize_t len, end = index + (res * stride); /* fold to the lower part of the ringbuffer when overflow */ - if (end > BUFFER_SIZE2) - memmove(impl->buffer, &impl->buffer[BUFFER_SIZE2], end - BUFFER_SIZE2); + if (end > impl->buffer_size2) + memmove(impl->buffer, &impl->buffer[impl->buffer_size2], end - impl->buffer_size2); pw_log_info("receiving %zd len:%d timestamp:%d %u", plen, res, timestamp, index); samples = res; @@ -302,22 +302,22 @@ static void rtp_opus_process_capture(void *data) pw_log_info("sync to timestamp:%u seq:%u ts_offset:%u SSRC:%u", timestamp, impl->seq, impl->ts_offset, impl->ssrc); impl->ring.readindex = impl->ring.writeindex = expected_timestamp = timestamp; - memset(impl->buffer, 0, BUFFER_SIZE); + memset(impl->buffer, 0, impl->buffer_size); impl->have_sync = true; } else { if (SPA_ABS((int32_t)expected_timestamp - (int32_t)timestamp) > 32) { pw_log_warn("expected %u != timestamp %u", expected_timestamp, timestamp); impl->have_sync = false; - } else if (filled + wanted > (int32_t)(BUFFER_SIZE / stride)) { - pw_log_warn("overrun %u + %u > %u", filled, wanted, BUFFER_SIZE / stride); + } else if (filled + wanted > (int32_t)(impl->buffer_size / stride)) { + pw_log_warn("overrun %u + %u > %u", filled, wanted, impl->buffer_size / stride); impl->have_sync = false; } } spa_ringbuffer_write_data(&impl->ring, impl->buffer, - BUFFER_SIZE, - (filled * stride) & BUFFER_MASK, + impl->buffer_size, + (filled * stride) & impl->buffer_mask, SPA_PTROFF(d[0].data, offs, void), wanted * stride); expected_timestamp += wanted; spa_ringbuffer_write_update(&impl->ring, expected_timestamp); diff --git a/src/modules/module-rtp/stream.c b/src/modules/module-rtp/stream.c index a1da20420..7b400051e 100644 --- a/src/modules/module-rtp/stream.c +++ b/src/modules/module-rtp/stream.c @@ -31,10 +31,7 @@ PW_LOG_TOPIC_EXTERN(mod_topic); #define PW_LOG_TOPIC_DEFAULT mod_topic -#define BUFFER_SIZE (1u<<22) -#define BUFFER_MASK (BUFFER_SIZE-1) -#define BUFFER_SIZE2 (BUFFER_SIZE>>1) -#define BUFFER_MASK2 (BUFFER_SIZE2-1) +#define BUFFER_SIZE1 (1u<<22) /* IMPORTANT: When using calls that have return values, like * rtp_stream_emit_open_connection, callers must set the variables @@ -117,7 +114,11 @@ struct impl { uint32_t ts_align; struct spa_ringbuffer ring; - uint8_t buffer[BUFFER_SIZE]; + uint8_t *buffer; + uint32_t buffer_size; + uint32_t buffer_mask; + uint32_t buffer_size2; + uint32_t buffer_mask2; uint64_t last_recv_timestamp; struct spa_io_rate_match *io_rate_match; @@ -615,7 +616,7 @@ static void on_flush_timeout(void *d, uint64_t expirations) static void default_reset_ringbuffer(struct impl *impl) { - spa_memzero(impl->buffer, sizeof(impl->buffer)); + spa_memzero(impl->buffer, impl->buffer_size); } struct rtp_stream *rtp_stream_new(struct pw_core *core, @@ -782,9 +783,18 @@ struct rtp_stream *rtp_stream_new(struct pw_core *core, * then, all grid cells are guaranteed to have the size impl->stride, so the * aforementioned division rest will always be zero. */ - impl->actual_max_buffer_size = SPA_ROUND_DOWN(BUFFER_SIZE, impl->stride); + impl->buffer_size = pw_properties_get_uint32(props, "sess.buffer-size", BUFFER_SIZE1); + // find closest larger number rounded to power of two + impl->buffer_size = SPA_ROUND_UP_POW2_32(impl->buffer_size); + impl->buffer_mask = impl->buffer_size - 1; + impl->buffer_size2 = impl->buffer_size / 2; + impl->buffer_mask2 = impl->buffer_size2 - 1; + + impl->buffer = calloc(1, impl->buffer_size); + + impl->actual_max_buffer_size = SPA_ROUND_DOWN(impl->buffer_size, impl->stride); pw_log_debug("possible / actual max buffer size: %" PRIu32 " / %" PRIu32, - (uint32_t)BUFFER_SIZE, impl->actual_max_buffer_size); + (uint32_t)impl->buffer_size, impl->actual_max_buffer_size); pw_properties_setf(props, "rtp.mime", "%s", impl->format_info->mime); @@ -1046,6 +1056,9 @@ void rtp_stream_destroy(struct rtp_stream *s) if (impl->data_loop) pw_context_release_loop(impl->context, impl->data_loop); + if (impl->buffer) + free(impl->buffer); + spa_hook_list_clean(&impl->listener_list); free(impl); }