From 98db54f55dffe542345662603231447e964edee8 Mon Sep 17 00:00:00 2001 From: Christian Glombek Date: Mon, 2 Oct 2023 14:52:22 +0200 Subject: [PATCH 1/8] module-raop-sink: Simplify rtp send functions Reorganizes the rtp send functions. Part of porting module-raop-sink towards the module-rtp/stream facilities. --- src/modules/module-raop-sink.c | 217 +++++++++++++-------------------- 1 file changed, 88 insertions(+), 129 deletions(-) diff --git a/src/modules/module-raop-sink.c b/src/modules/module-raop-sink.c index 1da8280ae..cfd346f2b 100644 --- a/src/modules/module-raop-sink.c +++ b/src/modules/module-raop-sink.c @@ -215,7 +215,6 @@ struct impl { struct pw_stream *stream; struct spa_hook stream_listener; struct spa_audio_info_raw info; - uint32_t frame_size; struct pw_rtsp_client *rtsp; struct spa_hook rtsp_listener; @@ -246,7 +245,8 @@ struct impl { int server_fd; struct spa_source *server_source; - uint32_t block_size; + uint32_t mtu; + uint32_t stride; uint32_t latency; uint16_t seq, cseq; @@ -307,11 +307,9 @@ static inline uint64_t ntp_now(void) return timespec_to_ntp(&now); } -static int send_udp_sync_packet(struct impl *impl, - struct sockaddr *dest_addr, socklen_t addrlen) +static int send_udp_sync_packet(struct impl *impl, uint32_t rtptime) { uint32_t out[3]; - uint32_t rtptime = impl->rtptime; uint32_t latency = impl->latency; uint64_t transmitted; struct rtp_header header; @@ -339,8 +337,8 @@ static int send_udp_sync_packet(struct impl *impl, iov[1].iov_base = out; iov[1].iov_len = sizeof(out); - msg.msg_name = dest_addr; - msg.msg_namelen = addrlen; + msg.msg_name = NULL; + msg.msg_namelen = 0; msg.msg_iov = iov; msg.msg_iovlen = 2; msg.msg_control = NULL; @@ -440,146 +438,93 @@ static int write_codec_pcm(void *dst, void *frames, uint32_t n_frames) return bp - b + 1; } -static int flush_to_udp_packet(struct impl *impl) +static ssize_t send_packet(int fd, struct msghdr *msg) { - const size_t max = 8 + impl->block_size; - uint32_t out[max], len, n_frames; - struct rtp_header header; - struct iovec iov[2]; - struct msghdr msg; - uint8_t *dst; - int res; - - if (!impl->recording) - return 0; - - if (impl->first || ++impl->sync == impl->sync_period) { - impl->sync = 0; - send_udp_sync_packet(impl, NULL, 0); - } - - spa_zero(header); - header.v = 2; - header.pt = 96; - if (impl->first) - header.m = 1; - header.sequence_number = htons(impl->seq); - header.timestamp = htonl(impl->rtptime); - header.ssrc = htonl(impl->ssrc); - - iov[0].iov_base = &header; - iov[0].iov_len = 12; - - n_frames = impl->filled / impl->frame_size; - dst = (uint8_t*)&out[0]; - - switch (impl->codec) { - case CODEC_PCM: - case CODEC_ALAC: - len = write_codec_pcm(dst, impl->buffer, n_frames); - break; - default: - len = 8 + impl->block_size; - memset(dst, 0, len); - break; - } - if (impl->encryption == CRYPTO_RSA) - aes_encrypt(impl, dst, len); - - iov[1].iov_base = out; - iov[1].iov_len = len; - - impl->rtptime += n_frames; - impl->seq = (impl->seq + 1) & 0xffff; - - msg.msg_name = NULL; - msg.msg_namelen = 0; - msg.msg_iov = iov; - msg.msg_iovlen = 2; - msg.msg_control = NULL; - msg.msg_controllen = 0; - msg.msg_flags = 0; - - res = sendmsg(impl->server_fd, &msg, MSG_NOSIGNAL); - if (res < 0) { - res = -errno; - pw_log_warn("error streaming packet: %d", res); - } - - impl->first = false; - - return res; + ssize_t n; + n = sendmsg(fd, msg, MSG_NOSIGNAL); + if (n < 0) + pw_log_debug("sendmsg() failed: %m"); + return n; } -static int flush_to_tcp_packet(struct impl *impl) +static void stream_send_packet(void *data, struct iovec *iov, size_t iovlen) { - const size_t max = 8 + impl->block_size; - uint32_t tcp_pkt[1], out[max], len, n_frames; - struct rtp_header header; - struct iovec iov[3]; + struct impl *impl = data; + const size_t max = 8 + impl->mtu; + uint32_t tcp_pkt[1], out[max], len, n_frames, rtptime; + struct iovec tcp_iov[3], udp_iov[2]; + struct rtp_header *header; struct msghdr msg; uint8_t *dst; - int res; if (!impl->recording) - return 0; + return; - tcp_pkt[0] = htonl(0x24000000); + header = (struct rtp_header*)iov[0].iov_base; + if (header->v != 2) + pw_log_warn("invalid rtp packet version"); - iov[0].iov_base = &tcp_pkt; - iov[0].iov_len = 4; + rtptime = htonl(header->timestamp); - spa_zero(header); - header.v = 2; - header.pt = 96; - header.sequence_number = htons(impl->seq); - header.timestamp = htonl(impl->rtptime); - header.ssrc = htonl(impl->ssrc); + if (header->m || impl->first || ++impl->sync == impl->sync_period) { + send_udp_sync_packet(impl, rtptime); + impl->sync = 0; + } - iov[1].iov_base = &header; - iov[1].iov_len = 12; + n_frames = iov[1].iov_len / impl->stride; + + msg.msg_name = NULL; + msg.msg_namelen = 0; + msg.msg_control = NULL; + msg.msg_controllen = 0; + msg.msg_flags = 0; - n_frames = impl->filled / impl->frame_size; dst = (uint8_t*)&out[0]; switch (impl->codec) { case CODEC_PCM: case CODEC_ALAC: - len = write_codec_pcm(dst, impl->buffer, n_frames); + len = write_codec_pcm(dst, (void *)iov[1].iov_base, n_frames); break; default: - len = 8 + impl->block_size; + len = 8 + impl->mtu; memset(dst, 0, len); break; } if (impl->encryption == CRYPTO_RSA) aes_encrypt(impl, dst, len); - out[0] |= htonl((uint32_t) len + 12); + switch (impl->protocol) { + case PROTO_UDP: + udp_iov[0].iov_base = header; + udp_iov[0].iov_len = 12; + udp_iov[1].iov_base = out; + udp_iov[1].iov_len = len; - iov[2].iov_base = out; - iov[2].iov_len = len; + msg.msg_iov = udp_iov; + msg.msg_iovlen = 2; - impl->rtptime += n_frames; - impl->seq = (impl->seq + 1) & 0xffff; + break; + case PROTO_TCP: + out[0] |= htonl((uint32_t) len + 12); + tcp_pkt[0] = htonl(0x24000000); - msg.msg_name = NULL; - msg.msg_namelen = 0; - msg.msg_iov = iov; - msg.msg_iovlen = 2; - msg.msg_control = NULL; - msg.msg_controllen = 0; - msg.msg_flags = 0; + tcp_iov[0].iov_base = &tcp_pkt; + tcp_iov[0].iov_len = 4; + tcp_iov[1].iov_base = header; + tcp_iov[1].iov_len = 12; + tcp_iov[2].iov_base = out; + tcp_iov[2].iov_len = len; - res = sendmsg(impl->server_fd, &msg, MSG_NOSIGNAL); - if (res < 0) { - res = -errno; - pw_log_warn("error streaming packet: %d", res); + msg.msg_iov = tcp_iov; + msg.msg_iovlen = 3; + + break; } - impl->first = false; + pw_log_debug("raop sending header:%ld data:%ld", udp_iov[0].iov_len, udp_iov[1].iov_len); - return res; + send_packet(impl->server_fd, &msg); } static void playback_stream_process(void *d) @@ -589,6 +534,8 @@ static void playback_stream_process(void *d) struct spa_data *bd; uint8_t *data; uint32_t offs, size; + struct iovec iov[2]; + struct rtp_header header; if ((buf = pw_stream_dequeue_buffer(impl->stream)) == NULL) { pw_log_debug("out of buffers: %m"); @@ -601,10 +548,20 @@ static void playback_stream_process(void *d) size = SPA_MIN(bd->chunk->size, bd->maxsize - offs); data = SPA_PTROFF(bd->data, offs, uint8_t); - while (size > 0 && impl->block_size > 0) { + spa_zero(header); + header.v = 2; + header.pt = 96; + if (impl->first) + header.m = 1; + header.ssrc = htonl(impl->ssrc); + + while (size > 0 && impl->mtu > 0) { uint32_t avail, to_fill; - avail = impl->block_size - impl->filled; + header.sequence_number = htons(impl->seq); + header.timestamp = htonl(impl->rtptime); + + avail = impl->mtu - impl->filled; to_fill = SPA_MIN(avail, size); memcpy(&impl->buffer[impl->filled], data, to_fill); @@ -615,15 +572,17 @@ static void playback_stream_process(void *d) data += to_fill; if (avail == 0) { - switch (impl->protocol) { - case PROTO_UDP: - flush_to_udp_packet(impl); - break; - case PROTO_TCP: - flush_to_tcp_packet(impl); - break; - } + iov[0].iov_base = &header; + iov[0].iov_len = 12; + iov[1].iov_base = impl->buffer; + iov[1].iov_len = impl->filled; + + stream_send_packet(impl, iov, 2); + + impl->rtptime += (impl->filled / impl->stride); + impl->seq = (impl->seq + 1) & 0xffff; impl->filled = 0; + impl->first = false; } } @@ -1013,7 +972,7 @@ static int rtsp_record_reply(void *data, int status, const struct spa_dict *head impl->first = true; impl->sync = 0; - impl->sync_period = impl->info.rate / (impl->block_size / impl->frame_size); + impl->sync_period = impl->info.rate / (impl->mtu / impl->stride); impl->recording = true; rtsp_send_volume(impl); @@ -1327,7 +1286,7 @@ static int rtsp_do_announce(struct impl *impl) else frames = FRAMES_PER_UDP_PACKET; - impl->block_size = frames * impl->frame_size; + impl->mtu = frames * impl->stride; pw_rtsp_client_get_local_ip(impl->rtsp, &ip_version, local_ip, sizeof(local_ip)); @@ -1937,7 +1896,7 @@ static void parse_audio_info(const struct pw_properties *props, struct spa_audio parse_position(info, DEFAULT_POSITION, strlen(DEFAULT_POSITION)); } -static int calc_frame_size(struct spa_audio_info_raw *info) +static int calc_stride(struct spa_audio_info_raw *info) { int res = info->channels; switch (info->format) { @@ -2081,8 +2040,8 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args) parse_audio_info(impl->stream_props, &impl->info); - impl->frame_size = calc_frame_size(&impl->info); - if (impl->frame_size == 0) { + impl->stride = calc_stride(&impl->info); + if (impl->stride == 0) { pw_log_error("unsupported audio format:%d channels:%d", impl->info.format, impl->info.channels); res = -EINVAL; From 9eba60a6358c44b623dd04dd69cd73787053166a Mon Sep 17 00:00:00 2001 From: Christian Glombek Date: Mon, 9 Oct 2023 07:20:39 +0200 Subject: [PATCH 2/8] module-rtp/stream: Add ability to set marker on first packet --- src/modules/module-rtp/audio.c | 7 ++++++- src/modules/module-rtp/stream.c | 4 ++++ 2 files changed, 10 insertions(+), 1 deletion(-) diff --git a/src/modules/module-rtp/audio.c b/src/modules/module-rtp/audio.c index d6305a686..326c65d84 100644 --- a/src/modules/module-rtp/audio.c +++ b/src/modules/module-rtp/audio.c @@ -226,6 +226,10 @@ static void rtp_audio_flush_packets(struct impl *impl) iov[0].iov_len = sizeof(header); while (avail >= tosend) { + if (impl->marker_on_first && impl->first) + header.m = 1; + else + header.m = 0; header.sequence_number = htons(impl->seq); header.timestamp = htonl(impl->ts_offset + timestamp); @@ -234,11 +238,12 @@ static void rtp_audio_flush_packets(struct impl *impl) (timestamp * stride) & BUFFER_MASK, &iov[1], tosend * stride); - pw_log_trace("sending %d timestamp:%d", tosend, timestamp); + pw_log_trace("sending %d avail:%d ts_offset:%d timestamp:%d", tosend, avail, impl->ts_offset, timestamp); rtp_stream_emit_send_packet(impl, iov, 3); impl->seq++; + impl->first = false; timestamp += tosend; avail -= tosend; } diff --git a/src/modules/module-rtp/stream.c b/src/modules/module-rtp/stream.c index 3eb8d5b05..ff69e3e2d 100644 --- a/src/modules/module-rtp/stream.c +++ b/src/modules/module-rtp/stream.c @@ -58,6 +58,7 @@ struct impl { unsigned have_ssrc:1; unsigned ignore_ssrc:1; unsigned have_seq:1; + unsigned marker_on_first:1; uint32_t ts_offset; uint32_t psamples; uint32_t mtu; @@ -132,6 +133,8 @@ static int stream_start(struct impl *impl) if (impl->started) return 0; + impl->first = true; + rtp_stream_emit_state_changed(impl, true, NULL); impl->started = true; @@ -364,6 +367,7 @@ struct rtp_stream *rtp_stream_new(struct pw_core *core, if (pw_properties_get(props, PW_KEY_NODE_NETWORK) == NULL) pw_properties_set(props, PW_KEY_NODE_NETWORK, "true"); + impl->marker_on_first = pw_properties_get_bool(props, "sess.marker-on-first", false); impl->ignore_ssrc = pw_properties_get_bool(props, "sess.ignore-ssrc", false); impl->direct_timestamp = pw_properties_get_bool(props, "sess.ts-direct", false); From 35330cf461caf3910d24bbfcd732115e3b4c5207 Mon Sep 17 00:00:00 2001 From: Christian Glombek Date: Mon, 9 Oct 2023 07:21:17 +0200 Subject: [PATCH 3/8] module-rtp/stream: Add param_changed method This method can be used to access the param_changed method of the underlying pw_stream. Also adds new public functions rtp_stream_set_param and rtp_stream_update_params which plum things through to pw_stream_set_param and pw_stream_update_params respectively. --- src/modules/module-rtp/stream.c | 24 ++++++++++++++++++++++++ src/modules/module-rtp/stream.h | 7 +++++++ 2 files changed, 31 insertions(+) diff --git a/src/modules/module-rtp/stream.c b/src/modules/module-rtp/stream.c index ff69e3e2d..cfe3a6fe4 100644 --- a/src/modules/module-rtp/stream.c +++ b/src/modules/module-rtp/stream.c @@ -32,6 +32,7 @@ struct rtp_stream_events, m, v, ##__VA_ARGS__) #define rtp_stream_emit_destroy(s) rtp_stream_emit(s, destroy, 0) #define rtp_stream_emit_state_changed(s,n,e) rtp_stream_emit(s, state_changed,0,n,e) +#define rtp_stream_emit_param_changed(s,i,p) rtp_stream_emit(s, param_changed,0,i,p) #define rtp_stream_emit_send_packet(s,i,l) rtp_stream_emit(s, send_packet,0,i,l) #define rtp_stream_emit_send_feedback(s,seq) rtp_stream_emit(s, send_feedback,0,seq) @@ -179,10 +180,17 @@ static void on_stream_state_changed(void *d, enum pw_stream_state old, } } +static void on_stream_param_changed (void *d, uint32_t id, const struct spa_pod *param) +{ + struct impl *impl = d; + rtp_stream_emit_param_changed(impl, id, param); +}; + static const struct pw_stream_events stream_events = { PW_VERSION_STREAM_EVENTS, .destroy = stream_destroy, .state_changed = on_stream_state_changed, + .param_changed = on_stream_param_changed, .io_changed = stream_io_changed, }; @@ -534,3 +542,19 @@ uint64_t rtp_stream_get_time(struct rtp_stream *s, uint64_t *rate) return pos->clock.position * impl->rate * pos->clock.rate.num / pos->clock.rate.denom; } + +int rtp_stream_set_param(struct rtp_stream *s, uint32_t id, const struct spa_pod *param) +{ + struct impl *impl = (struct impl*)s; + + return pw_stream_set_param(impl->stream, id, param); +} + +int rtp_stream_update_params(struct rtp_stream *s, + const struct spa_pod **params, + uint32_t n_params) +{ + struct impl *impl = (struct impl*)s; + + return pw_stream_update_params(impl->stream, params, n_params); +} \ No newline at end of file diff --git a/src/modules/module-rtp/stream.h b/src/modules/module-rtp/stream.h index 7686c565e..84ec1c58f 100644 --- a/src/modules/module-rtp/stream.h +++ b/src/modules/module-rtp/stream.h @@ -31,6 +31,8 @@ struct rtp_stream_events { void (*state_changed) (void *data, bool started, const char *error); + void (*param_changed) (void *data, uint32_t id, const struct spa_pod *param); + void (*send_packet) (void *data, struct iovec *iov, size_t iovlen); void (*send_feedback) (void *data, uint32_t senum); @@ -46,6 +48,11 @@ int rtp_stream_receive_packet(struct rtp_stream *s, uint8_t *buffer, size_t len) uint64_t rtp_stream_get_time(struct rtp_stream *s, uint64_t *rate); +int rtp_stream_set_param(struct rtp_stream *s, uint32_t id, const struct spa_pod *param); + +int rtp_stream_update_params(struct rtp_stream *stream, + const struct spa_pod **params, + uint32_t n_params); #ifdef __cplusplus } From 1200bd7d20f2ecca4cf89f053db35fc7fc75d46e Mon Sep 17 00:00:00 2001 From: Christian Glombek Date: Mon, 9 Oct 2023 07:22:06 +0200 Subject: [PATCH 4/8] module-rtp/stream: Add getter for property --- src/modules/module-rtp/stream.c | 7 +++++++ src/modules/module-rtp/stream.h | 2 ++ 2 files changed, 9 insertions(+) diff --git a/src/modules/module-rtp/stream.c b/src/modules/module-rtp/stream.c index cfe3a6fe4..36996ea90 100644 --- a/src/modules/module-rtp/stream.c +++ b/src/modules/module-rtp/stream.c @@ -543,6 +543,13 @@ uint64_t rtp_stream_get_time(struct rtp_stream *s, uint64_t *rate) pos->clock.rate.num / pos->clock.rate.denom; } +uint16_t rtp_stream_get_seq(struct rtp_stream *s) +{ + struct impl *impl = (struct impl*)s; + + return impl->seq; +} + int rtp_stream_set_param(struct rtp_stream *s, uint32_t id, const struct spa_pod *param) { struct impl *impl = (struct impl*)s; diff --git a/src/modules/module-rtp/stream.h b/src/modules/module-rtp/stream.h index 84ec1c58f..bfc126616 100644 --- a/src/modules/module-rtp/stream.h +++ b/src/modules/module-rtp/stream.h @@ -48,6 +48,8 @@ int rtp_stream_receive_packet(struct rtp_stream *s, uint8_t *buffer, size_t len) uint64_t rtp_stream_get_time(struct rtp_stream *s, uint64_t *rate); +uint16_t rtp_stream_get_seq(struct rtp_stream *s); + int rtp_stream_set_param(struct rtp_stream *s, uint32_t id, const struct spa_pod *param); int rtp_stream_update_params(struct rtp_stream *stream, From 89d935c9f66c606e07c2481ec4e0acd52d4dcddb Mon Sep 17 00:00:00 2001 From: Christian Glombek Date: Mon, 9 Oct 2023 07:22:32 +0200 Subject: [PATCH 5/8] module-rtp/stream: Add setter for property --- src/modules/module-rtp/stream.c | 7 +++++++ src/modules/module-rtp/stream.h | 2 ++ 2 files changed, 9 insertions(+) diff --git a/src/modules/module-rtp/stream.c b/src/modules/module-rtp/stream.c index 36996ea90..749f07b23 100644 --- a/src/modules/module-rtp/stream.c +++ b/src/modules/module-rtp/stream.c @@ -550,6 +550,13 @@ uint16_t rtp_stream_get_seq(struct rtp_stream *s) return impl->seq; } +void rtp_stream_set_first(struct rtp_stream *s) +{ + struct impl *impl = (struct impl*)s; + + impl->first = true; +} + int rtp_stream_set_param(struct rtp_stream *s, uint32_t id, const struct spa_pod *param) { struct impl *impl = (struct impl*)s; diff --git a/src/modules/module-rtp/stream.h b/src/modules/module-rtp/stream.h index bfc126616..9be5c0d84 100644 --- a/src/modules/module-rtp/stream.h +++ b/src/modules/module-rtp/stream.h @@ -50,6 +50,8 @@ uint64_t rtp_stream_get_time(struct rtp_stream *s, uint64_t *rate); uint16_t rtp_stream_get_seq(struct rtp_stream *s); +void rtp_stream_set_first(struct rtp_stream *s); + int rtp_stream_set_param(struct rtp_stream *s, uint32_t id, const struct spa_pod *param); int rtp_stream_update_params(struct rtp_stream *stream, From 8704aaa044c75f23773e8e45d3093504867fc5cf Mon Sep 17 00:00:00 2001 From: Christian Glombek Date: Mon, 9 Oct 2023 07:23:02 +0200 Subject: [PATCH 6/8] module-rtp/stream: Add getter for pw_stream state --- src/modules/module-rtp/stream.c | 7 +++++++ src/modules/module-rtp/stream.h | 2 ++ 2 files changed, 9 insertions(+) diff --git a/src/modules/module-rtp/stream.c b/src/modules/module-rtp/stream.c index 749f07b23..86f63ab59 100644 --- a/src/modules/module-rtp/stream.c +++ b/src/modules/module-rtp/stream.c @@ -557,6 +557,13 @@ void rtp_stream_set_first(struct rtp_stream *s) impl->first = true; } +enum pw_stream_state rtp_stream_get_state(struct rtp_stream *s, const char **error) +{ + struct impl *impl = (struct impl*)s; + + return pw_stream_get_state(impl->stream, error); +} + int rtp_stream_set_param(struct rtp_stream *s, uint32_t id, const struct spa_pod *param) { struct impl *impl = (struct impl*)s; diff --git a/src/modules/module-rtp/stream.h b/src/modules/module-rtp/stream.h index 9be5c0d84..d53db7ecd 100644 --- a/src/modules/module-rtp/stream.h +++ b/src/modules/module-rtp/stream.h @@ -52,6 +52,8 @@ uint16_t rtp_stream_get_seq(struct rtp_stream *s); void rtp_stream_set_first(struct rtp_stream *s); +enum pw_stream_state rtp_stream_get_state(struct rtp_stream *s, const char **error); + int rtp_stream_set_param(struct rtp_stream *s, uint32_t id, const struct spa_pod *param); int rtp_stream_update_params(struct rtp_stream *stream, From cbac8c9040abc7982f6cba7ced910e21bd7208b8 Mon Sep 17 00:00:00 2001 From: Christian Glombek Date: Mon, 9 Oct 2023 07:23:44 +0200 Subject: [PATCH 7/8] module-rtp/stream: Add support for RAOP --- src/modules/module-rtp/stream.c | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/src/modules/module-rtp/stream.c b/src/modules/module-rtp/stream.c index 86f63ab59..217c9a876 100644 --- a/src/modules/module-rtp/stream.c +++ b/src/modules/module-rtp/stream.c @@ -104,6 +104,7 @@ static const struct format_info audio_format_info[] = { { SPA_MEDIA_SUBTYPE_raw, SPA_AUDIO_FORMAT_ALAW, 1, "PCMA", "audio" }, { SPA_MEDIA_SUBTYPE_raw, SPA_AUDIO_FORMAT_ULAW, 1, "PCMU", "audio" }, { SPA_MEDIA_SUBTYPE_raw, SPA_AUDIO_FORMAT_S16_BE, 2, "L16", "audio" }, + { SPA_MEDIA_SUBTYPE_raw, SPA_AUDIO_FORMAT_S16_LE, 2, "L16", "audio" }, { SPA_MEDIA_SUBTYPE_raw, SPA_AUDIO_FORMAT_S24_BE, 3, "L24", "audio" }, { SPA_MEDIA_SUBTYPE_control, 0, 1, "rtp-midi", "audio" }, { SPA_MEDIA_SUBTYPE_opus, 0, 4, "opus", "audio" }, @@ -298,6 +299,12 @@ struct rtp_stream *rtp_stream_new(struct pw_core *core, impl->info.media_subtype = SPA_MEDIA_SUBTYPE_raw; impl->payload = 127; } + else if (spa_streq(str, "raop")) { + impl->info.media_type = SPA_MEDIA_TYPE_audio; + impl->info.media_subtype = SPA_MEDIA_SUBTYPE_raw; + impl->payload = 0x60; + impl->marker_on_first = 1; + } else if (spa_streq(str, "midi")) { impl->info.media_type = SPA_MEDIA_TYPE_application; impl->info.media_subtype = SPA_MEDIA_SUBTYPE_control; From 60d0943c19efe307dd9d63865e140f51b1e62bd5 Mon Sep 17 00:00:00 2001 From: Christian Glombek Date: Mon, 9 Oct 2023 07:24:30 +0200 Subject: [PATCH 8/8] module-raop-sink: Port to rtp-module/stream --- src/modules/meson.build | 5 +- src/modules/module-raop-sink.c | 558 +++++++++++---------------------- 2 files changed, 194 insertions(+), 369 deletions(-) diff --git a/src/modules/meson.build b/src/modules/meson.build index 3f0d6edd7..b78e69282 100644 --- a/src/modules/meson.build +++ b/src/modules/meson.build @@ -607,12 +607,13 @@ build_module_raop = openssl_lib.found() if build_module_raop pipewire_module_raop_sink = shared_library('pipewire-module-raop-sink', [ 'module-raop-sink.c', - 'module-raop/rtsp-client.c' ], + 'module-raop/rtsp-client.c', + 'module-rtp/stream.c' ], include_directories : [configinc], install : true, install_dir : modules_install_dir, install_rpath: modules_install_dir, - dependencies : [mathlib, dl_lib, rt_lib, pipewire_dep, openssl_lib], + dependencies : [mathlib, dl_lib, rt_lib, pipewire_dep, opus_dep, openssl_lib], ) endif summary({'raop-sink (requires OpenSSL)': build_module_raop}, bool_yn: true, section: 'Optional Modules') diff --git a/src/modules/module-raop-sink.c b/src/modules/module-raop-sink.c index cfd346f2b..c91f80a9f 100644 --- a/src/modules/module-raop-sink.c +++ b/src/modules/module-raop-sink.c @@ -45,6 +45,7 @@ #include "module-raop/rtsp-client.h" #include "module-rtp/rtp.h" +#include "module-rtp/stream.h" /** \page page_module_raop_sink PipeWire Module: AirPlay Sink * @@ -121,36 +122,37 @@ PW_LOG_TOPIC_STATIC(mod_topic, "mod." NAME); #define PW_LOG_TOPIC_DEFAULT mod_topic -#define FRAMES_PER_TCP_PACKET 4096 -#define FRAMES_PER_UDP_PACKET 352 +#define BUFFER_SIZE (1u<<22) +#define BUFFER_MASK (BUFFER_SIZE-1) +#define BUFFER_SIZE2 (BUFFER_SIZE>>1) +#define BUFFER_MASK2 (BUFFER_SIZE2-1) -#define RAOP_LATENCY_MIN 11025u -#define DEFAULT_LATENCY_MS "1500" +#define FRAMES_PER_TCP_PACKET 4096 +#define FRAMES_PER_UDP_PACKET 352 -#define DEFAULT_TCP_AUDIO_PORT 6000 -#define DEFAULT_UDP_AUDIO_PORT 6000 -#define DEFAULT_UDP_CONTROL_PORT 6001 -#define DEFAULT_UDP_TIMING_PORT 6002 +#define RAOP_AUDIO_PORT 6000 +#define RAOP_UDP_CONTROL_PORT 6001 +#define RAOP_UDP_TIMING_PORT 6002 #define AES_CHUNK_SIZE 16 #ifndef MD5_DIGEST_LENGTH #define MD5_DIGEST_LENGTH 16 #endif -#define MD5_HASH_LENGTH (2*MD5_DIGEST_LENGTH) +#define MD5_HASH_LENGTH (2*MD5_DIGEST_LENGTH) #define DEFAULT_USER_NAME "PipeWire" #define RAOP_AUTH_USER_NAME "iTunes" -#define MAX_PORT_RETRY 128 +#define MAX_PORT_RETRY 128 -#define DEFAULT_FORMAT "S16" -#define DEFAULT_RATE 44100 -#define DEFAULT_CHANNELS 2 -#define DEFAULT_POSITION "[ FL FR ]" +#define RAOP_FORMAT "S16LE" +#define RAOP_STRIDE (2*DEFAULT_CHANNELS) +#define RAOP_RATE 44100 +#define RAOP_LATENCY_MS 250 -#define VOLUME_MAX 0.0 -#define VOLUME_MIN -30.0 -#define VOLUME_MUTE -144.0 +#define VOLUME_MAX 0.0 +#define VOLUME_MIN -30.0 +#define VOLUME_MUTE -144.0 #define MODULE_USAGE "( raop.ip= ) " \ "( raop.port= ) " \ @@ -163,8 +165,8 @@ PW_LOG_TOPIC_STATIC(mod_topic, "mod." NAME); "( node.latency= ) " \ "( node.name= ) " \ "( node.description= ) " \ - "( audio.format= ) " \ - "( audio.rate= ) " \ + "( audio.format= ) " \ + "( audio.rate= ) " \ "( audio.channels= ) " \ "( audio.position= ) " \ "( stream.props= ) " @@ -212,9 +214,7 @@ struct impl { struct spa_hook core_listener; struct pw_properties *stream_props; - struct pw_stream *stream; - struct spa_hook stream_listener; - struct spa_audio_info_raw info; + struct rtp_stream *stream; struct pw_rtsp_client *rtsp; struct spa_hook rtsp_listener; @@ -245,16 +245,15 @@ struct impl { int server_fd; struct spa_source *server_source; + uint32_t psamples; + uint64_t rate; uint32_t mtu; uint32_t stride; uint32_t latency; - uint16_t seq, cseq; - uint32_t rtptime; uint32_t ssrc; uint32_t sync; uint32_t sync_period; - unsigned int first:1; unsigned int connected:1; unsigned int ready:1; unsigned int recording:1; @@ -262,17 +261,14 @@ struct impl { bool mute; float volume; - uint8_t buffer[FRAMES_PER_TCP_PACKET * 4]; + struct spa_ringbuffer ring; + uint8_t buffer[BUFFER_SIZE]; + + struct spa_io_position *io_position; + uint32_t filled; }; -static void stream_destroy(void *d) -{ - struct impl *impl = d; - spa_hook_remove(&impl->stream_listener); - impl->stream = NULL; -} - static inline void bit_writer(uint8_t **p, int *pos, uint8_t data, int len) { int rb = 8 - *pos - len; @@ -307,7 +303,7 @@ static inline uint64_t ntp_now(void) return timespec_to_ntp(&now); } -static int send_udp_sync_packet(struct impl *impl, uint32_t rtptime) +static int send_udp_sync_packet(struct impl *impl, uint32_t rtptime, unsigned int first) { uint32_t out[3]; uint32_t latency = impl->latency; @@ -319,11 +315,11 @@ static int send_udp_sync_packet(struct impl *impl, uint32_t rtptime) spa_zero(header); header.v = 2; - if (impl->first) + if (first) header.x = 1; header.m = 1; header.pt = 84; - header.sequence_number = htons(impl->cseq); + header.sequence_number = 7; header.timestamp = htonl(rtptime - latency); iov[0].iov_base = &header; @@ -351,10 +347,8 @@ static int send_udp_sync_packet(struct impl *impl, uint32_t rtptime) pw_log_warn("error sending control packet: %d", res); } - impl->cseq = (impl->cseq + 1) & 0xffff; - - pw_log_debug("raop control sync: cseq:%d first:%d latency:%u now:%"PRIx64" rtptime:%u", - impl->cseq, impl->first, latency, transmitted, rtptime); + pw_log_debug("raop control sync: first:%d latency:%u now:%"PRIx64" rtptime:%u", + first, latency, transmitted, rtptime); return res; } @@ -452,7 +446,7 @@ static void stream_send_packet(void *data, struct iovec *iov, size_t iovlen) struct impl *impl = data; const size_t max = 8 + impl->mtu; uint32_t tcp_pkt[1], out[max], len, n_frames, rtptime; - struct iovec tcp_iov[3], udp_iov[2]; + struct iovec out_vec[3]; struct rtp_header *header; struct msghdr msg; uint8_t *dst; @@ -466,8 +460,8 @@ static void stream_send_packet(void *data, struct iovec *iov, size_t iovlen) rtptime = htonl(header->timestamp); - if (header->m || impl->first || ++impl->sync == impl->sync_period) { - send_udp_sync_packet(impl, rtptime); + if (header->m || ++impl->sync == impl->sync_period) { + send_udp_sync_packet(impl, rtptime, header->m); impl->sync = 0; } @@ -475,6 +469,8 @@ static void stream_send_packet(void *data, struct iovec *iov, size_t iovlen) msg.msg_name = NULL; msg.msg_namelen = 0; + msg.msg_iov = out_vec; + msg.msg_iovlen = 0; msg.msg_control = NULL; msg.msg_controllen = 0; msg.msg_flags = 0; @@ -494,99 +490,28 @@ static void stream_send_packet(void *data, struct iovec *iov, size_t iovlen) if (impl->encryption == CRYPTO_RSA) aes_encrypt(impl, dst, len); - switch (impl->protocol) { - case PROTO_UDP: - udp_iov[0].iov_base = header; - udp_iov[0].iov_len = 12; - udp_iov[1].iov_base = out; - udp_iov[1].iov_len = len; - - msg.msg_iov = udp_iov; - msg.msg_iovlen = 2; - - break; - case PROTO_TCP: + if (impl->protocol == PROTO_TCP) { out[0] |= htonl((uint32_t) len + 12); tcp_pkt[0] = htonl(0x24000000); - - tcp_iov[0].iov_base = &tcp_pkt; - tcp_iov[0].iov_len = 4; - tcp_iov[1].iov_base = header; - tcp_iov[1].iov_len = 12; - tcp_iov[2].iov_base = out; - tcp_iov[2].iov_len = len; - - msg.msg_iov = tcp_iov; - msg.msg_iovlen = 3; - - break; + out_vec[msg.msg_iovlen++] = (struct iovec) { tcp_pkt, 4 }; } - pw_log_debug("raop sending header:%ld data:%ld", udp_iov[0].iov_len, udp_iov[1].iov_len); + out_vec[msg.msg_iovlen++] = (struct iovec) { header, 12 }; + out_vec[msg.msg_iovlen++] = (struct iovec) { out, len }; + + pw_log_debug("raop sending %ld", out_vec[0].iov_len + out_vec[1].iov_len + out_vec[2].iov_len); send_packet(impl->server_fd, &msg); } -static void playback_stream_process(void *d) +static inline void +set_iovec(struct spa_ringbuffer *rbuf, void *buffer, uint32_t size, + uint32_t offset, struct iovec *iov, uint32_t len) { - struct impl *impl = d; - struct pw_buffer *buf; - struct spa_data *bd; - uint8_t *data; - uint32_t offs, size; - struct iovec iov[2]; - struct rtp_header header; - - if ((buf = pw_stream_dequeue_buffer(impl->stream)) == NULL) { - pw_log_debug("out of buffers: %m"); - return; - } - - bd = &buf->buffer->datas[0]; - - offs = SPA_MIN(bd->chunk->offset, bd->maxsize); - size = SPA_MIN(bd->chunk->size, bd->maxsize - offs); - data = SPA_PTROFF(bd->data, offs, uint8_t); - - spa_zero(header); - header.v = 2; - header.pt = 96; - if (impl->first) - header.m = 1; - header.ssrc = htonl(impl->ssrc); - - while (size > 0 && impl->mtu > 0) { - uint32_t avail, to_fill; - - header.sequence_number = htons(impl->seq); - header.timestamp = htonl(impl->rtptime); - - avail = impl->mtu - impl->filled; - to_fill = SPA_MIN(avail, size); - - memcpy(&impl->buffer[impl->filled], data, to_fill); - - impl->filled += to_fill; - avail -= to_fill; - size -= to_fill; - data += to_fill; - - if (avail == 0) { - iov[0].iov_base = &header; - iov[0].iov_len = 12; - iov[1].iov_base = impl->buffer; - iov[1].iov_len = impl->filled; - - stream_send_packet(impl, iov, 2); - - impl->rtptime += (impl->filled / impl->stride); - impl->seq = (impl->seq + 1) & 0xffff; - impl->filled = 0; - impl->first = false; - } - } - - pw_stream_queue_buffer(impl->stream, buf); + iov[0].iov_len = SPA_MIN(len, size - offset); + iov[0].iov_base = SPA_PTROFF(buffer, offset, void); + iov[1].iov_len = len - iov[0].iov_len; + iov[1].iov_base = buffer; } static int create_udp_socket(struct impl *impl, uint16_t *port) @@ -931,6 +856,11 @@ static void rtsp_do_post_feedback(void *data, uint64_t expirations) NULL, NULL, 0, rtsp_log_reply_status, impl); } +static uint32_t msec_to_samples(struct impl *impl, uint32_t msec) +{ + return msec * impl->rate / 1000; +} + static int rtsp_record_reply(void *data, int status, const struct spa_dict *headers, const struct pw_array *content) { struct impl *impl = data; @@ -962,17 +892,18 @@ static int rtsp_record_reply(void *data, int status, const struct spa_dict *head spa_zero(latency); latency.direction = PW_DIRECTION_INPUT; - latency.min_rate = latency.max_rate = impl->latency + RAOP_LATENCY_MIN; + latency.min_rate = latency.max_rate = impl->latency + msec_to_samples(impl, RAOP_LATENCY_MS); n_params = 0; spa_pod_builder_init(&b, buffer, sizeof(buffer)); params[n_params++] = spa_latency_build(&b, SPA_PARAM_Latency, &latency); - pw_stream_update_params(impl->stream, params, n_params); + rtp_stream_update_params(impl->stream, params, n_params); + + rtp_stream_set_first(impl->stream); - impl->first = true; impl->sync = 0; - impl->sync_period = impl->info.rate / (impl->mtu / impl->stride); + impl->sync_period = impl->rate / (impl->mtu / impl->stride); impl->recording = true; rtsp_send_volume(impl); @@ -984,13 +915,18 @@ static int rtsp_record_reply(void *data, int status, const struct spa_dict *head static int rtsp_do_record(struct impl *impl) { int res; + uint16_t seq; + uint32_t rtptime; if (!impl->ready || impl->recording) return 0; + seq = rtp_stream_get_seq(impl->stream); + rtptime = rtp_stream_get_time(impl->stream, &impl->rate); + pw_properties_set(impl->headers, "Range", "npt=0-"); pw_properties_setf(impl->headers, "RTP-Info", - "seq=%u;rtptime=%u", impl->seq, impl->rtptime); + "seq=%u;rtptime=%u", seq, rtptime); res = rtsp_send(impl, "RECORD", NULL, NULL, rtsp_record_reply); @@ -1023,7 +959,7 @@ on_server_source_io(void *data, int fd, uint32_t mask) goto error; impl->ready = true; - if (pw_stream_get_state(impl->stream, NULL) == PW_STREAM_STATE_STREAMING) + if (rtp_stream_get_state(impl->stream, NULL) == PW_STREAM_STATE_STREAMING) rtsp_do_record(impl); } return; @@ -1038,7 +974,6 @@ static int rtsp_setup_reply(void *data, int status, const struct spa_dict *heade size_t len; uint64_t ntp; uint16_t control_port, timing_port; - int res; pw_log_info("setup status: %d", status); @@ -1070,12 +1005,6 @@ static int rtsp_setup_reply(void *data, int status, const struct spa_dict *heade return 0; } - if ((res = pw_getrandom(&impl->seq, sizeof(impl->seq), 0)) < 0 || - (res = pw_getrandom(&impl->rtptime, sizeof(impl->rtptime), 0)) < 0) { - pw_log_error("error generating random seq and rtptime: %s", spa_strerror(res)); - return 0; - } - pw_log_info("server port:%u", impl->server_port); switch (impl->protocol) { @@ -1114,7 +1043,7 @@ static int rtsp_setup_reply(void *data, int status, const struct spa_dict *heade SPA_IO_IN, false, on_control_source_io, impl); impl->ready = true; - if (pw_stream_get_state(impl->stream, NULL) == PW_STREAM_STATE_STREAMING) + if (rtp_stream_get_state(impl->stream, NULL) == PW_STREAM_STATE_STREAMING) rtsp_do_record(impl); break; default: @@ -1134,8 +1063,8 @@ static int rtsp_do_setup(struct impl *impl) break; case PROTO_UDP: - impl->control_port = DEFAULT_UDP_CONTROL_PORT; - impl->timing_port = DEFAULT_UDP_TIMING_PORT; + impl->control_port = RAOP_UDP_CONTROL_PORT; + impl->timing_port = RAOP_UDP_TIMING_PORT; impl->control_fd = create_udp_socket(impl, &impl->control_port); impl->timing_fd = create_udp_socket(impl, &impl->timing_port); @@ -1274,19 +1203,14 @@ static int rtsp_do_announce(struct impl *impl) { const char *host; uint8_t rsakey[512]; + uint32_t rtp_latency; char key[512*2]; char iv[16*2]; - int res, frames, rsa_len, ip_version; + int res, rsa_len, ip_version; spa_autofree char *sdp = NULL; char local_ip[256]; host = pw_properties_get(impl->props, "raop.ip"); - - if (impl->protocol == PROTO_TCP) - frames = FRAMES_PER_TCP_PACKET; - else - frames = FRAMES_PER_UDP_PACKET; - - impl->mtu = frames * impl->stride; + rtp_latency = msec_to_samples(impl, RAOP_LATENCY_MS); pw_rtsp_client_get_local_ip(impl->rtsp, &ip_version, local_ip, sizeof(local_ip)); @@ -1302,7 +1226,7 @@ static int rtsp_do_announce(struct impl *impl) "a=rtpmap:96 AppleLossless\r\n" "a=fmtp:96 %d 0 16 40 10 14 2 255 0 0 %u\r\n", impl->session_id, ip_version, local_ip, - ip_version, host, frames, impl->info.rate); + ip_version, host, impl->psamples, (uint32_t)impl->rate); if (!sdp) return -errno; break; @@ -1317,8 +1241,8 @@ static int rtsp_do_announce(struct impl *impl) "a=fmtp:96 %d 0 16 40 10 14 2 255 0 0 %u\r\n" "a=min-latency:%d", impl->session_id, ip_version, local_ip, - ip_version, host, frames, impl->info.rate, - RAOP_LATENCY_MIN); + ip_version, host, impl->psamples, (uint32_t)impl->rate, + rtp_latency); if (!sdp) return -errno; break; @@ -1354,7 +1278,7 @@ static int rtsp_do_announce(struct impl *impl) "a=rsaaeskey:%s\r\n" "a=aesiv:%s\r\n", impl->session_id, ip_version, local_ip, - ip_version, host, frames, impl->info.rate, + ip_version, host, impl->psamples, (uint32_t)impl->rate, key, iv); if (!sdp) return -errno; @@ -1576,21 +1500,23 @@ static const struct pw_rtsp_client_events rtsp_events = { .message = rtsp_message, }; -static void stream_state_changed(void *d, enum pw_stream_state old, - enum pw_stream_state state, const char *error) +static void stream_destroy(void *d) { struct impl *impl = d; - switch (state) { - case PW_STREAM_STATE_ERROR: - case PW_STREAM_STATE_UNCONNECTED: + impl->stream = NULL; +} + +static void stream_state_changed(void *data, bool started, const char *error) +{ + struct impl *impl = data; + + if (error) { + pw_log_error("stream error: %s", error); pw_impl_module_schedule_destroy(impl->module); - break; - case PW_STREAM_STATE_STREAMING: - rtsp_do_record(impl); - break; - default: - break; + return; } + if (started) + rtsp_do_record(impl); } static int rtsp_do_connect(struct impl *impl) @@ -1705,7 +1631,7 @@ static void stream_props_changed(struct impl *impl, uint32_t id, const struct sp } param = spa_pod_builder_pop(&b, &f[0]); - pw_stream_set_param(impl->stream, id, param); + rtp_stream_set_param(impl->stream, id, param); } static void stream_param_changed(void *data, uint32_t id, const struct spa_pod *param) @@ -1728,57 +1654,14 @@ static void stream_param_changed(void *data, uint32_t id, const struct spa_pod * } } -static const struct pw_stream_events playback_stream_events = { - PW_VERSION_STREAM_EVENTS, +static const struct rtp_stream_events stream_events = { + RTP_VERSION_STREAM_EVENTS, .destroy = stream_destroy, .state_changed = stream_state_changed, .param_changed = stream_param_changed, - .process = playback_stream_process + .send_packet = stream_send_packet }; -static int create_stream(struct impl *impl) -{ - int res; - uint32_t n_params; - const struct spa_pod *params[1]; - uint8_t buffer[1024]; - struct spa_pod_builder b; - - impl->stream = pw_stream_new(impl->core, "RAOP sink", impl->stream_props); - impl->stream_props = NULL; - - if (impl->stream == NULL) - return -errno; - - pw_stream_add_listener(impl->stream, - &impl->stream_listener, - &playback_stream_events, impl); - - n_params = 0; - spa_pod_builder_init(&b, buffer, sizeof(buffer)); - params[n_params++] = spa_format_audio_raw_build(&b, - SPA_PARAM_EnumFormat, &impl->info); - - if ((res = pw_stream_connect(impl->stream, - PW_DIRECTION_INPUT, - PW_ID_ANY, - PW_STREAM_FLAG_MAP_BUFFERS | - PW_STREAM_FLAG_RT_PROCESS, - params, n_params)) < 0) - return res; - - impl->headers = pw_properties_new(NULL, NULL); - - impl->rtsp = pw_rtsp_client_new(impl->loop, NULL, 0); - if (impl->rtsp == NULL) - return -errno; - - pw_rtsp_client_add_listener(impl->rtsp, &impl->rtsp_listener, - &rtsp_events, impl); - - return 0; -} - static void core_error(void *data, uint32_t id, int seq, int res, const char *message) { struct impl *impl = data; @@ -1810,7 +1693,7 @@ static const struct pw_proxy_events core_proxy_events = { static void impl_destroy(struct impl *impl) { if (impl->stream) - pw_stream_destroy(impl->stream); + rtp_stream_destroy(impl->stream); if (impl->core && impl->do_disconnect) pw_core_disconnect(impl->core); @@ -1839,97 +1722,6 @@ static const struct pw_impl_module_events module_events = { .destroy = module_destroy, }; -static inline uint32_t format_from_name(const char *name, size_t len) -{ - int i; - for (i = 0; spa_type_audio_format[i].name; i++) { - if (strncmp(name, spa_debug_type_short_name(spa_type_audio_format[i].name), len) == 0) - return spa_type_audio_format[i].type; - } - return SPA_AUDIO_FORMAT_UNKNOWN; -} - -static uint32_t channel_from_name(const char *name) -{ - int i; - for (i = 0; spa_type_audio_channel[i].name; i++) { - if (spa_streq(name, spa_debug_type_short_name(spa_type_audio_channel[i].name))) - return spa_type_audio_channel[i].type; - } - return SPA_AUDIO_CHANNEL_UNKNOWN; -} - -static void parse_position(struct spa_audio_info_raw *info, const char *val, size_t len) -{ - struct spa_json it[2]; - char v[256]; - - spa_json_init(&it[0], val, len); - if (spa_json_enter_array(&it[0], &it[1]) <= 0) - spa_json_init(&it[1], val, len); - - info->channels = 0; - while (spa_json_get_string(&it[1], v, sizeof(v)) > 0 && - info->channels < SPA_AUDIO_MAX_CHANNELS) { - info->position[info->channels++] = channel_from_name(v); - } -} - -static void parse_audio_info(const struct pw_properties *props, struct spa_audio_info_raw *info) -{ - const char *str; - - spa_zero(*info); - if ((str = pw_properties_get(props, PW_KEY_AUDIO_FORMAT)) == NULL) - str = DEFAULT_FORMAT; - info->format = format_from_name(str, strlen(str)); - - info->rate = pw_properties_get_uint32(props, PW_KEY_AUDIO_RATE, info->rate); - if (info->rate == 0) - info->rate = DEFAULT_RATE; - - info->channels = pw_properties_get_uint32(props, PW_KEY_AUDIO_CHANNELS, info->channels); - info->channels = SPA_MIN(info->channels, SPA_AUDIO_MAX_CHANNELS); - if ((str = pw_properties_get(props, SPA_KEY_AUDIO_POSITION)) != NULL) - parse_position(info, str, strlen(str)); - if (info->channels == 0) - parse_position(info, DEFAULT_POSITION, strlen(DEFAULT_POSITION)); -} - -static int calc_stride(struct spa_audio_info_raw *info) -{ - int res = info->channels; - switch (info->format) { - case SPA_AUDIO_FORMAT_U8: - case SPA_AUDIO_FORMAT_S8: - case SPA_AUDIO_FORMAT_ALAW: - case SPA_AUDIO_FORMAT_ULAW: - return res; - case SPA_AUDIO_FORMAT_S16: - case SPA_AUDIO_FORMAT_S16_OE: - case SPA_AUDIO_FORMAT_U16: - return res * 2; - case SPA_AUDIO_FORMAT_S24: - case SPA_AUDIO_FORMAT_S24_OE: - case SPA_AUDIO_FORMAT_U24: - return res * 3; - case SPA_AUDIO_FORMAT_S24_32: - case SPA_AUDIO_FORMAT_S24_32_OE: - case SPA_AUDIO_FORMAT_S32: - case SPA_AUDIO_FORMAT_S32_OE: - case SPA_AUDIO_FORMAT_U32: - case SPA_AUDIO_FORMAT_U32_OE: - case SPA_AUDIO_FORMAT_F32: - case SPA_AUDIO_FORMAT_F32_OE: - return res * 4; - case SPA_AUDIO_FORMAT_F64: - case SPA_AUDIO_FORMAT_F64_OE: - return res * 8; - default: - return 0; - } -} - static void copy_props(struct impl *impl, struct pw_properties *props, const char *key) { const char *str; @@ -1946,7 +1738,7 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args) struct pw_properties *props = NULL; struct impl *impl; const char *str, *name, *hostname, *ip, *port; - int res; + int res = 0; PW_LOG_TOPIC_INIT(mod_topic); @@ -1994,67 +1786,15 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args) goto error; } - if (pw_properties_get(props, PW_KEY_NODE_VIRTUAL) == NULL) - pw_properties_set(props, PW_KEY_NODE_VIRTUAL, "true"); - - if (pw_properties_get(props, PW_KEY_MEDIA_CLASS) == NULL) - pw_properties_set(props, PW_KEY_MEDIA_CLASS, "Audio/Sink"); - - if (pw_properties_get(props, PW_KEY_DEVICE_ICON_NAME) == NULL) - pw_properties_set(props, PW_KEY_DEVICE_ICON_NAME, "audio-speakers"); - - if ((name = pw_properties_get(props, "raop.name")) == NULL) - name = "RAOP"; - - if ((str = strstr(name, "@"))) { - str++; - if (strlen(str) > 0) - name = str; - } - if ((hostname = pw_properties_get(props, "raop.hostname")) == NULL) - hostname = name; - - if (pw_properties_get(props, PW_KEY_NODE_NAME) == NULL) - pw_properties_setf(props, PW_KEY_NODE_NAME, "raop_sink.%s.%s.%s", - hostname, ip, port); - if (pw_properties_get(props, PW_KEY_NODE_DESCRIPTION) == NULL) - pw_properties_setf(props, PW_KEY_NODE_DESCRIPTION, - "%s", name); - if (pw_properties_get(props, PW_KEY_NODE_LATENCY) == NULL) - pw_properties_set(props, PW_KEY_NODE_LATENCY, "352/44100"); - - if ((str = pw_properties_get(props, "stream.props")) != NULL) - pw_properties_update_string(impl->stream_props, str, strlen(str)); - - copy_props(impl, props, PW_KEY_AUDIO_FORMAT); - copy_props(impl, props, PW_KEY_AUDIO_RATE); - copy_props(impl, props, PW_KEY_AUDIO_CHANNELS); - copy_props(impl, props, SPA_KEY_AUDIO_POSITION); - copy_props(impl, props, PW_KEY_DEVICE_ICON_NAME); - copy_props(impl, props, PW_KEY_NODE_NAME); - copy_props(impl, props, PW_KEY_NODE_DESCRIPTION); - copy_props(impl, props, PW_KEY_NODE_GROUP); - copy_props(impl, props, PW_KEY_NODE_LATENCY); - copy_props(impl, props, PW_KEY_NODE_VIRTUAL); - copy_props(impl, props, PW_KEY_MEDIA_CLASS); - - parse_audio_info(impl->stream_props, &impl->info); - - impl->stride = calc_stride(&impl->info); - if (impl->stride == 0) { - pw_log_error("unsupported audio format:%d channels:%d", - impl->info.format, impl->info.channels); - res = -EINVAL; - goto error; - } - if ((str = pw_properties_get(props, "raop.transport")) == NULL) str = "udp"; - if (spa_streq(str, "udp")) + if (spa_streq(str, "udp")) { impl->protocol = PROTO_UDP; - else if (spa_streq(str, "tcp")) + impl->psamples = FRAMES_PER_UDP_PACKET; + } else if (spa_streq(str, "tcp")) { impl->protocol = PROTO_TCP; - else { + impl->psamples = FRAMES_PER_TCP_PACKET; + } else { pw_log_error( "can't handle transport %s", str); res = -EINVAL; goto error; @@ -2088,9 +1828,78 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args) str = pw_properties_get(props, "raop.password"); impl->password = str ? strdup(str) : NULL; - if ((str = pw_properties_get(props, "raop.latency.ms")) == NULL) - str = DEFAULT_LATENCY_MS; - impl->latency = SPA_MAX(atoi(str) * impl->info.rate / 1000u, RAOP_LATENCY_MIN); + if ((name = pw_properties_get(props, "raop.name")) == NULL) + name = "RAOP"; + + if ((str = strchr(name, '@')) != NULL) { + str++; + if (strlen(str) > 0) + name = str; + } + if ((hostname = pw_properties_get(props, "raop.hostname")) == NULL) + hostname = name; + + impl->rate = RAOP_RATE; + impl->latency = msec_to_samples(impl, RAOP_LATENCY_MS); + impl->stride = RAOP_STRIDE; + impl->mtu = impl->stride * impl->psamples; + impl->sync_period = impl->rate / impl->psamples; + + if (pw_properties_get(props, PW_KEY_AUDIO_FORMAT) == NULL) + pw_properties_setf(props, PW_KEY_AUDIO_FORMAT, "%s", RAOP_FORMAT); + if (pw_properties_get(props, PW_KEY_AUDIO_RATE) == NULL) + pw_properties_setf(props, PW_KEY_AUDIO_RATE, "%ld", impl->rate); + if (pw_properties_get(props, PW_KEY_DEVICE_ICON_NAME) == NULL) + pw_properties_set(props, PW_KEY_DEVICE_ICON_NAME, "audio-speakers"); + if (pw_properties_get(props, PW_KEY_NODE_NAME) == NULL) + pw_properties_setf(props, PW_KEY_NODE_NAME, "raop_sink.%s.%s.%s", + hostname, ip, port); + if (pw_properties_get(props, PW_KEY_NODE_DESCRIPTION) == NULL) + pw_properties_setf(props, PW_KEY_NODE_DESCRIPTION, "%s", name); + if (pw_properties_get(props, PW_KEY_NODE_LATENCY) == NULL) + pw_properties_setf(props, PW_KEY_NODE_LATENCY, "%d/%ld", + impl->psamples, impl->rate); + if (pw_properties_get(props, PW_KEY_NODE_VIRTUAL) == NULL) + pw_properties_set(props, PW_KEY_NODE_VIRTUAL, "true"); + if (pw_properties_get(props, PW_KEY_MEDIA_CLASS) == NULL) + pw_properties_set(props, PW_KEY_MEDIA_CLASS, "Audio/Sink"); + if (pw_properties_get(props, PW_KEY_MEDIA_FORMAT) == NULL) + pw_properties_setf(props, PW_KEY_MEDIA_FORMAT, "%d", SPA_AUDIO_FORMAT_S16_LE); + if (pw_properties_get(props, "net.mtu") == NULL) + pw_properties_setf(props, "net.mtu", "%d", impl->mtu); + if (pw_properties_get(props, "rtp.sender-ts-offset") == NULL) + pw_properties_setf(props, "rtp.sender-ts-offset", "%d", 0); + if (pw_properties_get(props, "sess.ts-direct") == NULL) + pw_properties_set(props, "sess.ts-direct", 0); + if (pw_properties_get(props, "sess.media") == NULL) + pw_properties_set(props, "sess.media", "raop"); + if (pw_properties_get(props, "sess.latency.msec") == NULL) + pw_properties_setf(props, "sess.latency.msec", "%d", RAOP_LATENCY_MS); + + if ((str = pw_properties_get(props, "stream.props")) != NULL) + pw_properties_update_string(impl->stream_props, str, strlen(str)); + + copy_props(impl, props, PW_KEY_AUDIO_FORMAT); + copy_props(impl, props, PW_KEY_AUDIO_RATE); + copy_props(impl, props, PW_KEY_AUDIO_CHANNELS); + copy_props(impl, props, SPA_KEY_AUDIO_POSITION); + copy_props(impl, props, PW_KEY_DEVICE_ICON_NAME); + copy_props(impl, props, PW_KEY_NODE_NAME); + copy_props(impl, props, PW_KEY_NODE_DESCRIPTION); + copy_props(impl, props, PW_KEY_NODE_GROUP); + copy_props(impl, props, PW_KEY_NODE_LATENCY); + copy_props(impl, props, PW_KEY_NODE_VIRTUAL); + copy_props(impl, props, PW_KEY_MEDIA_CLASS); + copy_props(impl, props, PW_KEY_MEDIA_FORMAT); + copy_props(impl, props, "net.mtu"); + copy_props(impl, props, "rtp.sender-ts-offset"); + copy_props(impl, props, "sess.media"); + copy_props(impl, props, "sess.name"); + copy_props(impl, props, "sess.min-ptime"); + copy_props(impl, props, "sess.max-ptime"); + copy_props(impl, props, "sess.latency.msec"); + copy_props(impl, props, "sess.ts-refclk"); + copy_props(impl, props, "sess.ts-direct"); impl->core = pw_context_get_object(impl->context, PW_TYPE_INTERFACE_Core); if (impl->core == NULL) { @@ -2115,8 +1924,23 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args) &impl->core_listener, &core_events, impl); - if ((res = create_stream(impl)) < 0) + impl->stream = rtp_stream_new(impl->core, + PW_DIRECTION_INPUT, pw_properties_copy(impl->stream_props), + &stream_events, impl); + if (impl->stream == NULL) { + res = -errno; + pw_log_error("can't create raop stream: %m"); goto error; + } + + impl->headers = pw_properties_new(NULL, NULL); + + impl->rtsp = pw_rtsp_client_new(impl->loop, NULL, 0); + if (impl->rtsp == NULL) + goto error; + + pw_rtsp_client_add_listener(impl->rtsp, &impl->rtsp_listener, + &rtsp_events, impl); pw_impl_module_add_listener(module, &impl->module_listener, &module_events, impl);