diff --git a/src/modules/meson.build b/src/modules/meson.build index 2a2b52678..7e4f68b76 100644 --- a/src/modules/meson.build +++ b/src/modules/meson.build @@ -596,7 +596,9 @@ summary({'zeroconf-discover': build_module_zeroconf_discover}, bool_yn: true, se # that contains that common code. pipewire_module_rtp_common_lib = static_library('pipewire-module-rtp-common-lib', [ 'module-rtp/stream.c', - 'module-rtp/jitter-buffer.c' ], + 'module-rtp/jitter-buffer.c', + 'module-rtp/audio-codec.c', + 'module-rtp/opus-codec.c' ], include_directories : [configinc], install : false, dependencies : [mathlib, dl_lib, rt_lib, pipewire_dep, opus_dep], diff --git a/src/modules/module-rtp-session.c b/src/modules/module-rtp-session.c index 525ab75bc..ef7b3cf90 100644 --- a/src/modules/module-rtp-session.c +++ b/src/modules/module-rtp-session.c @@ -140,7 +140,8 @@ PW_LOG_TOPIC(mod_topic, "mod." NAME); "( sess.min-ptime= ) " \ "( sess.max-ptime= ) " \ "( sess.media= ) " \ - "( audio.format= ) " \ + "( audio.format= ) " \ "( audio.rate= ) " \ "( audio.channels= ) "\ "( audio.position= ) " \ @@ -1632,6 +1633,7 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args) } else if (spa_streq(str, "opus")) { struct spa_dict_item items[] = { + { "audio.format", DEFAULT_OPUS_AUDIO_FORMAT }, { "audio.rate", SPA_STRINGIFY(DEFAULT_RATE) }, { "audio.channels", SPA_STRINGIFY(DEFAULT_CHANNELS) }, { "audio.position", DEFAULT_POSITION } }; diff --git a/src/modules/module-rtp-sink.c b/src/modules/module-rtp-sink.c index 5f8c3e3d7..8f045e71d 100644 --- a/src/modules/module-rtp-sink.c +++ b/src/modules/module-rtp-sink.c @@ -199,7 +199,8 @@ PW_LOG_TOPIC(mod_topic, "mod." 
NAME); "( sess.min-ptime= ) " \ "( sess.max-ptime= ) " \ "( sess.media= ) " \ - "( audio.format= ) " \ + "( audio.format= ) " \ "( audio.rate= ) " \ "( audio.channels= ) " \ "( audio.position= ) " \ diff --git a/src/modules/module-rtp-source.c b/src/modules/module-rtp-source.c index 651c7c1be..6cce0b1c7 100644 --- a/src/modules/module-rtp-source.c +++ b/src/modules/module-rtp-source.c @@ -174,7 +174,8 @@ PW_LOG_TOPIC(mod_topic, "mod." NAME); "( sess.latency.msec= ) "\ "( sess.ignore-ssrc= ) "\ "( sess.media= ) " \ - "( audio.format= ) " \ + "( audio.format= ) " \ "( audio.rate= ) " \ "( audio.channels= ) " \ "( audio.position= ) " \ @@ -316,7 +317,9 @@ on_rtp_io(void *data, int fd, uint32_t mask) /* Use this Linux specific feature to get the actual size of the * packet, even if it was truncated due to it being larger than * the buffer size. The code below uses this to detect packets - * that exceed the MTU size. */ + * that exceed the MTU size. Truncated packets are unusable. + * Especially if an audio codec is in use, the partial absence + * of data can lead to a corrupted state. 
*/ MSG_TRUNC, #else 0, diff --git a/src/modules/module-rtp/audio-codec.c b/src/modules/module-rtp/audio-codec.c new file mode 100644 index 000000000..e32387bbb --- /dev/null +++ b/src/modules/module-rtp/audio-codec.c @@ -0,0 +1,14 @@ +/* PipeWire */ +/* SPDX-FileCopyrightText: Copyright © 2026 Carlos Rafael Giani */ +/* SPDX-License-Identifier: MIT */ + +#include "audio-codec.h" + +const char * rtp_audio_codec_type_name(enum rtp_audio_codec_type type) +{ + switch (type) { + case RTP_AUDIO_CODEC_TYPE_ENCODER: return "encoder"; + case RTP_AUDIO_CODEC_TYPE_DECODER: return "decoder"; + default: return ""; + } +} diff --git a/src/modules/module-rtp/audio-codec.h b/src/modules/module-rtp/audio-codec.h new file mode 100644 index 000000000..f608f11bc --- /dev/null +++ b/src/modules/module-rtp/audio-codec.h @@ -0,0 +1,209 @@ +/* PipeWire */ +/* SPDX-FileCopyrightText: Copyright © 2026 Carlos Rafael Giani */ +/* SPDX-License-Identifier: MIT */ + +#ifndef PIPEWIRE_RTP_AUDIO_CODEC_H +#define PIPEWIRE_RTP_AUDIO_CODEC_H + +#ifdef __cplusplus +extern "C" { +#endif + +#include +#include +#include +#include +#include + +enum rtp_audio_codec_type { + RTP_AUDIO_CODEC_TYPE_ENCODER, + RTP_AUDIO_CODEC_TYPE_DECODER, +}; + +struct rtp_audio_codec_context { + void *handle; + struct spa_audio_info audio_info; + size_t samples_per_frame; + size_t max_encoded_frame_size; + size_t stride; + enum rtp_audio_codec_type type; + uint8_t *output_buffer; +}; + +struct rtp_audio_codec { + /* Initializes the audio codec. + * + * The codec initialization will be stored in the context. + * + * Initializing an already initialized codec leads to undefined behavior. + * + * context must point to an rtp_audio_codec_context struct instance. + * Said instance will be filled with states for the initialized codec. + * Any already present values will be overwritten. + * + * stream_info must point to an spa_audio_info that contains a raw audio + * format that the codec shall en/decode from/to. 
+ * + * type specifies whether the codec shall be initialized as an + * en- or a decoder. + * + * samples_per_frame specifies how many samples a frame covers. + * "Samples" is meant in the RTP sense; that is, it specifies how + * many samples there are in one channel. This means that this + * value does not depend on the channel count in stream_info. + * + * max_encoded_data_size defines the maximum allowed size in bytes for + * encoded frames. The encode() function then is guaranteed to not + * produce a frame larger than this (it may produce a frame that is + * smaller than that though). + * + * stride specifies the number of bytes per one sample. This value + * _does_ depend on the channel count in stream_info. That is, + * stride = (number of channels x bytes per sample). + * + * codec_props contains extra, codec specific properties, like the + * encoding quality. These properties are optional, and this argument + * can be set to NULL. + * + * Returns 0 in case of success, and a negative errno in case of an error. + * An error will roll back any partial initialization. */ + int (*init)(struct rtp_audio_codec_context *context, struct spa_audio_info *stream_info, + enum rtp_audio_codec_type type, uint32_t samples_per_frame, + size_t max_encoded_data_size, size_t stride, struct pw_properties *codec_props); + + /* Shuts down a previously initialized codec. + * + * If the codec was not initialized, or was already shut down, + * this is a no-op. + * + * context must point to a valid rtp_audio_codec_context struct instance. */ + void (*shutdown)(struct rtp_audio_codec_context *context); + + /* Resets the codec state. + * + * Use this if for example a frame is corrupted and could not be decoded + * to reset to an initial state. This is recommended, since in such cases, + * the codec might not be able to cleanly decode data if previous states + * are retained. + * + * reason is an optional string for logging. 
It helps indicate in the + * logs the reason for the codec reset. If set to NULL, no reason is logged. + * + * context must point to a valid rtp_audio_codec_context struct instance + * that was initialized. */ + void (*reset)(struct rtp_audio_codec_context *context, const char *reason); + + /* Returns the en- or decoder delay. + * + * The delay is given in samples. Whether this returns the en- or the + * decoder delay depends on the type that was passed to init(). The delay + * is stored in the value pointed to by the delay pointer. + * + * IMPORTANT: This delay must not vary for the duration of the existence + * of the context. Once the context is created, the delay must be fixed. + * + * context must point to a valid rtp_audio_codec_context struct instance + * that was initialized. + * + * delay must be a valid pointer. + * + * Returns 0 in case of success, and a negative errno in case of an error. + * + * In case of an error, the value pointed to by the delay pointer is + * undefined. An error means that the codec cannot be used anymore and + * must be shut down. */ + int (*get_delay)(struct rtp_audio_codec_context *context, size_t *delay); + + /* Encodes a set of samples to a codec frame. + * + * This function only works if the type during initialization + * was RTP_AUDIO_CODEC_TYPE_ENCODER. If it isn't, this function's + * behavior is undefined. + * + * context must point to a valid rtp_audio_codec_context struct instance + * that was initialized. + * + * in_samples must point to a memory block containing at least the amount + * of samples that was specified when the codec was initialized. + * + * out_encoded_data must point to a pointer that shall be set to refer to + * an internal buffer that contains the encoded data. out_encoded_data_size + * must point to a size_t value that shall be set to the size of the encoded + * data in bytes. + * + * Returns 0 in case of success, and a negative errno in case of an error. 
+ * + * In case of an error, the values that out_encoded_data and out_encoded_data_size + * are set to are undefined. An error means that the codec cannot be used + * anymore and must be shut down. */ + int (*encode)(struct rtp_audio_codec_context *context, const uint8_t *in_samples, + uint8_t **out_encoded_data, size_t *out_encoded_data_size); + + /* Decodes a codec frame to a set of samples. + * + * Codec frames must be fed into the decoder in order. + * + * This function only works if the type during initialization + * was RTP_AUDIO_CODEC_TYPE_DECODER. If it isn't, this function's + * behavior is undefined. + * + * context must point to a valid rtp_audio_codec_context struct instance + * that was initialized. + * + * in_encoded_data must point to a memory block containing the codec frame. + * in_encoded_data_size must be set to the size of the codec frame in bytes. + * + * out_samples must point to a pointer that shall be set to refer to an + * internal buffer that contains the decoded samples. out_num_samples + * must point to a size_t value that shall contain how many samples + * were decoded. That amount's maximum possible value is the number + * of samples per frame specified during initialization. The codec is + * allowed to produce fewer samples than that, but not more than that. + * + * Returns 0 in case of success, and a negative errno in case of an error. + * + * In case of an error, the values that out_samples and out_num_samples + * are set to are undefined. An error means that the codec cannot be used + * anymore and must be shut down. */ + int (*decode)(struct rtp_audio_codec_context *context, const uint8_t *in_encoded_data, + size_t in_encoded_data_size, uint8_t **out_samples, size_t *out_num_samples); + + /* Applies PLC to cover a lost frame. + * + * This only works properly if non-missing frames that preceded the + * lost frame were decoded in order. + * + * context must point to a valid rtp_audio_codec_context struct instance + * that was initialized. 
+ * + * out_samples must point to a pointer that shall be set to refer to an + * internal buffer that contains the PLC samples. out_num_samples + * must point to a size_t value that shall contain how many PLC samples + * were generated. That amount's maximum possible value is the number + * of samples per frame specified during initialization. The codec is + * allowed to produce fewer samples than that, but not more than that. + * + * Returns 0 in case of success, and a negative errno in case of an error. + * + * In case of an error, the values that out_samples and out_num_samples + * are set to are undefined. An error means that the codec cannot be used + * anymore and must be shut down. */ + int (*apply_plc)(struct rtp_audio_codec_context *context, + uint8_t **out_samples, size_t *out_num_samples); + + /* Gets a human-readable name of this codec. + * + * This is meant for logging purposes. */ + const char * (*get_name)(void); +}; + +/* Returns a human-readable string for the given audio codec type. + * + * This is meant for logging purposes. */ +const char * rtp_audio_codec_type_name(enum rtp_audio_codec_type type); + +#ifdef __cplusplus +} /* extern "C" */ +#endif + +#endif /* PIPEWIRE_RTP_AUDIO_CODEC_H */ diff --git a/src/modules/module-rtp/audio.c b/src/modules/module-rtp/audio.c index 20f58fde6..a42eddaf7 100644 --- a/src/modules/module-rtp/audio.c +++ b/src/modules/module-rtp/audio.c @@ -119,12 +119,13 @@ static void rtp_audio_process_playback(void *data) uint32_t clock_rate = impl->io_position->clock.rate.denom; /* Translate the clock position to an RTP timestamp and - * shift it to compensate for device delay and ASRC delay. - * The device delay is scaled along with the clock position, - * since both are expressed in clock sample units, while - * pwt.buffered is expressed in stream time. */ + * shift it to compensate for device delay, decoder delay, + * and ASRC delay. 
The device delay is scaled along with + * the clock position, since both are expressed in clock + * sample units, while pwt.buffered is expressed in + * stream time. */ timestamp = scale_u64(impl->io_position->clock.position + device_delay, - impl->rate, clock_rate) + pwt.buffered; + impl->rate, clock_rate) + pwt.buffered + impl->codec_delay; spa_ringbuffer_read_update(&impl->ring, timestamp); avail = spa_ringbuffer_get_read_index(&impl->ring, &read_index); } else { @@ -224,15 +225,27 @@ static void rtp_audio_process_playback(void *data) device_delay = scale_u64(device_delay, impl->rate, clock_rate); } - /* Reduce target buffer by the delay amount to start playback sooner. - * This compensates for the delay to the device. */ - if (SPA_UNLIKELY(impl->target_buffer < device_delay)) { - pw_log_error("Delay to device (%" PRIu32 ") is higher than " - "the target buffer size (%" PRIu32 ")", device_delay, - impl->target_buffer); + /* Reduce target buffer by the decoder and device delay amount to + * start playback sooner. This compensates for the delay to the + * device and for the decoder delay. 
*/ + + if (SPA_UNLIKELY(impl->target_buffer < (impl->codec_delay + device_delay))) { + if (impl->target_buffer < device_delay) { + pw_log_error("Delay to device (%" PRIu32 ") is higher than " + "the target buffer size (%" PRIu32 ")", device_delay, + impl->target_buffer); + } else if (impl->target_buffer < impl->codec_delay) { + pw_log_error("Decoder delay (%zu) is higher than the " + "target buffer size (%" PRIu32 ")", impl->codec_delay, + impl->target_buffer); + } else { + pw_log_error("The combined decoder delay (%zu) and device delay " + "(%" PRIu32 ") are higher than the target buffer size (%" PRIu32 ")", + impl->codec_delay, device_delay, impl->target_buffer); + } target_buffer = 0; } else { - target_buffer = impl->target_buffer - device_delay; + target_buffer = impl->target_buffer - impl->codec_delay - device_delay; } if (avail < (int32_t)wanted) { @@ -330,50 +343,325 @@ static void rtp_audio_process_playback(void *data) pw_stream_queue_buffer(impl->stream, buf); } +static int process_received_samples(struct impl *impl, uint8_t *samples, uint32_t num_samples, + uint16_t seqnum, uint32_t timestamp, uint32_t ts_offset); + +static int on_jitter_buffer_output_rtp_packet(void *context, const uint8_t *packet_data, size_t packet_size, + size_t header_size, uint32_t timestamp, uint16_t seqnum) +{ + int ret; + size_t payload_size; + uint8_t *decoded_samples; + size_t num_decoded_samples; + struct impl *impl = context; + + payload_size = packet_size - header_size; + + ret = impl->audio_codec->decode(&(impl->audio_codec_context), packet_data + header_size, + payload_size, &decoded_samples, &num_decoded_samples); + if (SPA_UNLIKELY(ret < 0)) { + pw_log_error("could not decode audio from packet with seqnum %" PRIu16 ": %s", + seqnum, spa_strerror(ret)); + return ret; + } + + pw_log_trace("got packet with seqnum %" PRIu16 " from jitter buffer; decoding " + "yielded %zu samples", seqnum, num_decoded_samples); + + return process_received_samples(impl, decoded_samples, 
num_decoded_samples, seqnum, + timestamp, impl->target_buffer); +} + +static void reset_rtp_audio_codec(struct impl *impl, const char *reason); + +static int on_jitter_buffer_signal_lost_packets(void *context, uint16_t seqnum_of_first_lost_packet, + size_t num_lost_packets, bool open_ended) +{ + int ret; + uint8_t *plc_samples; + size_t num_plc_samples; + uint32_t timestamp; + struct impl *impl = context; + size_t i; + + /* Don't apply PLC if sync is not established. Playback is in sync when + * there is a steady supply of data going on and the timestamps have been + * okay thus far. When sync is lost, there is an audible discontinuity + * that requires a reset of the ringbuffer contents along with the stats + * that keep playback in sync, so applying PLC then makes no sense. */ + if (!impl->have_sync) + return 0; + + // TODO apply fadeout if open_ended == true + + pw_log_info("Jitter buffer signals lost packet(s); packet loss sequence starts at " + "seqnum %" PRIu16 ", sequence length %zu, %sopen ended", seqnum_of_first_lost_packet, + num_lost_packets, open_ended ? "" : "not "); + + for (i = 0; i < num_lost_packets; ++i) { + uint16_t seqnum = (seqnum_of_first_lost_packet + i); + + ret = impl->audio_codec->apply_plc(&(impl->audio_codec_context), &plc_samples, &num_plc_samples); + if (SPA_UNLIKELY(ret < 0)) { + pw_log_error("could not apply PLC: %s", spa_strerror(ret)); + return ret; + } + + spa_ringbuffer_get_write_index(&impl->ring, &timestamp); + + /* PLC generates sample data to cover the lost packet, but it does + * not generate timestamps. Since there is no other information available + * at this stage for reconstructing timestamps, just use the current + * ringbuffer write index. That write index is actually the ideal current + * timestamp in direct timestamp mode - that is, if sync is perfect, then + * that ideal timestamp perfectly matches the RTP timestamps (when they + * are shifted by target_buffer). 
See process_received_samples() for + * how these are used and compared to calculate a skew. */ + + pw_log_trace("jitter buffer signaled loss of packet with seqnum %" PRIu16 "; PLC " + "yielded %zu samples; using RTP timestamp %" PRIu32, seqnum, num_plc_samples, + timestamp); + + /* The write index has the target_buffer factored in. (See for example the + * !have_sync cases in process_received_samples().) Therefore, it is + * important to ensure that the timestamp we got from the ringbuffer write + * index is _not_ shifted by target_buffer again. For this reason, pass + * 0 as the ts_offset. */ + ret = process_received_samples(impl, plc_samples, num_plc_samples, seqnum, timestamp, 0); + if (SPA_UNLIKELY(ret < 0)) { + pw_log_error("could not process PLC samples: %s", spa_strerror(ret)); + return ret; + } + } + + if (open_ended) + reset_rtp_audio_codec(impl, "open ended packet loss"); + + return 0; +} + +static int setup_rtp_audio_codec(struct impl *impl, const struct rtp_audio_codec *audio_codec, + struct pw_properties *props) +{ + int ret; + enum rtp_audio_codec_type audio_codec_type; + + /* No audio codec -> PCM is to be transmitted directly. + * Nothing to set up then. */ + if (audio_codec == NULL) + return 0; + + spa_assert(impl->psamples > 0); + + /* A jitter buffer is only needed in the output direction, + * since that is the direction source nodes (= receivers) use. */ + if (impl->direction == PW_DIRECTION_OUTPUT) { + struct rtp_jitter_buffer_params params; + uint32_t jitter_buffer_length_perc; + size_t max_num_packets_in_ringbuffer; + + if (props != NULL) { + jitter_buffer_length_perc = pw_properties_get_uint32(props, + "jitter.buffer.length.perc", 50); + } else { + jitter_buffer_length_perc = 50; + } + + /* Round up the number of packets, since even a packet with just 1 sample + * actually inside is considered a full packet in the ringbuffer. 
*/ + max_num_packets_in_ringbuffer = (impl->target_buffer + (impl->psamples - 1)) / impl->psamples; + /* Get the number of slots out of the number of packets that fit in the + * jitter buffer, using the jitter_buffer_length_perc percentage. At least + * 1 slot is necessary though, so limit the output to a minimum of 1. */ + params.num_slots = SPA_MAX(max_num_packets_in_ringbuffer * jitter_buffer_length_perc / 100, 1u); + pw_log_debug("ringbuffer can hold up to %zu packets worth of data; " + "jitter buffer length is set to cover %" PRIu32 "%% of " + "the ring buffer -> num slots: %zu", max_num_packets_in_ringbuffer, + jitter_buffer_length_perc, params.num_slots); + + params.max_packet_size = impl->mtu; + params.packet_duration = scale_u64(impl->psamples, SPA_NSEC_PER_SEC, impl->rate); + params.loop = impl->data_loop; + params.context = impl; + params.output_rtp_packet = on_jitter_buffer_output_rtp_packet; + params.signal_lost_packets = on_jitter_buffer_signal_lost_packets; + + ret = rtp_jitter_buffer_init(&(impl->jitter_buffer), &params); + if (SPA_UNLIKELY(ret < 0)) { + pw_log_error("could not setup jitter buffer: %s", spa_strerror(ret)); + return ret; + } + + audio_codec_type = RTP_AUDIO_CODEC_TYPE_DECODER; + } else { + audio_codec_type = RTP_AUDIO_CODEC_TYPE_ENCODER; + /* This is a buffer used when data that is to be encoded wraps + * around the ring buffer. The encoder needs the data in + * contiguous form, so the wrapped data must be copied into + * this buffer first in such cases. 
*/ + impl->audio_encoder_staging_buffer = malloc(impl->psamples * impl->stride); + } + + impl->audio_codec = audio_codec; + + ret = impl->audio_codec->init(&(impl->audio_codec_context), &impl->stream_info, + audio_codec_type, impl->psamples, impl->payload_size, impl->stride, props); + if (SPA_UNLIKELY(ret < 0)) { + pw_log_error("could not setup audio codec: %s", spa_strerror(ret)); + return ret; + } + + /* Since a stream always either only receives or only sends, one single + * quantity is used for both directions. This means that when this stream + * sends, the audio_codec is configured to encode, and thus, codec_delay + * then is the encoder delay. Consequently, if the stream receives, and + * the audio_codec decodes, codec_delay is the decoder delay. */ + ret = impl->audio_codec->get_delay(&(impl->audio_codec_context), &(impl->codec_delay)); + if (SPA_UNLIKELY(ret < 0)) { + pw_log_error("could not get audio codec delay: %s", spa_strerror(ret)); + return ret; + } + + /* In the output direction (which is what RTP source nodes use), the + * codec_delay is the decoder delay. That delay is applied by either + * shifting timestamps forward (in the direct timestamp mode) or by + * subtracting from target_buffer (in the constant latency mode). + * That is, the decoder delay is treated as part of the specified + * session latency. This ensures that RTP source nodes are in sync + * even in the (unlikely) case that they use different decoders with + * different decoder latencies. But, it is then important to check + * that the decoder delay is smaller than the buffer size. If this + * is not the case, then the constant latency mode would use negative + * buffer sizes, and the direct timestamp mode would always shift + * timestamps past the buffer size. 
*/ + if ((impl->direction == PW_DIRECTION_OUTPUT) && + SPA_UNLIKELY(impl->codec_delay > impl->target_buffer)) { + pw_log_error("decoder delay (%zu samples) is larger than buffer size " + "(%" PRIu32 " samples)", impl->codec_delay, impl->target_buffer); + return -EINVAL; + } + + pw_log_info("initialized %s %s with %" PRIu32 " samples per packet; " + "codec delay: %zu samples", impl->audio_codec->get_name(), + rtp_audio_codec_type_name(audio_codec_type), impl->psamples, + impl->codec_delay); + + return 0; +} + +static void teardown_rtp_audio_codec(struct impl *impl) +{ + if ((impl->audio_codec != NULL) && (impl->audio_codec_context.handle != NULL)) { + rtp_jitter_buffer_shutdown(&(impl->jitter_buffer)); + + impl->audio_codec->shutdown(&(impl->audio_codec_context)); + impl->audio_codec_context.handle = NULL; + } + + free(impl->audio_encoder_staging_buffer); + impl->audio_encoder_staging_buffer = NULL; +} + +static void reset_rtp_audio_codec(struct impl *impl, const char *reason) +{ + if (impl->audio_codec != NULL) + impl->audio_codec->reset(&(impl->audio_codec_context), reason); +} + static int rtp_audio_receive(struct impl *impl, uint8_t *buffer, ssize_t len, ssize_t hlen, uint64_t current_time) { + int ret; struct rtp_header *hdr; - ssize_t plen; - uint16_t seq; - uint32_t timestamp, samples, write, expected_write; - uint32_t stride = impl->stride; - int32_t filled; + uint16_t seqnum; + uint32_t timestamp; hdr = (struct rtp_header*)buffer; + seqnum = ntohs(hdr->sequence_number); + timestamp = ntohl(hdr->timestamp) - impl->ts_offset; - seq = ntohs(hdr->sequence_number); - if (impl->have_seq && impl->seq != seq) { - pw_log_info("unexpected seq (%d != %d) SSRC:%u", - seq, impl->seq, impl->ssrc); - /* No need to resynchronize here. If packets arrive out of - * order, then they are still written in order into the ring - * buffer, since they are written according to where the - * RTP timestamp points to. 
*/ + if (impl->have_seq && (impl->next_expected_incoming_seq >= 0) && + (impl->next_expected_incoming_seq != seqnum)) { + pw_log_info("packet arrived out of order: expected/actual packet seq: %" + PRIu16"/%" PRIu16 " SSRC: %" PRIu32 " timestamp: %" PRIu32, + impl->next_expected_incoming_seq, seqnum, impl->ssrc, + timestamp); } + + /* Note that setting the last_recv_timestamp might not be correct + * in cases where the jitter buffer switches to hold-back mode. That's + * because in such cases, the jitter buffer will output packets in + * bursts, and it might do so at a time that is significantly ahead + * of the current_time. However - the jitter buffer switches to that + * mode when it detects packet losses. last_recv_timestamp is used + * for in-flight data calculations to smoothen out DLL adjustments + * when the RTP source is running in constant delay mode. These + * calculations assume steady packet transmission and a reliable + * network - and in such a network, neither packet reordering nor + * packet losses occur. Thus, it is still okay to set this timestamp + * here instead of in process_received_samples(), because when packets + * are lost / reordered, those calculations fall apart anyway. */ + impl->last_recv_timestamp = current_time; + + impl->next_expected_incoming_seq = (seqnum + 1) & 65535; + + /* If execution reaches this point, then the packet might be out of order, + * but still arrived in time. If the jitter buffer is initialized, it + * will take care of reordering packets. If it is not initialized, then + * there is no need to reorder the packets and depayload the data. The + * packet contents can be directly written into the ring buffer + * according to where their RTP timestamps point to, so even if they + * come in out of order, they ultimately end up in the ring buffer in + * the right locations. 
This can be done when raw PCM data is + * transmitted, but not when the data is encoded - with encoded data, + * the jitter buffer is necessary, since audio codecs typically require + * encoded frames to arrive in order. */ + + if (rtp_jitter_buffer_is_initialized(&(impl->jitter_buffer))) { + ret = rtp_jitter_buffer_insert_packet(&(impl->jitter_buffer), buffer, len, + hlen, timestamp, seqnum); + if (SPA_UNLIKELY(ret < 0)) { + pw_log_error("could not insert packet into jitter buffer: %s", spa_strerror(ret)); + return ret; + } + } else { + ssize_t plen = len - hlen; + ret = process_received_samples(impl, &buffer[hlen], plen / impl->stride, + seqnum, timestamp, impl->target_buffer); + if (SPA_UNLIKELY(ret < 0)) { + pw_log_error("could not process received samples: %s", spa_strerror(ret)); + return ret; + } + } + + return 0; +} + +static int process_received_samples(struct impl *impl, uint8_t *samples, + uint32_t num_samples, uint16_t seq, uint32_t timestamp, uint32_t ts_offset) +{ + uint32_t write, expected_write; + int32_t filled; + uint32_t stride = impl->stride; + impl->seq = seq + 1; impl->have_seq = true; - timestamp = ntohl(hdr->timestamp) - impl->ts_offset; - impl->receiving = true; - impl->last_recv_timestamp = current_time; - - plen = len - hlen; - samples = plen / stride; filled = spa_ringbuffer_get_write_index(&impl->ring, &expected_write); - /* we always write to timestamp + delay */ - write = timestamp + impl->target_buffer; + write = timestamp + ts_offset; if (!impl->have_sync) { pw_log_info("sync to timestamp:%u seq:%u ts_offset:%u SSRC:%u target:%u direct:%u", timestamp, seq, impl->ts_offset, impl->ssrc, impl->target_buffer, impl->direct_timestamp); - /* we read from timestamp, keeping target_buffer of data - * in the ringbuffer. */ + /* Synchronize by setting the read and write indices such that + * the write index is ahead of the read index by exactly + * ts_offset samples, and the read index equals the timestamp + * of the current RTP packet. 
*/ impl->ring.readindex = timestamp; impl->ring.writeindex = write; filled = impl->target_buffer; @@ -381,26 +669,34 @@ static int rtp_audio_receive(struct impl *impl, uint8_t *buffer, ssize_t len, spa_dll_init(&impl->dll); spa_dll_set_bw(&impl->dll, SPA_DLL_BW_MIN, 128, impl->rate); memset(impl->buffer, 0, impl->buffer_size); + + reset_rtp_audio_codec(impl, "(re)resynchronization"); + impl->have_sync = true; } else if (expected_write != write) { pw_log_debug("unexpected write (%u != %u)", write, expected_write); } - /* Write overrun only makes sense in constant delay mode. See the - * RTP source module documentation and the rtp_audio_process_playback() - * code for an explanation why. */ - if (!impl->direct_timestamp && (filled + samples > impl->buffer_size / stride)) { - pw_log_debug("receiver write overrun %u + %u > %u", filled, samples, + if (!impl->direct_timestamp && (filled + num_samples > impl->buffer_size / stride)) { + /* In constant delay mode, the goal is to keep the buffer fill + * level at a fixed level. If it goes above or below that, rate + * matching is used in rtp_audio_process_playback() to drive + * the fill level to that target value. If however the write side + * (that is, this function here) reaches a write overrun, it cannot + * insert any more samples and rely on that rate matching to + * compensate (there's no more room in the ringbuffer). A hard + * resync is needed in such a case. */ + pw_log_debug("receiver write overrun %u + %u > %u", filled, num_samples, impl->buffer_size / stride); impl->have_sync = false; } else { - pw_log_trace("got samples:%u", samples); + pw_log_trace("got %" PRIu32 " samples", num_samples); spa_ringbuffer_write_data(&impl->ring, impl->buffer, impl->actual_max_buffer_size, ((uint64_t)write * stride) % impl->actual_max_buffer_size, - &buffer[hlen], (samples * stride)); + samples, (num_samples * stride)); /* Only update the write index if data was actually _appended_. 
* If packets arrived out of order, then it may be that parts @@ -433,7 +729,7 @@ static int rtp_audio_receive(struct impl *impl, uint8_t *buffer, ssize_t len, * In unsigned arithmetic, if write + samples exceeds UINT32_MAX, * it wraps around to a smaller value. We detect this by checking * if new_write < write (which can only happen on overflow). */ - const uint32_t new_write = write + samples; + const uint32_t new_write = write + num_samples; const bool wrapped_around = new_write < write; /* Determine if new_write is ahead of expected_write. @@ -476,6 +772,8 @@ static void rtp_audio_send_packets(struct impl *impl, uint32_t timestamp, uint32 struct iovec iov[3]; struct rtp_header header; uint32_t stride; + bool do_send = true; + int ret; stride = impl->stride; @@ -505,12 +803,84 @@ static void rtp_audio_send_packets(struct impl *impl, uint32_t timestamp, uint32 ((uint64_t)timestamp * stride) % impl->actual_max_buffer_size, &iov[1], tosend * stride); - pw_log_trace_fp("sending %d packet:%d ts_offset:%d timestamp:%u (%f s)", - tosend, num, impl->ts_offset, timestamp, + pw_log_trace_fp("sending %d packet:%d seq: %" PRIu16 " ts_offset:%d timestamp:%u (%f s)", + tosend, num, seq, impl->ts_offset, timestamp, (double)timestamp * impl->io_position->clock.rate.num / impl->io_position->clock.rate.denom); - rtp_stream_call_send_packet(impl, iov, 3); + if (impl->audio_codec != NULL) { + /* In here, the ring buffer that iov points to is redirected + * to be encoded, and the encoder's output is then set as the + * new iov content. */ + + uint8_t *samples_to_encode = NULL; + uint8_t *encoded_data = NULL; + size_t encoded_data_size = 0; + + /* The audio codec expects one contiguous data block with impl->psamples + * samples. In case of a ring buffer wrap around, copy the audio data + * into the staging buffer, since the encoder cannot handle the wrap + * around on its own. 
And if there are fewer than psamples samples, + * copy what's available into the staging buffer and zero-stuff the rest + * of its space to be able to give the encoder the required amount of + * data to encode. */ + spa_assert((iov[1].iov_len + iov[2].iov_len) == (tosend * stride)); + if (iov[2].iov_len != 0) { + /* Copy the audio data, taking wrap around into account. */ + memcpy(impl->audio_encoder_staging_buffer, iov[1].iov_base, iov[1].iov_len); + memcpy(impl->audio_encoder_staging_buffer + iov[1].iov_len, iov[2].iov_base, iov[2].iov_len); + /* Zero-pad the unused trailing portions of the buffer. */ + if ((uint32_t)tosend < impl->psamples) { + memset(impl->audio_encoder_staging_buffer + iov[1].iov_len + iov[2].iov_len, + 0, (impl->psamples - tosend) * stride); + } + + samples_to_encode = impl->audio_encoder_staging_buffer; + } else if ((uint32_t)tosend < impl->psamples) { + /* Copy the audio data. */ + memcpy(impl->audio_encoder_staging_buffer, iov[1].iov_base, iov[1].iov_len); + /* Zero-pad the unused trailing portions of the buffer. */ + memset(impl->audio_encoder_staging_buffer + iov[1].iov_len, + 0, (impl->psamples - tosend) * stride); + + samples_to_encode = impl->audio_encoder_staging_buffer; + } else { + /* No wrap around happening, and no zero padding necessary. + * The audio data can be directly fed into the encoder. */ + samples_to_encode = iov[1].iov_base; + } + + ret = impl->audio_codec->encode(&(impl->audio_codec_context), + samples_to_encode, &encoded_data, &encoded_data_size); + + if (SPA_LIKELY(ret == 0)) { + /* Tweak iov to get the actual RTP payload from that single staging + * buffer instead of from the ring buffer directly. (The iov_base + * value of iov[2] is still set to encoded_data, even though its + * iov_len is 0, since it is not sure if setting its iov_base + * pointer to NULL is valid. iov_len 0 _is_ valid, and causes + * POSIX calls to ignore that iovec item. 
*/ + iov[1].iov_base = encoded_data; + iov[1].iov_len = encoded_data_size; + iov[2].iov_base = encoded_data; + iov[2].iov_len = 0; + do_send = true; + } else { + /* Normally, an encode() error would require shutting down the + * encoder. However, this is not feasible here, since this code + * runs in the data loop thread. Furthermore, if the errors keep + * occurring, this would lead to a constantly reinitializing + * encoder. There is no discernible way to communicate the error + * to the application either. Thus, in case of an error, skip + * the send, and log it. */ + pw_log_error("could not encode audio for packet with seqnum %" PRIu16 ": %s", + seq, spa_strerror(ret)); + do_send = false; + } + } + + if (SPA_LIKELY(do_send)) + rtp_stream_call_send_packet(impl, iov, 3); seq++; first = false; diff --git a/src/modules/module-rtp/opus-codec.c b/src/modules/module-rtp/opus-codec.c new file mode 100644 index 000000000..f4a5bcccf --- /dev/null +++ b/src/modules/module-rtp/opus-codec.c @@ -0,0 +1,625 @@ +/* PipeWire */ +/* SPDX-FileCopyrightText: Copyright © 2026 Carlos Rafael Giani */ +/* SPDX-License-Identifier: MIT */ + +#include "config.h" + +#ifdef HAVE_OPUS + +#include +#include + +#include "opus-codec.h" + +#include +#include + +#include + +#include +#include + +PW_LOG_TOPIC_EXTERN(mod_topic); +#define PW_LOG_TOPIC_DEFAULT mod_topic + +static int opus_error_to_neg_errno(int opus_error) +{ + switch (opus_error) { + case OPUS_OK: return 0; + case OPUS_BAD_ARG: return -EINVAL; + case OPUS_BUFFER_TOO_SMALL: return -ENOBUFS; + case OPUS_INTERNAL_ERROR: return -EIO; + case OPUS_INVALID_PACKET: return -EBADMSG; + case OPUS_UNIMPLEMENTED: return -ENOTSUP; + case OPUS_INVALID_STATE: return -EBADF; + case OPUS_ALLOC_FAIL: return -ENOMEM; + default: return -EIO; + } +} + +static void rtp_opus_codec_shutdown(struct rtp_audio_codec_context *context); + +static int rtp_opus_codec_init(struct rtp_audio_codec_context *context, + struct spa_audio_info *stream_info, + enum 
rtp_audio_codec_type type, uint32_t samples_per_frame, + size_t max_encoded_frame_size, size_t stride, + struct pw_properties *codec_props) +{ + int ret = 0; + int opus_ret = OPUS_OK; + uint32_t i; + unsigned char mapping[255]; + uint64_t us_per_frame; + const char *codec_type_name = ""; + size_t output_buffer_size = 0; + + spa_assert(stream_info != NULL); + spa_assert(context != NULL); + spa_assert(samples_per_frame > 0); + spa_assert(max_encoded_frame_size > 0); + spa_assert(stride > 0); + + spa_memzero(context, sizeof(struct rtp_audio_codec_context)); + + /* Opus supports: + * + * - 1 to 255 channels + * - 8, 12, 16, 24, 48 kHz sample rates + * - 16-bit signed integer and 32-bit float point as sample format + * (with native endianness) + * - 2.5, 5, 10, 20, 40, 60ms frame sizes + * + * Check that the parameters satisfy these constraints. */ + + if ((stream_info->info.raw.channels == 0) || (stream_info->info.raw.channels > 255)) { + pw_log_error("Opus cannot handle %" PRIu32 " channel(s); valid channel count range: 1-255", + stream_info->info.raw.channels); + ret = -EINVAL; + goto error; + } + + switch (stream_info->info.raw.rate) { + case 8000: + case 12000: + case 16000: + case 24000: + case 48000: + break; + default: + pw_log_error("unsupported sample rate of %" PRIu32 " Hz; supported sample rates: " + "8000, 12000, 16000, 24000, 48000", stream_info->info.raw.rate); + ret = -EINVAL; + goto error; + } + + switch (stream_info->info.raw.format) { + case SPA_AUDIO_FORMAT_S16: + case SPA_AUDIO_FORMAT_F32: + break; + default: + pw_log_error("unsupported sample format %s; Opus requires 16-bit signed integer " + "or 32-bit floating point samples", + spa_type_audio_format_to_short_name(stream_info->info.raw.format)); + ret = -EINVAL; + goto error; + } + + us_per_frame = (uint64_t)samples_per_frame * 1000000 / stream_info->info.raw.rate; + + switch (us_per_frame) { + case 2500: + case 5000: + case 10000: + case 20000: + case 40000: + case 60000: + break; + default: + 
pw_log_error("unsupported frame length: %" PRIu32 " samples (%.1f ms)", + samples_per_frame, (double)us_per_frame / 1000.0); + ret = -EINVAL; + goto error; + } + + /* Setup the context. */ + + context->audio_info = *stream_info; + context->samples_per_frame = samples_per_frame; + context->max_encoded_frame_size = max_encoded_frame_size; + context->stride = stride; + context->type = type; + + /* TODO: Currently, we use a simple 1:1 channel mapping. Also, coupled + * streams are not used at the moment. This is a limitation of the current + * Opus integration, in part because it is unclear how to communicate + * the channel mapping and coupled streams from the en- to the decoder. + * One possibility would be to enforce a (de-facto) standard channel + * mapping, like the Vorbis channel mapping. */ + + for (i = 0; i < stream_info->info.raw.channels; i++) + mapping[i] = i; + + switch (type) { + case RTP_AUDIO_CODEC_TYPE_ENCODER: { + static const opus_int32 DEFAULT_COMPLEXITY = 10; + + bool use_computed_max_bitrate = true; + opus_int32 props_bitrate = -1; + opus_int32 computed_max_bitrate; + opus_int32 complexity = DEFAULT_COMPLEXITY; + bool restricted_lowdelay = false; + bool in_band_fec = false; + int packet_loss_percentage = 0; + opus_int32 signal_type = OPUS_AUTO; + const char *signal_type_str = "auto"; + + if (codec_props != NULL) { + const char *prop_str; + + complexity = pw_properties_get_int32(codec_props, "opus.encoder.complexity", complexity); + props_bitrate = pw_properties_get_int32(codec_props, "opus.encoder.bitrate", props_bitrate); + restricted_lowdelay = pw_properties_get_bool(codec_props, "opus.encoder.restricted-lowdelay", + restricted_lowdelay); + in_band_fec = pw_properties_get_bool(codec_props, "opus.encoder.inband-fec", in_band_fec); + packet_loss_percentage = pw_properties_get_int32(codec_props, "opus.encoder.packet-loss-percentage", + packet_loss_percentage); + + if ((packet_loss_percentage < 0) || (packet_loss_percentage > 100)) { + 
pw_log_error("invalid packet loss percentage %d (valid range: 0-100)", packet_loss_percentage); + ret = -EINVAL; + goto error; + } + + /* Parse the requested signal type. The comparisons must look at the + * property value (prop_str); signal_type_str only holds the default + * until a valid property value has been accepted. */ + prop_str = pw_properties_get(codec_props, "opus.encoder.signal-type"); + if (prop_str != NULL) { + if (spa_streq(prop_str, "auto")) + signal_type = OPUS_AUTO; + else if (spa_streq(prop_str, "voice")) + signal_type = OPUS_SIGNAL_VOICE; + else if (spa_streq(prop_str, "music")) + signal_type = OPUS_SIGNAL_MUSIC; + else { + pw_log_error("unsupported Opus encoder signal type \"%s\"", prop_str); + ret = -EINVAL; + goto error; + } + /* Remember the accepted value so that the log line emitted + * when the signal type CTL is set shows what was chosen. */ + signal_type_str = prop_str; + } + } + + context->handle = opus_multistream_encoder_create( + stream_info->info.raw.rate, + stream_info->info.raw.channels, + stream_info->info.raw.channels, + 0, + mapping, + restricted_lowdelay ? OPUS_APPLICATION_RESTRICTED_LOWDELAY : OPUS_APPLICATION_AUDIO, + &opus_ret); + + if (opus_ret != OPUS_OK) + goto error_while_creating; + + /* Some CTL may actually not be implemented. It is safe to ignore these. */ + #define SET_OPUS_ENCODER_CTL(CTL) \ + do { \ + opus_ret = opus_multistream_encoder_ctl(context->handle, CTL); \ + switch (opus_ret) { \ + case OPUS_OK: \ + break; \ + case OPUS_UNIMPLEMENTED: \ + pw_log_debug("could not set encoder CTL %s since it is " \ + "not implemented; ignoring", #CTL); \ + break; \ + default: \ + pw_log_error("error while setting encoder CTL %s: %s", \ + #CTL, opus_strerror(opus_ret)); \ + ret = opus_error_to_neg_errno(opus_ret); \ + goto error; \ + } \ + } while (0) + + /* Use the maximum encoded frame size as the size + * for the output buffer, since that one will be + * used as the destination for the encoder. */ + output_buffer_size = max_encoded_frame_size; + + /* Hardcode encoder CTLs to ensure consistent and deterministic + * encoding behavior, even if the defaults change across + * libopus versions. */ + + /* Counterintuitively, CBR is not actually useful for RTP. RFC 7587 + * section 3.1.2 documents this. 
This applies to the generic audio + * use case here as well. The opus_multistream_encode() function + * (and its float variant) accept an argument that hard-limits the + * maximum encoded output size, so there is no possibility of Opus + * frames exceeding the maximum RTP payload size. Do constrain VBR + * though to not experience excessive variability. */ + SET_OPUS_ENCODER_CTL(OPUS_SET_VBR(1)); + SET_OPUS_ENCODER_CTL(OPUS_SET_VBR_CONSTRAINT(1)); + /* This is a niche feature that is useful for avoiding audio configuration + * switching in software stacks. Signals are forcibly converted to mono or + * stereo, depending on the parameter. Not needed here: channel configuration + * is determined by the PipeWire stream format, not the codec. */ + SET_OPUS_ENCODER_CTL(OPUS_SET_FORCE_CHANNELS(OPUS_AUTO)); + /* Allow the encoder to automatically select the appropriate bandwidth, and + * set the upper bound to the maximum to enable unconstrained selection. */ + SET_OPUS_ENCODER_CTL(OPUS_SET_MAX_BANDWIDTH(OPUS_BANDWIDTH_FULLBAND)); + SET_OPUS_ENCODER_CTL(OPUS_SET_BANDWIDTH(OPUS_AUTO)); + /* DTX is a SILK-layer feature, and not useful for generic audio. It + * is there for detecting silence / low-energy periods during speech. */ + SET_OPUS_ENCODER_CTL(OPUS_SET_DTX(0)); + /* This is a hint for the encoder about the actual depth of the source + * signal. For example, if 32-bit floating point audio is the input, but + * the actual audio signal has a noise floor that resembles that of + * 16-bit audio, then setting this to 16 is appropriate. But such + * signal details are not known here, so set this to the maximum. + * + * Also, from https://www.opus-codec.org/docs/opus_api-1.6/group__opus__encoderctls.html#gaa23940eb477ff617edc14b8d66e104c0 : + * > When using opus_encode() instead of opus_encode_float(), or when libopus is compiled + * > for fixed-point, the encoder uses the minimum of the value set here and the value 16. 
*/ + SET_OPUS_ENCODER_CTL(OPUS_SET_LSB_DEPTH(24)); + /* Disabling prediction hurts encoder efficiency substantially, and + * is only really useful for debugging and testing purposes. Keep + * prediction on. */ + SET_OPUS_ENCODER_CTL(OPUS_SET_PREDICTION_DISABLED(0)); +#ifdef OPUS_SET_DRED_DURATION_REQUEST + /* DRED is a new deep-learning-based redundancy mechanism that embeds up + * to one second of recovery data. It is not used here, since, at this point, + * it only works with SILK, though maybe one day it will work with the CELT + * layer as well. */ + SET_OPUS_ENCODER_CTL(OPUS_SET_DRED_DURATION(0)); +#endif +#ifdef OPUS_SET_QEXT_REQUEST + /* Quality extensions are a new feature that requires very high bitrates. + * See: https://datatracker.ietf.org/doc/draft-ietf-mlcodec-opus-scalable-quality-extension/ + * Disabled due to its experimental nature and need for very high bitrates. */ + SET_OPUS_ENCODER_CTL(OPUS_SET_QEXT(0)); +#endif + + /* Set the bitrate. */ + + /* Estimate the "max computed bitrate" out of the max_encoded_frame_size + * that makes the most use of the available space. The valid bitrate range + * is 500 to 512000 bits/s, so clamp the estimation to that range. 
*/ + computed_max_bitrate = ((uint64_t)max_encoded_frame_size) * 8 * SPA_USEC_PER_SEC / us_per_frame; + computed_max_bitrate = SPA_CLAMP(computed_max_bitrate, 500, 512000); + + pw_log_debug("computed a bitrate of %" PRId32 " bits/s for Opus encoding based " + "on the max allowed encoded frame size of %zu bytes", + (int32_t)computed_max_bitrate, max_encoded_frame_size); + + if (props_bitrate >= 0) { + pw_log_info("got bitrate %" PRId32 " from properties", (int32_t)props_bitrate); + + if (props_bitrate < 500) { + pw_log_warn("bitrate %" PRId32 " from properties is invalid (must " + "be at least 500 bits/s); setting computed bitrate instead", + (int32_t)props_bitrate); + } else if (props_bitrate > computed_max_bitrate) { + pw_log_warn("bitrate %" PRId32 " from properties exceeds computed " + "max bitrate; setting computed max bitrate instead", + (int32_t)props_bitrate); + } else { + use_computed_max_bitrate = false; + } + } + + if (use_computed_max_bitrate) { + pw_log_info("setting computed max bitrate of %" PRId32 " bits/s", + (int32_t)computed_max_bitrate); + SET_OPUS_ENCODER_CTL(OPUS_SET_BITRATE(computed_max_bitrate)); + } else { + pw_log_info("setting bitrate of %" PRId32 " bits/s from opus.encoder.bitrate property", + (int32_t)props_bitrate); + SET_OPUS_ENCODER_CTL(OPUS_SET_BITRATE(props_bitrate)); + } + + /* Set the encoding complexity. */ + + if ((complexity < 0) || (complexity > 10)) { + pw_log_warn("complexity %" PRId32 " is invalid; choosing %" PRId32 " as default", + (int32_t)complexity, (int32_t)DEFAULT_COMPLEXITY); + complexity = DEFAULT_COMPLEXITY; + } + SET_OPUS_ENCODER_CTL(OPUS_SET_COMPLEXITY(complexity)); + pw_log_info("setting encoding complexity %" PRId32, (int32_t)complexity); + + /* Set the in-band FEC. */ + + SET_OPUS_ENCODER_CTL(OPUS_SET_INBAND_FEC(in_band_fec ? 1 : 0)); + SET_OPUS_ENCODER_CTL(OPUS_SET_PACKET_LOSS_PERC(packet_loss_percentage)); + pw_log_info("%s in-band FEC; packet loss percentage: %d", + in_band_fec ? 
"enabling" : "disabling", packet_loss_percentage); + + /* Set the signal type. */ + + SET_OPUS_ENCODER_CTL(OPUS_SET_SIGNAL(signal_type)); + pw_log_info("setting signal type \"%s\"", signal_type_str); + + #undef SET_OPUS_ENCODER_CTL + + break; + } + case RTP_AUDIO_CODEC_TYPE_DECODER: + context->handle = opus_multistream_decoder_create( + stream_info->info.raw.rate, + stream_info->info.raw.channels, + stream_info->info.raw.channels, + 0, + mapping, + &opus_ret); + + if (opus_ret != OPUS_OK) + goto error_while_creating; + + /* Use the space a fully decoded frame needs as + * the output buffer size, since that one will be + * used as the destination for the decoder. */ + output_buffer_size = samples_per_frame * stride; + break; + default: + pw_log_error("unsupported audio codec type %d", (int)(type)); + ret = -EINVAL; + goto error; + } + + codec_type_name = rtp_audio_codec_type_name(type); + + pw_log_info("Opus %s created; samples per frame: %" PRIu32 "; max encoded frame data size: %zu; " + "output buffer size: %zu; ms per frame: %.1f; sample format: %s", codec_type_name, + samples_per_frame, max_encoded_frame_size, output_buffer_size, (double)us_per_frame / 1000.0, + spa_type_audio_format_to_short_name(stream_info->info.raw.format)); + + context->output_buffer = malloc(output_buffer_size); + if (context->output_buffer == NULL) { + pw_log_error("could not allocate %s output buffer", codec_type_name); + ret = -ENOMEM; + goto error; + } + + return 0; + +error: + /* Common cleanup path: releases whatever was set up so far. */ + rtp_opus_codec_shutdown(context); + return ret; + +error_while_creating: + /* Reached only from the encoder/decoder creation calls. At this point + * codec_type_name has not been assigned yet (that happens after the + * switch statement), so look the name up directly for the log line. */ + context->handle = NULL; + pw_log_error("error while creating %s: %s", rtp_audio_codec_type_name(type), opus_strerror(opus_ret)); + ret = opus_error_to_neg_errno(opus_ret); + goto error; +} + +static void rtp_opus_codec_shutdown(struct rtp_audio_codec_context *context) +{ + spa_assert(context != NULL); + + if (context->handle != NULL) { + if (context->type == RTP_AUDIO_CODEC_TYPE_ENCODER) + opus_multistream_encoder_destroy(context->handle); + else + 
opus_multistream_decoder_destroy(context->handle); + context->handle = NULL; + + pw_log_info("Opus %s destroyed", rtp_audio_codec_type_name(context->type)); + } + + if (context->output_buffer != NULL) { + free(context->output_buffer); + context->output_buffer = NULL; + pw_log_debug("%s output buffer freed", rtp_audio_codec_type_name(context->type)); + } +} + +static void rtp_opus_codec_reset(struct rtp_audio_codec_context *context, char const *reason) +{ + spa_assert(context != NULL); + + if (context->type == RTP_AUDIO_CODEC_TYPE_ENCODER) + opus_multistream_encoder_ctl(context->handle, OPUS_RESET_STATE); + else + opus_multistream_decoder_ctl(context->handle, OPUS_RESET_STATE); + + if (reason != NULL) + pw_log_info("Opus %s reset, reason: %s", rtp_audio_codec_type_name(context->type), reason); + else + pw_log_info("Opus %s reset (no reason given)", rtp_audio_codec_type_name(context->type)); +} + +static int rtp_opus_codec_get_delay(struct rtp_audio_codec_context *context, size_t *delay) +{ + int ret; + opus_int32 sample_rate = 0; + opus_int32 decoder_delay = 0; + + spa_assert(context != NULL); + spa_assert(delay != NULL); + + /* The Opus specification requires that all decoder implementations + * have the exact same delay. Encoders can vary, however. libopus + * combines the en- and decoder delay into a "lookahead value", + * which is accessible from the encoder (but not the decoder). + * Since for the RTP transmissions, it is beneficial to handle + * en- and decoder delay separately, extract the encoder delay out + * of the lookahead value if this is an encoder (by subtracting + * the decoder delay from the lookahead), and if this is a + * decoder, just return the decoder delay. + * + * The decoder delay is fixed to 2.5 ms according to RFC 6716, + * so this code needs to convert this to samples. 
*/ + + if (context->type == RTP_AUDIO_CODEC_TYPE_ENCODER) + ret = opus_multistream_encoder_ctl(context->handle, OPUS_GET_SAMPLE_RATE(&sample_rate)); + else + ret = opus_multistream_decoder_ctl(context->handle, OPUS_GET_SAMPLE_RATE(&sample_rate)); + + if (ret != OPUS_OK) { + pw_log_error("could not get %s sample rate: %s", + rtp_audio_codec_type_name(context->type), opus_strerror(ret)); + return opus_error_to_neg_errno(ret); + } + + /* Convert the fixed 2.5 ms decoder delay to samples based + * on the en/decoder's sample rate. Define 2.5ms as 2500us + * here to do the calculation purely with integers. */ + decoder_delay = 2500LL * sample_rate / SPA_USEC_PER_SEC; + + if (context->type == RTP_AUDIO_CODEC_TYPE_ENCODER) { + opus_int32 lookahead = 0; + + ret = opus_multistream_encoder_ctl(context->handle, OPUS_GET_LOOKAHEAD(&lookahead)); + if (ret != OPUS_OK) { + pw_log_error("could not get encoder lookahead: %s", opus_strerror(ret)); + return opus_error_to_neg_errno(ret); + } + + if (SPA_UNLIKELY(lookahead < decoder_delay)) { + pw_log_error("lookahead %" PRId32 " is smaller than decoder delay %" PRId32, + (int32_t)lookahead, (int32_t)decoder_delay); + return -EINVAL; + } + + *delay = lookahead - decoder_delay; + return 0; + } else { + *delay = decoder_delay; + return 0; + } +} + +static int rtp_opus_codec_encode(struct rtp_audio_codec_context *context, const uint8_t *in_samples, + uint8_t **out_encoded_data, size_t *out_encoded_data_size) +{ + int ret; + + spa_assert(context != NULL); + spa_assert(in_samples != NULL); + spa_assert(out_encoded_data != NULL); + spa_assert(out_encoded_data_size != NULL); + + switch (context->audio_info.info.raw.format) { + case SPA_AUDIO_FORMAT_S16: + ret = opus_multistream_encode(context->handle, (const opus_int16 *)in_samples, + context->samples_per_frame, context->output_buffer, context->max_encoded_frame_size); + break; + case SPA_AUDIO_FORMAT_F32: + ret = opus_multistream_encode_float(context->handle, (const float *)in_samples, + 
context->samples_per_frame, context->output_buffer, context->max_encoded_frame_size); + break; + default: + return -EINVAL; + } + + if (ret >= 0) { + *out_encoded_data = context->output_buffer; + *out_encoded_data_size = ret; + pw_log_trace("encoded %zu samples to %zu bytes", context->samples_per_frame, + *out_encoded_data_size); + return 0; + } else { + pw_log_error("error while encoding audio: %s", opus_strerror(ret)); + return -EIO; + } +} + +static int rtp_opus_codec_decode(struct rtp_audio_codec_context *context, const uint8_t *in_encoded_data, + size_t in_encoded_data_size, uint8_t **out_samples, size_t *out_num_samples) +{ + int ret; + + spa_assert(context != NULL); + spa_assert(in_encoded_data != NULL); + spa_assert(in_encoded_data_size > 0); + spa_assert(out_samples != NULL); + spa_assert(out_num_samples != NULL); + + switch (context->audio_info.info.raw.format) { + case SPA_AUDIO_FORMAT_S16: + ret = opus_multistream_decode(context->handle, in_encoded_data, in_encoded_data_size, + (opus_int16 *)(context->output_buffer), context->samples_per_frame, 0); + break; + case SPA_AUDIO_FORMAT_F32: + ret = opus_multistream_decode_float(context->handle, in_encoded_data, in_encoded_data_size, + (float *)(context->output_buffer), context->samples_per_frame, 0); + break; + default: + return -EINVAL; + } + + if (ret >= 0) { + *out_samples = context->output_buffer; + *out_num_samples = ret; + pw_log_trace("decoded %zu bytes to %zu samples", in_encoded_data_size, *out_num_samples); + return 0; + } else { + pw_log_error("error while decoding audio: %s", opus_strerror(ret)); + return -EIO; + } +} + +static int rtp_opus_codec_apply_plc(struct rtp_audio_codec_context *context, + uint8_t **out_samples, size_t *out_num_samples) +{ + int ret; + + spa_assert(context != NULL); + spa_assert(out_samples != NULL); + spa_assert(out_num_samples != NULL); + + /* PLC is applied by "decoding" from a nullpointer. 
See: + * https://www.opus-codec.org/docs/opus_api-1.6/group__opus__multistream.html#gaa4b89541efe01970cf52e4a336db3ad0 */ + + switch (context->audio_info.info.raw.format) { + case SPA_AUDIO_FORMAT_S16: + ret = opus_multistream_decode(context->handle, NULL, 0, + (opus_int16 *)(context->output_buffer), context->samples_per_frame, 0); + break; + case SPA_AUDIO_FORMAT_F32: + ret = opus_multistream_decode_float(context->handle, NULL, 0, + (float *)(context->output_buffer), context->samples_per_frame, 0); + break; + default: + return -EINVAL; + } + + if (ret >= 0) { + *out_samples = context->output_buffer; + *out_num_samples = ret; + pw_log_debug("generated %zu PLC samples", *out_num_samples); + return 0; + } else { + pw_log_error("error while applying PLC: %s", opus_strerror(ret)); + return -EIO; + } +} + +static const char * rtp_opus_codec_get_name(void) +{ + return "Opus"; +} + +const struct rtp_audio_codec* get_rtp_opus_codec(void) +{ + static const struct rtp_audio_codec codec = { + .init = rtp_opus_codec_init, + .shutdown = rtp_opus_codec_shutdown, + .reset = rtp_opus_codec_reset, + .get_delay = rtp_opus_codec_get_delay, + .encode = rtp_opus_codec_encode, + .decode = rtp_opus_codec_decode, + .apply_plc = rtp_opus_codec_apply_plc, + .get_name = rtp_opus_codec_get_name, + }; + + return &codec; +} + +#else + +#include + +const struct rtp_audio_codec* get_rtp_opus_codec(void) +{ + return NULL; +} + +#endif diff --git a/src/modules/module-rtp/opus-codec.h b/src/modules/module-rtp/opus-codec.h new file mode 100644 index 000000000..1192e3c57 --- /dev/null +++ b/src/modules/module-rtp/opus-codec.h @@ -0,0 +1,20 @@ +/* PipeWire */ +/* SPDX-FileCopyrightText: Copyright © 2026 Carlos Rafael Giani */ +/* SPDX-License-Identifier: MIT */ + +#ifndef PIPEWIRE_RTP_OPUS_CODEC_H +#define PIPEWIRE_RTP_OPUS_CODEC_H + +#ifdef __cplusplus +extern "C" { +#endif + +#include "audio-codec.h" + +const struct rtp_audio_codec* get_rtp_opus_codec(void); + +#ifdef __cplusplus +} /* extern "C" */ 
+#endif + +#endif /* PIPEWIRE_RTP_OPUS_CODEC_H */ diff --git a/src/modules/module-rtp/opus.c b/src/modules/module-rtp/opus.c deleted file mode 100644 index c25fc17f8..000000000 --- a/src/modules/module-rtp/opus.c +++ /dev/null @@ -1,359 +0,0 @@ -/* PipeWire */ -/* SPDX-FileCopyrightText: Copyright © 2023 Wim Taymans */ -/* SPDX-License-Identifier: MIT */ - -#ifdef HAVE_OPUS - -#include -#include - -/* TODO: Direct timestamp mode here may require a rework. See audio.c for a reference. - * Also check out the usage of actual_max_buffer_size in audio.c. */ - -static void rtp_opus_process_playback(void *data) -{ - struct impl *impl = data; - struct pw_buffer *buf; - struct spa_data *d; - uint32_t wanted, timestamp, target_buffer, stride, maxsize; - int32_t avail; - - if ((buf = pw_stream_dequeue_buffer(impl->stream)) == NULL) { - pw_log_info("Out of stream buffers: %m"); - return; - } - d = buf->buffer->datas; - - stride = impl->stride; - - maxsize = d[0].maxsize / stride; - wanted = buf->requested ? SPA_MIN(buf->requested, maxsize) : maxsize; - - if (impl->io_position && impl->direct_timestamp) { - /* in direct mode, read directly from the timestamp index, - * because sender and receiver are in sync, this would keep - * target_buffer of samples available. 
*/ - spa_ringbuffer_read_update(&impl->ring, - impl->io_position->clock.position); - } - avail = spa_ringbuffer_get_read_index(&impl->ring, ×tamp); - - target_buffer = impl->target_buffer; - - if (avail < (int32_t)wanted) { - enum spa_log_level level; - memset(d[0].data, 0, wanted * stride); - if (impl->have_sync) { - impl->have_sync = false; - level = SPA_LOG_LEVEL_WARN; - } else { - level = SPA_LOG_LEVEL_DEBUG; - } - pw_log(level, "underrun %d/%u < %u", - avail, target_buffer, wanted); - } else { - double error, corr; - if (impl->first) { - if ((uint32_t)avail > target_buffer) { - uint32_t skip = avail - target_buffer; - pw_log_debug("first: avail:%d skip:%u target:%u", - avail, skip, target_buffer); - timestamp += skip; - avail = target_buffer; - } - impl->first = false; - } else if (avail > (int32_t)SPA_MIN(target_buffer * 8, impl->buffer_size2 / stride)) { - pw_log_warn("overrun %u > %u", avail, target_buffer * 8); - timestamp += avail - target_buffer; - avail = target_buffer; - } - if (!impl->direct_timestamp) { - /* when not using direct timestamp and clocks are not - * in sync, try to adjust our playback rate to keep the - * requested target_buffer bytes in the ringbuffer */ - error = (double)target_buffer - (double)avail; - error = SPA_CLAMPD(error, -impl->max_error, impl->max_error); - - corr = spa_dll_update(&impl->dll, error); - - pw_log_trace("avail:%u target:%u error:%f corr:%f", avail, - target_buffer, error, corr); - - pw_stream_set_rate(impl->stream, 1.0 / corr); - } - spa_ringbuffer_read_data(&impl->ring, - impl->buffer, - impl->buffer_size2, - (timestamp * stride) & impl->buffer_mask2, - d[0].data, wanted * stride); - - timestamp += wanted; - spa_ringbuffer_read_update(&impl->ring, timestamp); - } - d[0].chunk->offset = 0; - d[0].chunk->size = wanted * stride; - d[0].chunk->stride = stride; - d[0].chunk->flags = 0; - buf->size = wanted; - - pw_stream_queue_buffer(impl->stream, buf); -} - -static int rtp_opus_receive(struct impl *impl, uint8_t 
*buffer, ssize_t len, - ssize_t hlen, uint64_t current_time) -{ - struct rtp_header *hdr; - ssize_t plen; - uint16_t seq; - uint32_t timestamp, samples, write, expected_write; - uint32_t stride = impl->stride; - OpusMSDecoder *dec = impl->stream_data; - int32_t filled; - int res; - - hdr = (struct rtp_header*)buffer; - - seq = ntohs(hdr->sequence_number); - if (impl->have_seq && impl->seq != seq) { - pw_log_info("unexpected seq (%d != %d) SSRC:%u", - seq, impl->seq, impl->ssrc); - impl->have_sync = false; - } - impl->seq = seq + 1; - impl->have_seq = true; - - timestamp = ntohl(hdr->timestamp) - impl->ts_offset; - - impl->receiving = true; - - plen = len - hlen; - - filled = spa_ringbuffer_get_write_index(&impl->ring, &expected_write); - - /* we always write to timestamp + delay */ - write = timestamp + impl->target_buffer; - - if (!impl->have_sync) { - pw_log_info("sync to timestamp:%u seq:%u ts_offset:%u SSRC:%u target:%u direct:%u", - timestamp, seq, impl->ts_offset, impl->ssrc, - impl->target_buffer, impl->direct_timestamp); - - /* we read from timestamp, keeping target_buffer of data - * in the ringbuffer. 
*/ - impl->ring.readindex = timestamp; - impl->ring.writeindex = write; - filled = impl->target_buffer; - - spa_dll_init(&impl->dll); - spa_dll_set_bw(&impl->dll, SPA_DLL_BW_MIN, 128, impl->rate); - memset(impl->buffer, 0, impl->buffer_size); - impl->have_sync = true; - } else if (expected_write != write) { - pw_log_debug("unexpected write (%u != %u)", - write, expected_write); - } - - if (filled + 2880 > (int32_t)(impl->buffer_size2 / stride)) { - pw_log_debug("capture overrun %u + %d > %u", filled, 2880, - impl->buffer_size2 / stride); - impl->have_sync = false; - } else { - uint32_t index = (write * stride) & impl->buffer_mask2, end; - - res = opus_multistream_decode_float(dec, - &buffer[hlen], plen, - (float*)&impl->buffer[index], 2880, - 0); - - end = index + (res * stride); - /* fold to the lower part of the ringbuffer when overflow */ - if (end > impl->buffer_size2) - memmove(impl->buffer, &impl->buffer[impl->buffer_size2], end - impl->buffer_size2); - - pw_log_info("receiving %zd len:%d timestamp:%d %u", plen, res, timestamp, index); - samples = res; - - write += samples; - spa_ringbuffer_write_update(&impl->ring, write); - } - return 0; -} - -static void rtp_opus_flush_packets(struct impl *impl) -{ - int32_t avail, tosend; - uint32_t stride, timestamp, offset; - uint8_t out[1280]; - struct iovec iov[2]; - struct rtp_header header; - OpusMSEncoder *enc = impl->stream_data; - int res = 0; - - avail = spa_ringbuffer_get_read_index(&impl->ring, ×tamp); - tosend = impl->psamples; - - if (avail < tosend) - return; - - stride = impl->stride; - - spa_zero(header); - header.v = 2; - header.pt = impl->payload; - header.ssrc = htonl(impl->ssrc); - - iov[0].iov_base = &header; - iov[0].iov_len = sizeof(header); - iov[1].iov_base = out; - iov[1].iov_len = 0; - - offset = 0; - while (avail >= tosend) { - header.sequence_number = htons(impl->seq); - header.timestamp = htonl(impl->ts_offset + timestamp); - - res = opus_multistream_encode_float(enc, - (const 
float*)&impl->buffer[offset * stride], tosend, - out, sizeof(out)); - - pw_log_trace("sending %d len:%d timestamp:%d", tosend, res, timestamp); - iov[1].iov_len = res; - - rtp_stream_call_send_packet(impl, iov, 2); - - impl->seq++; - timestamp += tosend; - offset += tosend; - avail -= tosend; - } - - pw_log_trace("move %d offset:%d", avail, offset); - memmove(impl->buffer, &impl->buffer[offset * stride], avail * stride); - - spa_ringbuffer_read_update(&impl->ring, timestamp); -} - -static void rtp_opus_process_capture(void *data) -{ - struct impl *impl = data; - struct pw_buffer *buf; - struct spa_data *d; - uint32_t offs, size, timestamp, expected_timestamp, stride; - int32_t filled, wanted; - - if ((buf = pw_stream_dequeue_buffer(impl->stream)) == NULL) { - pw_log_info("Out of stream buffers: %m"); - return; - } - d = buf->buffer->datas; - - offs = SPA_MIN(d[0].chunk->offset, d[0].maxsize); - size = SPA_MIN(d[0].chunk->size, d[0].maxsize - offs); - stride = impl->stride; - wanted = size / stride; - - filled = spa_ringbuffer_get_write_index(&impl->ring, &expected_timestamp); - - if (SPA_LIKELY(impl->io_position)) { - uint32_t rate = impl->io_position->clock.rate.denom; - timestamp = impl->io_position->clock.position * impl->rate / rate; - } else - timestamp = expected_timestamp; - - if (!impl->have_sync) { - pw_log_info("sync to timestamp:%u seq:%u ts_offset:%u SSRC:%u", - timestamp, impl->seq, impl->ts_offset, impl->ssrc); - impl->ring.readindex = impl->ring.writeindex = expected_timestamp = timestamp; - memset(impl->buffer, 0, impl->buffer_size); - impl->have_sync = true; - } else { - if (SPA_ABS((int32_t)expected_timestamp - (int32_t)timestamp) > 32) { - pw_log_warn("expected %u != timestamp %u", expected_timestamp, timestamp); - impl->have_sync = false; - } else if (filled + wanted > (int32_t)(impl->buffer_size / stride)) { - pw_log_warn("overrun %u + %u > %u", filled, wanted, impl->buffer_size / stride); - impl->have_sync = false; - } - } - - 
spa_ringbuffer_write_data(&impl->ring, - impl->buffer, - impl->buffer_size, - (filled * stride) & impl->buffer_mask, - SPA_PTROFF(d[0].data, offs, void), wanted * stride); - expected_timestamp += wanted; - spa_ringbuffer_write_update(&impl->ring, expected_timestamp); - - pw_stream_queue_buffer(impl->stream, buf); - - rtp_opus_flush_packets(impl); -} - -static void rtp_opus_deinit(struct impl *impl, enum spa_direction direction) -{ - if (impl->stream_data) { - if (direction == SPA_DIRECTION_INPUT) - opus_multistream_encoder_destroy(impl->stream_data); - else - opus_multistream_decoder_destroy(impl->stream_data); - } -} - -static int rtp_opus_init(struct impl *impl, enum spa_direction direction) -{ - int err; - unsigned char mapping[255]; - uint32_t i; - - if (impl->info.info.opus.channels > 255) - return -EINVAL; - - if (impl->psamples >= 2880) - impl->psamples = 2880; - else if (impl->psamples >= 1920) - impl->psamples = 1920; - else if (impl->psamples >= 960) - impl->psamples = 960; - else if (impl->psamples >= 480) - impl->psamples = 480; - else if (impl->psamples >= 240) - impl->psamples = 240; - else - impl->psamples = 120; - - for (i = 0; i < impl->info.info.opus.channels; i++) - mapping[i] = i; - - impl->deinit = rtp_opus_deinit; - impl->receive_rtp = rtp_opus_receive; - if (direction == SPA_DIRECTION_INPUT) { - impl->stream_events.process = rtp_opus_process_capture; - - impl->stream_data = opus_multistream_encoder_create( - impl->info.info.opus.rate, - impl->info.info.opus.channels, - impl->info.info.opus.channels, 0, - mapping, - OPUS_APPLICATION_AUDIO, - &err); - } - else { - impl->stream_events.process = rtp_opus_process_playback; - - impl->stream_data = opus_multistream_decoder_create( - impl->info.info.opus.rate, - impl->info.info.opus.channels, - impl->info.info.opus.channels, 0, - mapping, - &err); - } - if (!impl->stream_data) - pw_log_error("opus error: %d", err); - return impl->stream_data ? 
0 : err; -} -#else -static int rtp_opus_init(struct impl *impl, enum spa_direction direction) -{ - return -ENOTSUP; -} -#endif diff --git a/src/modules/module-rtp/stream.c b/src/modules/module-rtp/stream.c index 5bbc43abc..7ead41d0e 100644 --- a/src/modules/module-rtp/stream.c +++ b/src/modules/module-rtp/stream.c @@ -12,6 +12,7 @@ #include #include #include +#include #include #include #include @@ -27,6 +28,9 @@ #include #include #include +#include +#include +#include PW_LOG_TOPIC_EXTERN(mod_topic); #define PW_LOG_TOPIC_DEFAULT mod_topic @@ -105,7 +109,6 @@ struct impl { const struct rtp_format_info *rtp_format_info; enum spa_direction direction; - void *stream_data; uint32_t rate; uint32_t stride; @@ -113,6 +116,15 @@ struct impl { uint8_t payload; uint32_t ssrc; uint16_t seq; + /* This is used for pre-checking for received packets that arrive + * out of order. Depending on the audio type, a jitter buffer + * may be used intermediately, and then, the seq field above + * will be set _after_ the jitter buffer reorders packets, + * so it cannot be used for the pre-checking purpose. Thus, + * this separate seq field is required. + * It is int32_t to be able to store -1 as an indicator that + * no expected seqnum is set yet. */ + int32_t next_expected_incoming_seq; unsigned fixed_ssrc:1; unsigned have_ssrc:1; unsigned ignore_ssrc:1; @@ -187,6 +199,19 @@ struct impl { void (*deinit)(struct impl *impl, enum spa_direction direction); int (*resend_packets)(struct impl *impl, uint16_t seq, uint16_t num); + struct rtp_jitter_buffer jitter_buffer; + const struct rtp_audio_codec *audio_codec; + struct rtp_audio_codec_context audio_codec_context; + /* This buffer is needed in case the data that is to be encoded is wrapped + * around the ring buffer border. In such a case, the two halves have to + * be copied and merged into this buffer, since audio codecs expect one + * contiguous input memory block as the data to encode. 
*/ + uint8_t *audio_encoder_staging_buffer; + + /* Delay of the audio codec. Depending on how the audio codec is configured, + * this is either the decoder delay or the encoder delay. In samples. */ + size_t codec_delay; + /* * pw_filter where the filter would be driven at the PTP clock * rate with RTP sink being driven at the sink driver clock rate @@ -311,7 +336,6 @@ static int do_finish_stopping_state(struct spa_loop *loop, bool async, uint32_t #include "module-rtp/audio.c" #include "module-rtp/midi.c" -#include "module-rtp/opus.c" struct rtp_format_info { uint32_t media_subtype; @@ -407,6 +431,11 @@ static int stream_start(struct impl *impl) pw_log_error("error while closing leftover connection: %s", spa_strerror(res)); } + impl->next_expected_incoming_seq = -1; + + if (impl->audio_codec != NULL) + reset_rtp_audio_codec(impl, "starting new stream"); + impl->reset_ringbuffer(impl); res = 0; @@ -519,23 +548,18 @@ static void on_stream_state_changed(void *d, enum pw_stream_state old, } } -static void update_latency_params(struct impl *impl) +static void fill_latency_params(struct impl *impl, struct spa_pod_builder *b, + const struct spa_pod **params, uint32_t *n_params) { - uint32_t n_params = 0; - const struct spa_pod *params[2]; - uint8_t buffer[1024]; - struct spa_pod_builder b; struct spa_latency_info main_latency; - spa_pod_builder_init(&b, buffer, sizeof(buffer)); - /* main_latency is the latency in the direction indicated by impl->direction. - * In RTP streams, this consists solely of the process latency. (In theory, - * PipeWire SPA nodes could have additional latencies on top of the process - * latency, but this is not the case here.) The other direction is already - * handled by pw_stream. + * In RTP streams, this consists of the process latency. In the INPUT direction + * (which is what sinks use), the encoder delay is also part of main_latency. 
+ * The full latency params also include latency in the other direction - + * this is already handled by pw_stream. * - * The main_latncy is passed as updated SPA_PARAM_Latency params to the stream. + * The main_latency is passed as updated SPA_PARAM_Latency params to the stream. * That way, the stream always gets information of latency for _both_ directions; * the direction indicated by impl->direction is covered by main_latency, and * the opposite direction is already taken care of by the default pw_stream @@ -547,10 +571,27 @@ static void update_latency_params(struct impl *impl) main_latency = SPA_LATENCY_INFO(impl->direction); spa_process_latency_info_add(&impl->process_latency, &main_latency); - params[n_params++] = spa_latency_build(&b, SPA_PARAM_Latency, &main_latency); - params[n_params++] = spa_process_latency_build(&b, SPA_PARAM_ProcessLatency, - &impl->process_latency); + if (impl->direction == PW_DIRECTION_INPUT) { + int64_t codec_delay_ns = (int64_t)(impl->codec_delay) * SPA_NSEC_PER_SEC / impl->rate; + main_latency.min_ns += codec_delay_ns; + main_latency.max_ns += codec_delay_ns; + } + params[(*n_params)++] = spa_latency_build(b, SPA_PARAM_Latency, &main_latency); + params[(*n_params)++] = spa_process_latency_build(b, SPA_PARAM_ProcessLatency, + &impl->process_latency); +} + +static void update_latency_params(struct impl *impl) +{ + const struct spa_pod *params[2]; + uint32_t n_params; + uint8_t buffer[1024]; + struct spa_pod_builder b; + + n_params = 0; + spa_pod_builder_init(&b, buffer, sizeof(buffer)); + fill_latency_params(impl, &b, params, &n_params); pw_stream_update_params(impl->stream, params, n_params); } @@ -609,11 +650,12 @@ static const struct rtp_format_info *find_rtp_pcm_audio_format_info(const struct return NULL; } -static int parse_audio_info(const struct pw_properties *props, struct spa_audio_info_raw *info) +static int parse_audio_info(const struct pw_properties *props, struct spa_audio_info_raw *info, + const char *default_format) 
{ return spa_audio_info_raw_init_dict_keys(info, &SPA_DICT_ITEMS( - SPA_DICT_ITEM(SPA_KEY_AUDIO_FORMAT, DEFAULT_RAW_AUDIO_FORMAT), + SPA_DICT_ITEM(SPA_KEY_AUDIO_FORMAT, default_format), SPA_DICT_ITEM(SPA_KEY_AUDIO_RATE, SPA_STRINGIFY(DEFAULT_RATE)), SPA_DICT_ITEM(SPA_KEY_AUDIO_POSITION, DEFAULT_POSITION)), &props->dict, @@ -660,6 +702,8 @@ struct rtp_stream *rtp_stream_new(struct pw_core *core, float latency_msec; int res; bool process_latency_from_sess; + uint32_t audio_codec_type = 0; + const char *default_audio_format = NULL; impl = calloc(1, sizeof(*impl)); if (impl == NULL) { @@ -690,11 +734,15 @@ struct rtp_stream *rtp_stream_new(struct pw_core *core, impl->info.media_type = SPA_MEDIA_TYPE_audio; impl->info.media_subtype = SPA_MEDIA_SUBTYPE_raw; impl->payload = 127; + audio_codec_type = SPA_MEDIA_SUBTYPE_raw; + default_audio_format = DEFAULT_RAW_AUDIO_FORMAT; } else if (spa_streq(str, "raop")) { impl->info.media_type = SPA_MEDIA_TYPE_audio; impl->info.media_subtype = SPA_MEDIA_SUBTYPE_raw; impl->payload = 0x60; + audio_codec_type = SPA_MEDIA_SUBTYPE_raw; + default_audio_format = DEFAULT_RAOP_AUDIO_FORMAT; } else if (spa_streq(str, "midi")) { impl->info.media_type = SPA_MEDIA_TYPE_application; @@ -702,10 +750,14 @@ struct rtp_stream *rtp_stream_new(struct pw_core *core, impl->payload = 0x61; } #ifdef HAVE_OPUS + /* The "opus" sess.media type is actually raw audio that + * is encoded with Opus before sending it out as RTP. 
*/ else if (spa_streq(str, "opus")) { impl->info.media_type = SPA_MEDIA_TYPE_audio; - impl->info.media_subtype = SPA_MEDIA_SUBTYPE_opus; + impl->info.media_subtype = SPA_MEDIA_SUBTYPE_raw; impl->payload = 127; + audio_codec_type = SPA_MEDIA_SUBTYPE_opus; + default_audio_format = DEFAULT_OPUS_AUDIO_FORMAT; } #endif else { @@ -716,22 +768,61 @@ struct rtp_stream *rtp_stream_new(struct pw_core *core, switch (impl->info.media_subtype) { case SPA_MEDIA_SUBTYPE_raw: - if ((res = parse_audio_info(props, &impl->info.info.raw)) < 0) { + if ((res = parse_audio_info(props, &impl->info.info.raw, default_audio_format)) < 0) { pw_log_error("can't parse format: %s", spa_strerror(res)); goto out; } impl->stream_info = impl->info; - impl->rtp_format_info = find_rtp_pcm_audio_format_info(&impl->info); - if (impl->rtp_format_info == NULL) { - pw_log_error("unsupported audio format:%d (%s) channels:%d", + impl->rate = impl->stream_info.info.raw.rate; + + /* Pick the RTP information and stride values suitable for + * the specified codec type. If the codec type is set to + * SPA_MEDIA_SUBTYPE_raw, then no special encoding is done, + * and PCM samples are transmitted directly over RTP. 
*/ + switch (audio_codec_type) { + case SPA_MEDIA_SUBTYPE_raw: + impl->rtp_format_info = find_rtp_pcm_audio_format_info(&impl->info); + if (impl->rtp_format_info == NULL) { + pw_log_error("unsupported audio format:%d (%s) channels:%d", impl->stream_info.info.raw.format, spa_type_audio_format_to_short_name(impl->stream_info.info.raw.format), impl->stream_info.info.raw.channels); + res = -EINVAL; + goto out; + } + impl->stride = impl->rtp_format_info->size * impl->stream_info.info.raw.channels; + pw_log_info("configured raw PCM RTP payload: MIME: %s format: %s rate: %" + PRIu32 " stride: %" PRIu32, impl->rtp_format_info->mime, + spa_type_audio_format_to_short_name(impl->rtp_format_info->format), + impl->rate, impl->stride); + break; + case SPA_MEDIA_SUBTYPE_opus: + impl->rtp_format_info = &rtp_opus_format_info; + switch (impl->stream_info.info.raw.format) { + case SPA_AUDIO_FORMAT_S16: + impl->stride = 2 * impl->stream_info.info.raw.channels; + break; + case SPA_AUDIO_FORMAT_F32: + impl->stride = 4 * impl->stream_info.info.raw.channels; + break; + default: + pw_log_error("unsupported raw audio format for encoding to Opus:%d (%s)", + impl->stream_info.info.raw.format, + spa_type_audio_format_to_short_name(impl->stream_info.info.raw.format)); + res = -EINVAL; + goto out; + } + pw_log_info("configured Opus RTP payload: format: %s rate: %" PRIu32 " stride: %" + PRIu32, spa_type_audio_format_to_short_name(impl->stream_info.info.raw.format), + impl->rate, impl->stride); + break; + default: + pw_log_error("unsupported audio encoding:%d (%s)", audio_codec_type, + spa_type_to_short_name(audio_codec_type, + spa_type_media_subtype, "")); res = -EINVAL; goto out; } - impl->stride = impl->rtp_format_info->size * impl->stream_info.info.raw.channels; - impl->rate = impl->stream_info.info.raw.rate; break; case SPA_MEDIA_SUBTYPE_control: impl->stream_info = impl->info; @@ -741,21 +832,8 @@ struct rtp_stream *rtp_stream_new(struct pw_core *core, impl->rate = 
pw_properties_get_uint32(props, "midi.rate", 10000); if (impl->rate == 0) impl->rate = 10000; - break; - case SPA_MEDIA_SUBTYPE_opus: - impl->stream_info.media_type = SPA_MEDIA_TYPE_audio; - impl->stream_info.media_subtype = SPA_MEDIA_SUBTYPE_raw; - if ((res = parse_audio_info(props, &impl->stream_info.info.raw)) < 0) { - pw_log_error("can't parse format: %s", spa_strerror(res)); - goto out; - } - impl->stream_info.info.raw.format = SPA_AUDIO_FORMAT_F32; - impl->info.info.opus.rate = impl->stream_info.info.raw.rate; - impl->info.info.opus.channels = impl->stream_info.info.raw.channels; - - impl->rtp_format_info = &rtp_opus_format_info; - impl->stride = impl->rtp_format_info->size * impl->stream_info.info.raw.channels; - impl->rate = impl->stream_info.info.raw.rate; + pw_log_info("configured MIDI RTP payload: rate: %" PRIu32 " stride: %" PRIu32, + impl->rate, impl->stride); break; default: spa_assert_not_reached(); @@ -845,6 +923,7 @@ struct rtp_stream *rtp_stream_new(struct pw_core *core, impl->payload_size = impl->mtu - impl->header_size; impl->seq = pw_rand32(); + impl->next_expected_incoming_seq = -1; str = pw_properties_get(props, "sess.min-ptime"); if (!spa_atof(str, &min_ptime)) @@ -954,6 +1033,26 @@ struct rtp_stream *rtp_stream_new(struct pw_core *core, spa_dll_set_bw(&impl->dll, SPA_DLL_BW_MIN, 128, impl->rate); impl->corr = 1.0; + switch (impl->info.media_subtype) { + case SPA_MEDIA_SUBTYPE_raw: + switch (audio_codec_type) { + case SPA_MEDIA_SUBTYPE_opus: + res = setup_rtp_audio_codec(impl, get_rtp_opus_codec(), props); + break; + default: + res = 0; + break; + } + + if (SPA_UNLIKELY(res < 0)) + goto out; + + break; + + default: + break; + } + impl->stream = pw_stream_new(core, "rtp-session", spa_steal_ptr(props)); if (impl->stream == NULL) { res = -errno; @@ -980,12 +1079,6 @@ struct rtp_stream *rtp_stream_new(struct pw_core *core, SPA_FORMAT_mediaSubtype, SPA_POD_Id(SPA_MEDIA_SUBTYPE_control)); rtp_midi_init(impl, direction); break; - case 
SPA_MEDIA_SUBTYPE_opus: - params[n_params++] = spa_format_audio_build(&b, - SPA_PARAM_EnumFormat, &impl->stream_info); - flags |= PW_STREAM_FLAG_AUTOCONNECT; - rtp_opus_init(impl, direction); - break; default: res = -EINVAL; goto out; @@ -998,24 +1091,20 @@ struct rtp_stream *rtp_stream_new(struct pw_core *core, * quantity in turn is subjected to constraint checks (see above), it is * possible that the _actual_ session latency no longer equals the value * of sess.latency.msec by the time this location is reached. To take into - * account these constraint adjustments, convert back the impl->target_buffer - * to nanoseconds, and use that as the process latency. + * account these constraint adjustments, fill_latency_params() converts + * back the impl->target_buffer to nanoseconds, and uses that as the + * process latency. * - * Then, just like how update_latency_params() does it, construct the - * SPA_PARAM_Latency and SPA_PARAM_ProcessLatency params to let the new + * Then, just like in update_latency_params(), the SPA_PARAM_Latency + * and SPA_PARAM_ProcessLatency params are constructed to let the new * pw_stream know of these latency figures right from the start. 
*/ - struct spa_latency_info latency; - impl->process_latency.ns = (int64_t)(impl->target_buffer * 1e9 / impl->rate); - pw_log_debug("set process latency to %" PRId64 " based on sess.latency.msec " - "value %f", impl->process_latency.ns, latency_msec); + pw_log_debug("set process latency to %" PRId64 " ns based on sess.latency.msec " + "value %f ms (= %" PRIu32 " samples)", impl->process_latency.ns, + latency_msec, impl->target_buffer); - latency = SPA_LATENCY_INFO(impl->direction); - spa_process_latency_info_add(&(impl->process_latency), &latency); - params[n_params++] = spa_latency_build(&b, SPA_PARAM_Latency, &latency); - params[n_params++] = spa_process_latency_build(&b, SPA_PARAM_ProcessLatency, - &(impl->process_latency)); + fill_latency_params(impl, &b, params, &n_params); } pw_stream_add_listener(impl->stream, @@ -1040,6 +1129,7 @@ struct rtp_stream *rtp_stream_new(struct pw_core *core, return (struct rtp_stream*)impl; out: + teardown_rtp_audio_codec(impl); pw_properties_free(props); if (impl) { if (impl->stream) @@ -1058,6 +1148,8 @@ void rtp_stream_destroy(struct rtp_stream *s) rtp_stream_emit_destroy(impl); + teardown_rtp_audio_codec(impl); + if (impl->deinit) impl->deinit(impl, impl->direction); @@ -1142,7 +1234,17 @@ unexpected_ssrc: int rtp_stream_resend_packets(struct rtp_stream *s, uint16_t seq, uint16_t num) { struct impl *impl = (struct impl*)s; - if (impl->resend_packets) + /* Resending only works with raw data, since codecs usually + * have internal state that is updated after encoding a + * packet. This then means that resend attempts would not + * yield the exact same packet, and this can corrupt the + * state of the decoder in a receiver. + * To support resending with codecs, the last N packets + * have to be cached somehow. Given the fact that, thus + * far, only the RAOP sink resends packets, and RAOP only + * supports raw PCM, it is currently easier to just + * disable retransmissions when a codec is in use. 
*/ + if (impl->resend_packets && (impl->audio_codec == NULL)) return impl->resend_packets(impl, seq, num); else return -ENOTSUP; diff --git a/src/modules/module-rtp/stream.h b/src/modules/module-rtp/stream.h index 94ceb3650..0cf44eda7 100644 --- a/src/modules/module-rtp/stream.h +++ b/src/modules/module-rtp/stream.h @@ -13,6 +13,7 @@ struct rtp_stream; #define DEFAULT_RAW_AUDIO_FORMAT "S16BE" #define DEFAULT_RAOP_AUDIO_FORMAT "S16LE" +#define DEFAULT_OPUS_AUDIO_FORMAT "F32" #define DEFAULT_RATE 48000 #define DEFAULT_CHANNELS 2 #define DEFAULT_POSITION "[ FL FR ]"