diff --git a/src/modules/meson.build b/src/modules/meson.build index 3f0d6edd7..b78e69282 100644 --- a/src/modules/meson.build +++ b/src/modules/meson.build @@ -607,12 +607,13 @@ build_module_raop = openssl_lib.found() if build_module_raop pipewire_module_raop_sink = shared_library('pipewire-module-raop-sink', [ 'module-raop-sink.c', - 'module-raop/rtsp-client.c' ], + 'module-raop/rtsp-client.c', + 'module-rtp/stream.c' ], include_directories : [configinc], install : true, install_dir : modules_install_dir, install_rpath: modules_install_dir, - dependencies : [mathlib, dl_lib, rt_lib, pipewire_dep, openssl_lib], + dependencies : [mathlib, dl_lib, rt_lib, pipewire_dep, opus_dep, openssl_lib], ) endif summary({'raop-sink (requires OpenSSL)': build_module_raop}, bool_yn: true, section: 'Optional Modules') diff --git a/src/modules/module-raop-sink.c b/src/modules/module-raop-sink.c index 1da8280ae..c91f80a9f 100644 --- a/src/modules/module-raop-sink.c +++ b/src/modules/module-raop-sink.c @@ -45,6 +45,7 @@ #include "module-raop/rtsp-client.h" #include "module-rtp/rtp.h" +#include "module-rtp/stream.h" /** \page page_module_raop_sink PipeWire Module: AirPlay Sink * @@ -121,36 +122,37 @@ PW_LOG_TOPIC_STATIC(mod_topic, "mod." NAME); #define PW_LOG_TOPIC_DEFAULT mod_topic -#define FRAMES_PER_TCP_PACKET 4096 -#define FRAMES_PER_UDP_PACKET 352 +#define BUFFER_SIZE (1u<<22) +#define BUFFER_MASK (BUFFER_SIZE-1) +#define BUFFER_SIZE2 (BUFFER_SIZE>>1) +#define BUFFER_MASK2 (BUFFER_SIZE2-1) -#define RAOP_LATENCY_MIN 11025u -#define DEFAULT_LATENCY_MS "1500" +#define FRAMES_PER_TCP_PACKET 4096 +#define FRAMES_PER_UDP_PACKET 352 -#define DEFAULT_TCP_AUDIO_PORT 6000 -#define DEFAULT_UDP_AUDIO_PORT 6000 -#define DEFAULT_UDP_CONTROL_PORT 6001 -#define DEFAULT_UDP_TIMING_PORT 6002 +#define RAOP_AUDIO_PORT 6000 +#define RAOP_UDP_CONTROL_PORT 6001 +#define RAOP_UDP_TIMING_PORT 6002 #define AES_CHUNK_SIZE 16 #ifndef MD5_DIGEST_LENGTH #define MD5_DIGEST_LENGTH 16 #endif -#define MD5_HASH_LENGTH (2*MD5_DIGEST_LENGTH) +#define MD5_HASH_LENGTH (2*MD5_DIGEST_LENGTH) #define DEFAULT_USER_NAME "PipeWire" #define RAOP_AUTH_USER_NAME "iTunes" -#define MAX_PORT_RETRY 128 +#define MAX_PORT_RETRY 128 -#define DEFAULT_FORMAT "S16" -#define DEFAULT_RATE 44100 -#define DEFAULT_CHANNELS 2 -#define DEFAULT_POSITION "[ FL FR ]" +#define RAOP_FORMAT "S16LE" +#define RAOP_STRIDE (2*DEFAULT_CHANNELS) +#define RAOP_RATE 44100 +#define RAOP_LATENCY_MS 250 -#define VOLUME_MAX 0.0 -#define VOLUME_MIN -30.0 -#define VOLUME_MUTE -144.0 +#define VOLUME_MAX 0.0 +#define VOLUME_MIN -30.0 +#define VOLUME_MUTE -144.0 #define MODULE_USAGE "( raop.ip= ) " \ "( raop.port= ) " \ @@ -163,8 +165,8 @@ PW_LOG_TOPIC_STATIC(mod_topic, "mod." NAME); "( node.latency= ) " \ "( node.name= ) " \ "( node.description= ) " \ - "( audio.format= ) " \ - "( audio.rate= ) " \ + "( audio.format= ) " \ + "( audio.rate= ) " \ "( audio.channels= ) " \ "( audio.position= ) " \ "( stream.props= ) " @@ -212,10 +214,7 @@ struct impl { struct spa_hook core_listener; struct pw_properties *stream_props; - struct pw_stream *stream; - struct spa_hook stream_listener; - struct spa_audio_info_raw info; - uint32_t frame_size; + struct rtp_stream *stream; struct pw_rtsp_client *rtsp; struct spa_hook rtsp_listener; @@ -246,15 +245,15 @@ struct impl { int server_fd; struct spa_source *server_source; - uint32_t block_size; + uint32_t psamples; + uint64_t rate; + uint32_t mtu; + uint32_t stride; uint32_t latency; - uint16_t seq, cseq; - uint32_t rtptime; uint32_t ssrc; uint32_t sync; uint32_t sync_period; - unsigned int first:1; unsigned int connected:1; unsigned int ready:1; unsigned int recording:1; @@ -262,17 +261,14 @@ struct impl { bool mute; float volume; - uint8_t buffer[FRAMES_PER_TCP_PACKET * 4]; + struct spa_ringbuffer ring; + uint8_t buffer[BUFFER_SIZE]; + + struct spa_io_position *io_position; + uint32_t filled; }; -static void stream_destroy(void *d) -{ - struct impl *impl = d; - spa_hook_remove(&impl->stream_listener); - impl->stream = NULL; -} - static inline void bit_writer(uint8_t **p, int *pos, uint8_t data, int len) { int rb = 8 - *pos - len; @@ -307,11 +303,9 @@ static inline uint64_t ntp_now(void) return timespec_to_ntp(&now); } -static int send_udp_sync_packet(struct impl *impl, - struct sockaddr *dest_addr, socklen_t addrlen) +static int send_udp_sync_packet(struct impl *impl, uint32_t rtptime, unsigned int first) { uint32_t out[3]; - uint32_t rtptime = impl->rtptime; uint32_t latency = impl->latency; uint64_t transmitted; struct rtp_header header; @@ -321,11 +315,11 @@ static int send_udp_sync_packet(struct impl *impl, spa_zero(header); header.v = 2; - if (impl->first) + if (first) header.x = 1; header.m = 1; header.pt = 84; - header.sequence_number = htons(impl->cseq); + header.sequence_number = 7; header.timestamp = htonl(rtptime - latency); iov[0].iov_base = &header; @@ -339,8 +333,8 @@ static int send_udp_sync_packet(struct impl *impl, iov[1].iov_base = out; iov[1].iov_len = sizeof(out); - msg.msg_name = dest_addr; - msg.msg_namelen = addrlen; + msg.msg_name = NULL; + msg.msg_namelen = 0; msg.msg_iov = iov; msg.msg_iovlen = 2; msg.msg_control = NULL; @@ -353,10 +347,8 @@ static int send_udp_sync_packet(struct impl *impl, pw_log_warn("error sending control packet: %d", res); } - impl->cseq = (impl->cseq + 1) & 0xffff; - - pw_log_debug("raop control sync: cseq:%d first:%d latency:%u now:%"PRIx64" rtptime:%u", - impl->cseq, impl->first, latency, transmitted, rtptime); + pw_log_debug("raop control sync: first:%d latency:%u now:%"PRIx64" rtptime:%u", + first, latency, transmitted, rtptime); return res; } @@ -440,194 +432,86 @@ static int write_codec_pcm(void *dst, void *frames, uint32_t n_frames) return bp - b + 1; } -static int flush_to_udp_packet(struct impl *impl) +static ssize_t send_packet(int fd, struct msghdr *msg) { - const size_t max = 8 + impl->block_size; - uint32_t out[max], len, n_frames; - struct rtp_header header; - struct iovec iov[2]; - struct msghdr msg; - uint8_t *dst; - int res; - - if (!impl->recording) - return 0; - - if (impl->first || ++impl->sync == impl->sync_period) { - impl->sync = 0; - send_udp_sync_packet(impl, NULL, 0); - } - - spa_zero(header); - header.v = 2; - header.pt = 96; - if (impl->first) - header.m = 1; - header.sequence_number = htons(impl->seq); - header.timestamp = htonl(impl->rtptime); - header.ssrc = htonl(impl->ssrc); - - iov[0].iov_base = &header; - iov[0].iov_len = 12; - - n_frames = impl->filled / impl->frame_size; - dst = (uint8_t*)&out[0]; - - switch (impl->codec) { - case CODEC_PCM: - case CODEC_ALAC: - len = write_codec_pcm(dst, impl->buffer, n_frames); - break; - default: - len = 8 + impl->block_size; - memset(dst, 0, len); - break; - } - if (impl->encryption == CRYPTO_RSA) - aes_encrypt(impl, dst, len); - - iov[1].iov_base = out; - iov[1].iov_len = len; - - impl->rtptime += n_frames; - impl->seq = (impl->seq + 1) & 0xffff; - - msg.msg_name = NULL; - msg.msg_namelen = 0; - msg.msg_iov = iov; - msg.msg_iovlen = 2; - msg.msg_control = NULL; - msg.msg_controllen = 0; - msg.msg_flags = 0; - - res = sendmsg(impl->server_fd, &msg, MSG_NOSIGNAL); - if (res < 0) { - res = -errno; - pw_log_warn("error streaming packet: %d", res); - } - - impl->first = false; - - return res; + ssize_t n; + n = sendmsg(fd, msg, MSG_NOSIGNAL); + if (n < 0) + pw_log_debug("sendmsg() failed: %m"); + return n; } -static int flush_to_tcp_packet(struct impl *impl) +static void stream_send_packet(void *data, struct iovec *iov, size_t iovlen) { - const size_t max = 8 + impl->block_size; - uint32_t tcp_pkt[1], out[max], len, n_frames; - struct rtp_header header; - struct iovec iov[3]; + struct impl *impl = data; + const size_t max = 8 + impl->mtu; + uint32_t tcp_pkt[1], out[max], len, n_frames, rtptime; + struct iovec out_vec[3]; + struct rtp_header *header; struct msghdr msg; uint8_t *dst; - int res; if (!impl->recording) - return 0; - - tcp_pkt[0] = htonl(0x24000000); - - iov[0].iov_base = &tcp_pkt; - iov[0].iov_len = 4; - - spa_zero(header); - header.v = 2; - header.pt = 96; - header.sequence_number = htons(impl->seq); - header.timestamp = htonl(impl->rtptime); - header.ssrc = htonl(impl->ssrc); - - iov[1].iov_base = &header; - iov[1].iov_len = 12; - - n_frames = impl->filled / impl->frame_size; - dst = (uint8_t*)&out[0]; - - switch (impl->codec) { - case CODEC_PCM: - case CODEC_ALAC: - len = write_codec_pcm(dst, impl->buffer, n_frames); - break; - default: - len = 8 + impl->block_size; - memset(dst, 0, len); - break; - } - if (impl->encryption == CRYPTO_RSA) - aes_encrypt(impl, dst, len); - - out[0] |= htonl((uint32_t) len + 12); - - iov[2].iov_base = out; - iov[2].iov_len = len; - - impl->rtptime += n_frames; - impl->seq = (impl->seq + 1) & 0xffff; - - msg.msg_name = NULL; - msg.msg_namelen = 0; - msg.msg_iov = iov; - msg.msg_iovlen = 2; - msg.msg_control = NULL; - msg.msg_controllen = 0; - msg.msg_flags = 0; - - res = sendmsg(impl->server_fd, &msg, MSG_NOSIGNAL); - if (res < 0) { - res = -errno; - pw_log_warn("error streaming packet: %d", res); - } - - impl->first = false; - - return res; -} - -static void playback_stream_process(void *d) -{ - struct impl *impl = d; - struct pw_buffer *buf; - struct spa_data *bd; - uint8_t *data; - uint32_t offs, size; - - if ((buf = pw_stream_dequeue_buffer(impl->stream)) == NULL) { - pw_log_debug("out of buffers: %m"); return; + + header = (struct rtp_header*)iov[0].iov_base; + if (header->v != 2) + pw_log_warn("invalid rtp packet version"); + + rtptime = htonl(header->timestamp); + + if (header->m || ++impl->sync == impl->sync_period) { + send_udp_sync_packet(impl, rtptime, header->m); + impl->sync = 0; } - bd = &buf->buffer->datas[0]; + n_frames = iov[1].iov_len / impl->stride; - offs = SPA_MIN(bd->chunk->offset, bd->maxsize); - size = SPA_MIN(bd->chunk->size, bd->maxsize - offs); - data = SPA_PTROFF(bd->data, offs, uint8_t); + msg.msg_name = NULL; + msg.msg_namelen = 0; + msg.msg_iov = out_vec; + msg.msg_iovlen = 0; + msg.msg_control = NULL; + msg.msg_controllen = 0; + msg.msg_flags = 0; - while (size > 0 && impl->block_size > 0) { - uint32_t avail, to_fill; + dst = (uint8_t*)&out[0]; - avail = impl->block_size - impl->filled; - to_fill = SPA_MIN(avail, size); + switch (impl->codec) { + case CODEC_PCM: + case CODEC_ALAC: + len = write_codec_pcm(dst, (void *)iov[1].iov_base, n_frames); + break; + default: + len = 8 + impl->mtu; + memset(dst, 0, len); + break; + } + if (impl->encryption == CRYPTO_RSA) + aes_encrypt(impl, dst, len); - memcpy(&impl->buffer[impl->filled], data, to_fill); - - impl->filled += to_fill; - avail -= to_fill; - size -= to_fill; - data += to_fill; - - if (avail == 0) { - switch (impl->protocol) { - case PROTO_UDP: - flush_to_udp_packet(impl); - break; - case PROTO_TCP: - flush_to_tcp_packet(impl); - break; - } - impl->filled = 0; - } + if (impl->protocol == PROTO_TCP) { + out[0] |= htonl((uint32_t) len + 12); + tcp_pkt[0] = htonl(0x24000000); + out_vec[msg.msg_iovlen++] = (struct iovec) { tcp_pkt, 4 }; } - pw_stream_queue_buffer(impl->stream, buf); + out_vec[msg.msg_iovlen++] = (struct iovec) { header, 12 }; + out_vec[msg.msg_iovlen++] = (struct iovec) { out, len }; + + pw_log_debug("raop sending %ld", out_vec[0].iov_len + out_vec[1].iov_len + out_vec[2].iov_len); + + send_packet(impl->server_fd, &msg); +} + +static inline void +set_iovec(struct spa_ringbuffer *rbuf, void *buffer, uint32_t size, + uint32_t offset, struct iovec *iov, uint32_t len) +{ + iov[0].iov_len = SPA_MIN(len, size - offset); + iov[0].iov_base = SPA_PTROFF(buffer, offset, void); + iov[1].iov_len = len - iov[0].iov_len; + iov[1].iov_base = buffer; } static int create_udp_socket(struct impl *impl, uint16_t *port) @@ -972,6 +856,11 @@ static void rtsp_do_post_feedback(void *data, uint64_t expirations) NULL, NULL, 0, rtsp_log_reply_status, impl); } +static uint32_t msec_to_samples(struct impl *impl, uint32_t msec) +{ + return msec * impl->rate / 1000; +} + static int rtsp_record_reply(void *data, int status, const struct spa_dict *headers, const struct pw_array *content) { struct impl *impl = data; @@ -1003,17 +892,18 @@ static int rtsp_record_reply(void *data, int status, const struct spa_dict *head spa_zero(latency); latency.direction = PW_DIRECTION_INPUT; - latency.min_rate = latency.max_rate = impl->latency + RAOP_LATENCY_MIN; + latency.min_rate = latency.max_rate = impl->latency + msec_to_samples(impl, RAOP_LATENCY_MS); n_params = 0; spa_pod_builder_init(&b, buffer, sizeof(buffer)); params[n_params++] = spa_latency_build(&b, SPA_PARAM_Latency, &latency); - pw_stream_update_params(impl->stream, params, n_params); + rtp_stream_update_params(impl->stream, params, n_params); + + rtp_stream_set_first(impl->stream); - impl->first = true; impl->sync = 0; - impl->sync_period = impl->info.rate / (impl->block_size / impl->frame_size); + impl->sync_period = impl->rate / (impl->mtu / impl->stride); impl->recording = true; rtsp_send_volume(impl); @@ -1025,13 +915,18 @@ static int rtsp_record_reply(void *data, int status, const struct spa_dict *head static int rtsp_do_record(struct impl *impl) { int res; + uint16_t seq; + uint32_t rtptime; if (!impl->ready || impl->recording) return 0; + seq = rtp_stream_get_seq(impl->stream); + rtptime = rtp_stream_get_time(impl->stream, &impl->rate); + pw_properties_set(impl->headers, "Range", "npt=0-"); pw_properties_setf(impl->headers, "RTP-Info", - "seq=%u;rtptime=%u", impl->seq, impl->rtptime); + "seq=%u;rtptime=%u", seq, rtptime); res = rtsp_send(impl, "RECORD", NULL, NULL, rtsp_record_reply); @@ -1064,7 +959,7 @@ on_server_source_io(void *data, int fd, uint32_t mask) goto error; impl->ready = true; - if (pw_stream_get_state(impl->stream, NULL) == PW_STREAM_STATE_STREAMING) + if (rtp_stream_get_state(impl->stream, NULL) == PW_STREAM_STATE_STREAMING) rtsp_do_record(impl); } return; @@ -1079,7 +974,6 @@ static int rtsp_setup_reply(void *data, int status, const struct spa_dict *heade size_t len; uint64_t ntp; uint16_t control_port, timing_port; - int res; pw_log_info("setup status: %d", status); @@ -1111,12 +1005,6 @@ static int rtsp_setup_reply(void *data, int status, const struct spa_dict *heade return 0; } - if ((res = pw_getrandom(&impl->seq, sizeof(impl->seq), 0)) < 0 || - (res = pw_getrandom(&impl->rtptime, sizeof(impl->rtptime), 0)) < 0) { - pw_log_error("error generating random seq and rtptime: %s", spa_strerror(res)); - return 0; - } - pw_log_info("server port:%u", impl->server_port); switch (impl->protocol) { @@ -1155,7 +1043,7 @@ static int rtsp_setup_reply(void *data, int status, const struct spa_dict *heade SPA_IO_IN, false, on_control_source_io, impl); impl->ready = true; - if (pw_stream_get_state(impl->stream, NULL) == PW_STREAM_STATE_STREAMING) + if (rtp_stream_get_state(impl->stream, NULL) == PW_STREAM_STATE_STREAMING) rtsp_do_record(impl); break; default: @@ -1175,8 +1063,8 @@ static int rtsp_do_setup(struct impl *impl) break; case PROTO_UDP: - impl->control_port = DEFAULT_UDP_CONTROL_PORT; - impl->timing_port = DEFAULT_UDP_TIMING_PORT; + impl->control_port = RAOP_UDP_CONTROL_PORT; + impl->timing_port = RAOP_UDP_TIMING_PORT; impl->control_fd = create_udp_socket(impl, &impl->control_port); impl->timing_fd = create_udp_socket(impl, &impl->timing_port); @@ -1315,19 +1203,14 @@ static int rtsp_do_announce(struct impl *impl) { const char *host; uint8_t rsakey[512]; + uint32_t rtp_latency; char key[512*2]; char iv[16*2]; - int res, frames, rsa_len, ip_version; + int res, rsa_len, ip_version; spa_autofree char *sdp = NULL; char local_ip[256]; host = pw_properties_get(impl->props, "raop.ip"); - - if (impl->protocol == PROTO_TCP) - frames = FRAMES_PER_TCP_PACKET; - else - frames = FRAMES_PER_UDP_PACKET; - - impl->block_size = frames * impl->frame_size; + rtp_latency = msec_to_samples(impl, RAOP_LATENCY_MS); pw_rtsp_client_get_local_ip(impl->rtsp, &ip_version, local_ip, sizeof(local_ip)); @@ -1343,7 +1226,7 @@ static int rtsp_do_announce(struct impl *impl) "a=rtpmap:96 AppleLossless\r\n" "a=fmtp:96 %d 0 16 40 10 14 2 255 0 0 %u\r\n", impl->session_id, ip_version, local_ip, - ip_version, host, frames, impl->info.rate); + ip_version, host, impl->psamples, (uint32_t)impl->rate); if (!sdp) return -errno; break; @@ -1358,8 +1241,8 @@ static int rtsp_do_announce(struct impl *impl) "a=fmtp:96 %d 0 16 40 10 14 2 255 0 0 %u\r\n" "a=min-latency:%d", impl->session_id, ip_version, local_ip, - ip_version, host, frames, impl->info.rate, - RAOP_LATENCY_MIN); + ip_version, host, impl->psamples, (uint32_t)impl->rate, + rtp_latency); if (!sdp) return -errno; break; @@ -1395,7 +1278,7 @@ static int rtsp_do_announce(struct impl *impl) "a=rsaaeskey:%s\r\n" "a=aesiv:%s\r\n", impl->session_id, ip_version, local_ip, - ip_version, host, frames, impl->info.rate, + ip_version, host, impl->psamples, (uint32_t)impl->rate, key, iv); if (!sdp) return -errno; @@ -1617,21 +1500,23 @@ static const struct pw_rtsp_client_events rtsp_events = { .message = rtsp_message, }; -static void stream_state_changed(void *d, enum pw_stream_state old, - enum pw_stream_state state, const char *error) +static void stream_destroy(void *d) { struct impl *impl = d; - switch (state) { - case PW_STREAM_STATE_ERROR: - case PW_STREAM_STATE_UNCONNECTED: + impl->stream = NULL; +} + +static void stream_state_changed(void *data, bool started, const char *error) +{ + struct impl *impl = data; + + if (error) { + pw_log_error("stream error: %s", error); pw_impl_module_schedule_destroy(impl->module); - break; - case PW_STREAM_STATE_STREAMING: - rtsp_do_record(impl); - break; - default: - break; + return; } + if (started) + rtsp_do_record(impl); } static int rtsp_do_connect(struct impl *impl) @@ -1746,7 +1631,7 @@ static void stream_props_changed(struct impl *impl, uint32_t id, const struct sp } param = spa_pod_builder_pop(&b, &f[0]); - pw_stream_set_param(impl->stream, id, param); + rtp_stream_set_param(impl->stream, id, param); } static void stream_param_changed(void *data, uint32_t id, const struct spa_pod *param) @@ -1769,57 +1654,14 @@ static void stream_param_changed(void *data, uint32_t id, const struct spa_pod * } } -static const struct pw_stream_events playback_stream_events = { - PW_VERSION_STREAM_EVENTS, +static const struct rtp_stream_events stream_events = { + RTP_VERSION_STREAM_EVENTS, .destroy = stream_destroy, .state_changed = stream_state_changed, .param_changed = stream_param_changed, - .process = playback_stream_process + .send_packet = stream_send_packet }; -static int create_stream(struct impl *impl) -{ - int res; - uint32_t n_params; - const struct spa_pod *params[1]; - uint8_t buffer[1024]; - struct spa_pod_builder b; - - impl->stream = pw_stream_new(impl->core, "RAOP sink", impl->stream_props); - impl->stream_props = NULL; - - if (impl->stream == NULL) - return -errno; - - pw_stream_add_listener(impl->stream, - &impl->stream_listener, - &playback_stream_events, impl); - - n_params = 0; - spa_pod_builder_init(&b, buffer, sizeof(buffer)); - params[n_params++] = spa_format_audio_raw_build(&b, - SPA_PARAM_EnumFormat, &impl->info); - - if ((res = pw_stream_connect(impl->stream, - PW_DIRECTION_INPUT, - PW_ID_ANY, - PW_STREAM_FLAG_MAP_BUFFERS | - PW_STREAM_FLAG_RT_PROCESS, - params, n_params)) < 0) - return res; - - impl->headers = pw_properties_new(NULL, NULL); - - impl->rtsp = pw_rtsp_client_new(impl->loop, NULL, 0); - if (impl->rtsp == NULL) - return -errno; - - pw_rtsp_client_add_listener(impl->rtsp, &impl->rtsp_listener, - &rtsp_events, impl); - - return 0; -} - static void core_error(void *data, uint32_t id, int seq, int res, const char *message) { struct impl *impl = data; @@ -1851,7 +1693,7 @@ static const struct pw_proxy_events core_proxy_events = { static void impl_destroy(struct impl *impl) { if (impl->stream) - pw_stream_destroy(impl->stream); + rtp_stream_destroy(impl->stream); if (impl->core && impl->do_disconnect) pw_core_disconnect(impl->core); @@ -1880,97 +1722,6 @@ static const struct pw_impl_module_events module_events = { .destroy = module_destroy, }; -static inline uint32_t format_from_name(const char *name, size_t len) -{ - int i; - for (i = 0; spa_type_audio_format[i].name; i++) { - if (strncmp(name, spa_debug_type_short_name(spa_type_audio_format[i].name), len) == 0) - return spa_type_audio_format[i].type; - } - return SPA_AUDIO_FORMAT_UNKNOWN; -} - -static uint32_t channel_from_name(const char *name) -{ - int i; - for (i = 0; spa_type_audio_channel[i].name; i++) { - if (spa_streq(name, spa_debug_type_short_name(spa_type_audio_channel[i].name))) - return spa_type_audio_channel[i].type; - } - return SPA_AUDIO_CHANNEL_UNKNOWN; -} - -static void parse_position(struct spa_audio_info_raw *info, const char *val, size_t len) -{ - struct spa_json it[2]; - char v[256]; - - spa_json_init(&it[0], val, len); - if (spa_json_enter_array(&it[0], &it[1]) <= 0) - spa_json_init(&it[1], val, len); - - info->channels = 0; - while (spa_json_get_string(&it[1], v, sizeof(v)) > 0 && - info->channels < SPA_AUDIO_MAX_CHANNELS) { - info->position[info->channels++] = channel_from_name(v); - } -} - -static void parse_audio_info(const struct pw_properties *props, struct spa_audio_info_raw *info) -{ - const char *str; - - spa_zero(*info); - if ((str = pw_properties_get(props, PW_KEY_AUDIO_FORMAT)) == NULL) - str = DEFAULT_FORMAT; - info->format = format_from_name(str, strlen(str)); - - info->rate = pw_properties_get_uint32(props, PW_KEY_AUDIO_RATE, info->rate); - if (info->rate == 0) - info->rate = DEFAULT_RATE; - - info->channels = pw_properties_get_uint32(props, PW_KEY_AUDIO_CHANNELS, info->channels); - info->channels = SPA_MIN(info->channels, SPA_AUDIO_MAX_CHANNELS); - if ((str = pw_properties_get(props, SPA_KEY_AUDIO_POSITION)) != NULL) - parse_position(info, str, strlen(str)); - if (info->channels == 0) - parse_position(info, DEFAULT_POSITION, strlen(DEFAULT_POSITION)); -} - -static int calc_frame_size(struct spa_audio_info_raw *info) -{ - int res = info->channels; - switch (info->format) { - case SPA_AUDIO_FORMAT_U8: - case SPA_AUDIO_FORMAT_S8: - case SPA_AUDIO_FORMAT_ALAW: - case SPA_AUDIO_FORMAT_ULAW: - return res; - case SPA_AUDIO_FORMAT_S16: - case SPA_AUDIO_FORMAT_S16_OE: - case SPA_AUDIO_FORMAT_U16: - return res * 2; - case SPA_AUDIO_FORMAT_S24: - case SPA_AUDIO_FORMAT_S24_OE: - case SPA_AUDIO_FORMAT_U24: - return res * 3; - case SPA_AUDIO_FORMAT_S24_32: - case SPA_AUDIO_FORMAT_S24_32_OE: - case SPA_AUDIO_FORMAT_S32: - case SPA_AUDIO_FORMAT_S32_OE: - case SPA_AUDIO_FORMAT_U32: - case SPA_AUDIO_FORMAT_U32_OE: - case SPA_AUDIO_FORMAT_F32: - case SPA_AUDIO_FORMAT_F32_OE: - return res * 4; - case SPA_AUDIO_FORMAT_F64: - case SPA_AUDIO_FORMAT_F64_OE: - return res * 8; - default: - return 0; - } -} - static void copy_props(struct impl *impl, struct pw_properties *props, const char *key) { const char *str; @@ -1987,7 +1738,7 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args) struct pw_properties *props = NULL; struct impl *impl; const char *str, *name, *hostname, *ip, *port; - int res; + int res = 0; PW_LOG_TOPIC_INIT(mod_topic); @@ -2035,67 +1786,15 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args) goto error; } - if (pw_properties_get(props, PW_KEY_NODE_VIRTUAL) == NULL) - pw_properties_set(props, PW_KEY_NODE_VIRTUAL, "true"); - - if (pw_properties_get(props, PW_KEY_MEDIA_CLASS) == NULL) - pw_properties_set(props, PW_KEY_MEDIA_CLASS, "Audio/Sink"); - - if (pw_properties_get(props, PW_KEY_DEVICE_ICON_NAME) == NULL) - pw_properties_set(props, PW_KEY_DEVICE_ICON_NAME, "audio-speakers"); - - if ((name = pw_properties_get(props, "raop.name")) == NULL) - name = "RAOP"; - - if ((str = strstr(name, "@"))) { - str++; - if (strlen(str) > 0) - name = str; - } - if ((hostname = pw_properties_get(props, "raop.hostname")) == NULL) - hostname = name; - - if (pw_properties_get(props, PW_KEY_NODE_NAME) == NULL) - pw_properties_setf(props, PW_KEY_NODE_NAME, "raop_sink.%s.%s.%s", - hostname, ip, port); - if (pw_properties_get(props, PW_KEY_NODE_DESCRIPTION) == NULL) - pw_properties_setf(props, PW_KEY_NODE_DESCRIPTION, - "%s", name); - if (pw_properties_get(props, PW_KEY_NODE_LATENCY) == NULL) - pw_properties_set(props, PW_KEY_NODE_LATENCY, "352/44100"); - - if ((str = pw_properties_get(props, "stream.props")) != NULL) - pw_properties_update_string(impl->stream_props, str, strlen(str)); - - copy_props(impl, props, PW_KEY_AUDIO_FORMAT); - copy_props(impl, props, PW_KEY_AUDIO_RATE); - copy_props(impl, props, PW_KEY_AUDIO_CHANNELS); - copy_props(impl, props, SPA_KEY_AUDIO_POSITION); - copy_props(impl, props, PW_KEY_DEVICE_ICON_NAME); - copy_props(impl, props, PW_KEY_NODE_NAME); - copy_props(impl, props, PW_KEY_NODE_DESCRIPTION); - copy_props(impl, props, PW_KEY_NODE_GROUP); - copy_props(impl, props, PW_KEY_NODE_LATENCY); - copy_props(impl, props, PW_KEY_NODE_VIRTUAL); - copy_props(impl, props, PW_KEY_MEDIA_CLASS); - - parse_audio_info(impl->stream_props, &impl->info); - - impl->frame_size = calc_frame_size(&impl->info); - if (impl->frame_size == 0) { - pw_log_error("unsupported audio format:%d channels:%d", - impl->info.format, impl->info.channels); - res = -EINVAL; - goto error; - } - if ((str = pw_properties_get(props, "raop.transport")) == NULL) str = "udp"; - if (spa_streq(str, "udp")) + if (spa_streq(str, "udp")) { impl->protocol = PROTO_UDP; - else if (spa_streq(str, "tcp")) + impl->psamples = FRAMES_PER_UDP_PACKET; + } else if (spa_streq(str, "tcp")) { impl->protocol = PROTO_TCP; - else { + impl->psamples = FRAMES_PER_TCP_PACKET; + } else { pw_log_error( "can't handle transport %s", str); res = -EINVAL; goto error; @@ -2129,9 +1828,78 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args) str = pw_properties_get(props, "raop.password"); impl->password = str ? strdup(str) : NULL; - if ((str = pw_properties_get(props, "raop.latency.ms")) == NULL) - str = DEFAULT_LATENCY_MS; - impl->latency = SPA_MAX(atoi(str) * impl->info.rate / 1000u, RAOP_LATENCY_MIN); + if ((name = pw_properties_get(props, "raop.name")) == NULL) + name = "RAOP"; + + if ((str = strchr(name, '@')) != NULL) { + str++; + if (strlen(str) > 0) + name = str; + } + if ((hostname = pw_properties_get(props, "raop.hostname")) == NULL) + hostname = name; + + impl->rate = RAOP_RATE; + impl->latency = msec_to_samples(impl, RAOP_LATENCY_MS); + impl->stride = RAOP_STRIDE; + impl->mtu = impl->stride * impl->psamples; + impl->sync_period = impl->rate / impl->psamples; + + if (pw_properties_get(props, PW_KEY_AUDIO_FORMAT) == NULL) + pw_properties_setf(props, PW_KEY_AUDIO_FORMAT, "%s", RAOP_FORMAT); + if (pw_properties_get(props, PW_KEY_AUDIO_RATE) == NULL) + pw_properties_setf(props, PW_KEY_AUDIO_RATE, "%ld", impl->rate); + if (pw_properties_get(props, PW_KEY_DEVICE_ICON_NAME) == NULL) + pw_properties_set(props, PW_KEY_DEVICE_ICON_NAME, "audio-speakers"); + if (pw_properties_get(props, PW_KEY_NODE_NAME) == NULL) + pw_properties_setf(props, PW_KEY_NODE_NAME, "raop_sink.%s.%s.%s", + hostname, ip, port); + if (pw_properties_get(props, PW_KEY_NODE_DESCRIPTION) == NULL) + pw_properties_setf(props, PW_KEY_NODE_DESCRIPTION, "%s", name); + if (pw_properties_get(props, PW_KEY_NODE_LATENCY) == NULL) + pw_properties_setf(props, PW_KEY_NODE_LATENCY, "%d/%ld", + impl->psamples, impl->rate); + if (pw_properties_get(props, PW_KEY_NODE_VIRTUAL) == NULL) + pw_properties_set(props, PW_KEY_NODE_VIRTUAL, "true"); + if (pw_properties_get(props, PW_KEY_MEDIA_CLASS) == NULL) + pw_properties_set(props, PW_KEY_MEDIA_CLASS, "Audio/Sink"); + if (pw_properties_get(props, PW_KEY_MEDIA_FORMAT) == NULL) + pw_properties_setf(props, PW_KEY_MEDIA_FORMAT, "%d", SPA_AUDIO_FORMAT_S16_LE); + if (pw_properties_get(props, "net.mtu") == NULL) + pw_properties_setf(props, "net.mtu", "%d", impl->mtu); + if (pw_properties_get(props, "rtp.sender-ts-offset") == NULL) + pw_properties_setf(props, "rtp.sender-ts-offset", "%d", 0); + if (pw_properties_get(props, "sess.ts-direct") == NULL) + pw_properties_set(props, "sess.ts-direct", 0); + if (pw_properties_get(props, "sess.media") == NULL) + pw_properties_set(props, "sess.media", "raop"); + if (pw_properties_get(props, "sess.latency.msec") == NULL) + pw_properties_setf(props, "sess.latency.msec", "%d", RAOP_LATENCY_MS); + + if ((str = pw_properties_get(props, "stream.props")) != NULL) + pw_properties_update_string(impl->stream_props, str, strlen(str)); + + copy_props(impl, props, PW_KEY_AUDIO_FORMAT); + copy_props(impl, props, PW_KEY_AUDIO_RATE); + copy_props(impl, props, PW_KEY_AUDIO_CHANNELS); + copy_props(impl, props, SPA_KEY_AUDIO_POSITION); + copy_props(impl, props, PW_KEY_DEVICE_ICON_NAME); + copy_props(impl, props, PW_KEY_NODE_NAME); + copy_props(impl, props, PW_KEY_NODE_DESCRIPTION); + copy_props(impl, props, PW_KEY_NODE_GROUP); + copy_props(impl, props, PW_KEY_NODE_LATENCY); + copy_props(impl, props, PW_KEY_NODE_VIRTUAL); + copy_props(impl, props, PW_KEY_MEDIA_CLASS); + copy_props(impl, props, PW_KEY_MEDIA_FORMAT); + copy_props(impl, props, "net.mtu"); + copy_props(impl, props, "rtp.sender-ts-offset"); + copy_props(impl, props, "sess.media"); + copy_props(impl, props, "sess.name"); + copy_props(impl, props, "sess.min-ptime"); + copy_props(impl, props, "sess.max-ptime"); + copy_props(impl, props, "sess.latency.msec"); + copy_props(impl, props, "sess.ts-refclk"); + copy_props(impl, props, "sess.ts-direct"); impl->core = pw_context_get_object(impl->context, PW_TYPE_INTERFACE_Core); if (impl->core == NULL) { @@ -2156,8 +1924,23 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args) &impl->core_listener, &core_events, impl); - if ((res = create_stream(impl)) < 0) + impl->stream = rtp_stream_new(impl->core, + PW_DIRECTION_INPUT, pw_properties_copy(impl->stream_props), + &stream_events, impl); + if (impl->stream == NULL) { + res = -errno; + pw_log_error("can't create raop stream: %m"); goto error; + } + + impl->headers = pw_properties_new(NULL, NULL); + + impl->rtsp = pw_rtsp_client_new(impl->loop, NULL, 0); + if (impl->rtsp == NULL) + goto error; + + pw_rtsp_client_add_listener(impl->rtsp, &impl->rtsp_listener, + &rtsp_events, impl); pw_impl_module_add_listener(module, &impl->module_listener, &module_events, impl); diff --git a/src/modules/module-rtp/audio.c b/src/modules/module-rtp/audio.c index d6305a686..326c65d84 100644 --- a/src/modules/module-rtp/audio.c +++ b/src/modules/module-rtp/audio.c @@ -226,6 +226,10 @@ static void rtp_audio_flush_packets(struct impl *impl) iov[0].iov_len = sizeof(header); while (avail >= tosend) { + if (impl->marker_on_first && impl->first) + header.m = 1; + else + header.m = 0; header.sequence_number = htons(impl->seq); header.timestamp = htonl(impl->ts_offset + timestamp); @@ -234,11 +238,12 @@ static void rtp_audio_flush_packets(struct impl *impl) (timestamp * stride) & BUFFER_MASK, &iov[1], tosend * stride); - pw_log_trace("sending %d timestamp:%d", tosend, timestamp); + pw_log_trace("sending %d avail:%d ts_offset:%d timestamp:%d", tosend, avail, impl->ts_offset, timestamp); rtp_stream_emit_send_packet(impl, iov, 3); impl->seq++; + impl->first = false; timestamp += tosend; avail -= tosend; } diff --git a/src/modules/module-rtp/stream.c b/src/modules/module-rtp/stream.c index 3eb8d5b05..217c9a876 100644 --- a/src/modules/module-rtp/stream.c +++ b/src/modules/module-rtp/stream.c @@ -32,6 +32,7 @@ struct rtp_stream_events, m, v, ##__VA_ARGS__) #define rtp_stream_emit_destroy(s) rtp_stream_emit(s, destroy, 0) #define rtp_stream_emit_state_changed(s,n,e) rtp_stream_emit(s, state_changed,0,n,e) +#define rtp_stream_emit_param_changed(s,i,p) rtp_stream_emit(s, param_changed,0,i,p) #define rtp_stream_emit_send_packet(s,i,l) rtp_stream_emit(s, send_packet,0,i,l) #define rtp_stream_emit_send_feedback(s,seq) rtp_stream_emit(s, send_feedback,0,seq) @@ -58,6 +59,7 @@ struct impl { unsigned have_ssrc:1; unsigned ignore_ssrc:1; unsigned have_seq:1; + unsigned marker_on_first:1; uint32_t ts_offset; uint32_t psamples; uint32_t mtu; @@ -102,6 +104,7 @@ static const struct format_info audio_format_info[] = { { SPA_MEDIA_SUBTYPE_raw, SPA_AUDIO_FORMAT_ALAW, 1, "PCMA", "audio" }, { SPA_MEDIA_SUBTYPE_raw, SPA_AUDIO_FORMAT_ULAW, 1, "PCMU", "audio" }, { SPA_MEDIA_SUBTYPE_raw, SPA_AUDIO_FORMAT_S16_BE, 2, "L16", "audio" }, + { SPA_MEDIA_SUBTYPE_raw, SPA_AUDIO_FORMAT_S16_LE, 2, "L16", "audio" }, { SPA_MEDIA_SUBTYPE_raw, SPA_AUDIO_FORMAT_S24_BE, 3, "L24", "audio" }, { SPA_MEDIA_SUBTYPE_control, 0, 1, "rtp-midi", "audio" }, { SPA_MEDIA_SUBTYPE_opus, 0, 4, "opus", "audio" }, @@ -132,6 +135,8 @@ static int stream_start(struct impl *impl) if (impl->started) return 0; + impl->first = true; + rtp_stream_emit_state_changed(impl, true, NULL); impl->started = true; @@ -176,10 +181,17 @@ static void on_stream_state_changed(void *d, enum pw_stream_state old, } } +static void on_stream_param_changed (void *d, uint32_t id, const struct spa_pod *param) +{ + struct impl *impl = d; + rtp_stream_emit_param_changed(impl, id, param); +}; + static const struct pw_stream_events stream_events = { PW_VERSION_STREAM_EVENTS, .destroy = stream_destroy, .state_changed = on_stream_state_changed, + .param_changed = on_stream_param_changed, .io_changed = stream_io_changed, }; @@ -287,6 +299,12 @@ struct rtp_stream *rtp_stream_new(struct pw_core *core, impl->info.media_subtype = SPA_MEDIA_SUBTYPE_raw; impl->payload = 127; } + else if (spa_streq(str, "raop")) { + impl->info.media_type = SPA_MEDIA_TYPE_audio; + impl->info.media_subtype = SPA_MEDIA_SUBTYPE_raw; + impl->payload = 0x60; + impl->marker_on_first = 1; + } else if (spa_streq(str, "midi")) { impl->info.media_type = SPA_MEDIA_TYPE_application; impl->info.media_subtype = SPA_MEDIA_SUBTYPE_control; @@ -364,6 +382,7 @@ struct rtp_stream *rtp_stream_new(struct pw_core *core, if (pw_properties_get(props, PW_KEY_NODE_NETWORK) == NULL) pw_properties_set(props, PW_KEY_NODE_NETWORK, "true"); + impl->marker_on_first = pw_properties_get_bool(props, "sess.marker-on-first", false); impl->ignore_ssrc = pw_properties_get_bool(props, "sess.ignore-ssrc", false); impl->direct_timestamp = pw_properties_get_bool(props, "sess.ts-direct", false); @@ -530,3 +549,40 @@ uint64_t rtp_stream_get_time(struct rtp_stream *s, uint64_t *rate) return pos->clock.position * impl->rate * pos->clock.rate.num / pos->clock.rate.denom; } + +uint16_t rtp_stream_get_seq(struct rtp_stream *s) +{ + struct impl *impl = (struct impl*)s; + + return impl->seq; +} + +void rtp_stream_set_first(struct rtp_stream *s) +{ + struct impl *impl = (struct impl*)s; + + impl->first = true; +} + +enum pw_stream_state rtp_stream_get_state(struct rtp_stream *s, const char **error) +{ + struct impl *impl = (struct impl*)s; + + return pw_stream_get_state(impl->stream, error); +} + +int rtp_stream_set_param(struct rtp_stream *s, uint32_t id, const struct spa_pod *param) +{ + struct impl *impl = (struct impl*)s; + + return pw_stream_set_param(impl->stream, id, param); +} + +int rtp_stream_update_params(struct rtp_stream *s, + const struct spa_pod **params, + uint32_t n_params) +{ + struct impl *impl = (struct impl*)s; + + return pw_stream_update_params(impl->stream, params, n_params); +} \ No newline at end of file diff --git a/src/modules/module-rtp/stream.h b/src/modules/module-rtp/stream.h index 7686c565e..d53db7ecd 100644 --- a/src/modules/module-rtp/stream.h +++ b/src/modules/module-rtp/stream.h @@ -31,6 +31,8 @@ struct rtp_stream_events { void (*state_changed) (void *data, bool started, const char *error); + void (*param_changed) (void *data, uint32_t id, const struct spa_pod *param); + void (*send_packet) (void *data, struct iovec *iov, size_t iovlen); void (*send_feedback) (void *data, uint32_t senum); @@ -46,6 +48,17 @@ int rtp_stream_receive_packet(struct rtp_stream *s, uint8_t *buffer, size_t len) uint64_t rtp_stream_get_time(struct rtp_stream *s, uint64_t *rate); +uint16_t rtp_stream_get_seq(struct rtp_stream *s); + +void rtp_stream_set_first(struct rtp_stream *s); + +enum pw_stream_state rtp_stream_get_state(struct rtp_stream *s, const char **error); + +int rtp_stream_set_param(struct rtp_stream *s, uint32_t id, const struct spa_pod *param); + +int rtp_stream_update_params(struct rtp_stream *stream, + const struct spa_pod **params, + uint32_t n_params); #ifdef __cplusplus }