module-raop-sink: Simplify rtp send functions

Reorganizes the rtp send functions.

Part of porting module-raop-sink towards the
module-rtp/stream facilities.
This commit is contained in:
Christian Glombek 2023-10-02 14:52:22 +02:00
parent d063dbdb62
commit 98db54f55d

View file

@ -215,7 +215,6 @@ struct impl {
struct pw_stream *stream; struct pw_stream *stream;
struct spa_hook stream_listener; struct spa_hook stream_listener;
struct spa_audio_info_raw info; struct spa_audio_info_raw info;
uint32_t frame_size;
struct pw_rtsp_client *rtsp; struct pw_rtsp_client *rtsp;
struct spa_hook rtsp_listener; struct spa_hook rtsp_listener;
@ -246,7 +245,8 @@ struct impl {
int server_fd; int server_fd;
struct spa_source *server_source; struct spa_source *server_source;
uint32_t block_size; uint32_t mtu;
uint32_t stride;
uint32_t latency; uint32_t latency;
uint16_t seq, cseq; uint16_t seq, cseq;
@ -307,11 +307,9 @@ static inline uint64_t ntp_now(void)
return timespec_to_ntp(&now); return timespec_to_ntp(&now);
} }
static int send_udp_sync_packet(struct impl *impl, static int send_udp_sync_packet(struct impl *impl, uint32_t rtptime)
struct sockaddr *dest_addr, socklen_t addrlen)
{ {
uint32_t out[3]; uint32_t out[3];
uint32_t rtptime = impl->rtptime;
uint32_t latency = impl->latency; uint32_t latency = impl->latency;
uint64_t transmitted; uint64_t transmitted;
struct rtp_header header; struct rtp_header header;
@ -339,8 +337,8 @@ static int send_udp_sync_packet(struct impl *impl,
iov[1].iov_base = out; iov[1].iov_base = out;
iov[1].iov_len = sizeof(out); iov[1].iov_len = sizeof(out);
msg.msg_name = dest_addr; msg.msg_name = NULL;
msg.msg_namelen = addrlen; msg.msg_namelen = 0;
msg.msg_iov = iov; msg.msg_iov = iov;
msg.msg_iovlen = 2; msg.msg_iovlen = 2;
msg.msg_control = NULL; msg.msg_control = NULL;
@ -440,146 +438,93 @@ static int write_codec_pcm(void *dst, void *frames, uint32_t n_frames)
return bp - b + 1; return bp - b + 1;
} }
static int flush_to_udp_packet(struct impl *impl) static ssize_t send_packet(int fd, struct msghdr *msg)
{ {
const size_t max = 8 + impl->block_size; ssize_t n;
uint32_t out[max], len, n_frames; n = sendmsg(fd, msg, MSG_NOSIGNAL);
struct rtp_header header; if (n < 0)
struct iovec iov[2]; pw_log_debug("sendmsg() failed: %m");
struct msghdr msg; return n;
uint8_t *dst;
int res;
if (!impl->recording)
return 0;
if (impl->first || ++impl->sync == impl->sync_period) {
impl->sync = 0;
send_udp_sync_packet(impl, NULL, 0);
}
spa_zero(header);
header.v = 2;
header.pt = 96;
if (impl->first)
header.m = 1;
header.sequence_number = htons(impl->seq);
header.timestamp = htonl(impl->rtptime);
header.ssrc = htonl(impl->ssrc);
iov[0].iov_base = &header;
iov[0].iov_len = 12;
n_frames = impl->filled / impl->frame_size;
dst = (uint8_t*)&out[0];
switch (impl->codec) {
case CODEC_PCM:
case CODEC_ALAC:
len = write_codec_pcm(dst, impl->buffer, n_frames);
break;
default:
len = 8 + impl->block_size;
memset(dst, 0, len);
break;
}
if (impl->encryption == CRYPTO_RSA)
aes_encrypt(impl, dst, len);
iov[1].iov_base = out;
iov[1].iov_len = len;
impl->rtptime += n_frames;
impl->seq = (impl->seq + 1) & 0xffff;
msg.msg_name = NULL;
msg.msg_namelen = 0;
msg.msg_iov = iov;
msg.msg_iovlen = 2;
msg.msg_control = NULL;
msg.msg_controllen = 0;
msg.msg_flags = 0;
res = sendmsg(impl->server_fd, &msg, MSG_NOSIGNAL);
if (res < 0) {
res = -errno;
pw_log_warn("error streaming packet: %d", res);
}
impl->first = false;
return res;
} }
static int flush_to_tcp_packet(struct impl *impl) static void stream_send_packet(void *data, struct iovec *iov, size_t iovlen)
{ {
const size_t max = 8 + impl->block_size; struct impl *impl = data;
uint32_t tcp_pkt[1], out[max], len, n_frames; const size_t max = 8 + impl->mtu;
struct rtp_header header; uint32_t tcp_pkt[1], out[max], len, n_frames, rtptime;
struct iovec iov[3]; struct iovec tcp_iov[3], udp_iov[2];
struct rtp_header *header;
struct msghdr msg; struct msghdr msg;
uint8_t *dst; uint8_t *dst;
int res;
if (!impl->recording) if (!impl->recording)
return 0; return;
tcp_pkt[0] = htonl(0x24000000); header = (struct rtp_header*)iov[0].iov_base;
if (header->v != 2)
pw_log_warn("invalid rtp packet version");
iov[0].iov_base = &tcp_pkt; rtptime = htonl(header->timestamp);
iov[0].iov_len = 4;
spa_zero(header); if (header->m || impl->first || ++impl->sync == impl->sync_period) {
header.v = 2; send_udp_sync_packet(impl, rtptime);
header.pt = 96; impl->sync = 0;
header.sequence_number = htons(impl->seq); }
header.timestamp = htonl(impl->rtptime);
header.ssrc = htonl(impl->ssrc);
iov[1].iov_base = &header; n_frames = iov[1].iov_len / impl->stride;
iov[1].iov_len = 12;
msg.msg_name = NULL;
msg.msg_namelen = 0;
msg.msg_control = NULL;
msg.msg_controllen = 0;
msg.msg_flags = 0;
n_frames = impl->filled / impl->frame_size;
dst = (uint8_t*)&out[0]; dst = (uint8_t*)&out[0];
switch (impl->codec) { switch (impl->codec) {
case CODEC_PCM: case CODEC_PCM:
case CODEC_ALAC: case CODEC_ALAC:
len = write_codec_pcm(dst, impl->buffer, n_frames); len = write_codec_pcm(dst, (void *)iov[1].iov_base, n_frames);
break; break;
default: default:
len = 8 + impl->block_size; len = 8 + impl->mtu;
memset(dst, 0, len); memset(dst, 0, len);
break; break;
} }
if (impl->encryption == CRYPTO_RSA) if (impl->encryption == CRYPTO_RSA)
aes_encrypt(impl, dst, len); aes_encrypt(impl, dst, len);
out[0] |= htonl((uint32_t) len + 12); switch (impl->protocol) {
case PROTO_UDP:
udp_iov[0].iov_base = header;
udp_iov[0].iov_len = 12;
udp_iov[1].iov_base = out;
udp_iov[1].iov_len = len;
iov[2].iov_base = out; msg.msg_iov = udp_iov;
iov[2].iov_len = len;
impl->rtptime += n_frames;
impl->seq = (impl->seq + 1) & 0xffff;
msg.msg_name = NULL;
msg.msg_namelen = 0;
msg.msg_iov = iov;
msg.msg_iovlen = 2; msg.msg_iovlen = 2;
msg.msg_control = NULL;
msg.msg_controllen = 0;
msg.msg_flags = 0;
res = sendmsg(impl->server_fd, &msg, MSG_NOSIGNAL); break;
if (res < 0) { case PROTO_TCP:
res = -errno; out[0] |= htonl((uint32_t) len + 12);
pw_log_warn("error streaming packet: %d", res); tcp_pkt[0] = htonl(0x24000000);
tcp_iov[0].iov_base = &tcp_pkt;
tcp_iov[0].iov_len = 4;
tcp_iov[1].iov_base = header;
tcp_iov[1].iov_len = 12;
tcp_iov[2].iov_base = out;
tcp_iov[2].iov_len = len;
msg.msg_iov = tcp_iov;
msg.msg_iovlen = 3;
break;
} }
impl->first = false; pw_log_debug("raop sending header:%ld data:%ld", udp_iov[0].iov_len, udp_iov[1].iov_len);
return res; send_packet(impl->server_fd, &msg);
} }
static void playback_stream_process(void *d) static void playback_stream_process(void *d)
@ -589,6 +534,8 @@ static void playback_stream_process(void *d)
struct spa_data *bd; struct spa_data *bd;
uint8_t *data; uint8_t *data;
uint32_t offs, size; uint32_t offs, size;
struct iovec iov[2];
struct rtp_header header;
if ((buf = pw_stream_dequeue_buffer(impl->stream)) == NULL) { if ((buf = pw_stream_dequeue_buffer(impl->stream)) == NULL) {
pw_log_debug("out of buffers: %m"); pw_log_debug("out of buffers: %m");
@ -601,10 +548,20 @@ static void playback_stream_process(void *d)
size = SPA_MIN(bd->chunk->size, bd->maxsize - offs); size = SPA_MIN(bd->chunk->size, bd->maxsize - offs);
data = SPA_PTROFF(bd->data, offs, uint8_t); data = SPA_PTROFF(bd->data, offs, uint8_t);
while (size > 0 && impl->block_size > 0) { spa_zero(header);
header.v = 2;
header.pt = 96;
if (impl->first)
header.m = 1;
header.ssrc = htonl(impl->ssrc);
while (size > 0 && impl->mtu > 0) {
uint32_t avail, to_fill; uint32_t avail, to_fill;
avail = impl->block_size - impl->filled; header.sequence_number = htons(impl->seq);
header.timestamp = htonl(impl->rtptime);
avail = impl->mtu - impl->filled;
to_fill = SPA_MIN(avail, size); to_fill = SPA_MIN(avail, size);
memcpy(&impl->buffer[impl->filled], data, to_fill); memcpy(&impl->buffer[impl->filled], data, to_fill);
@ -615,15 +572,17 @@ static void playback_stream_process(void *d)
data += to_fill; data += to_fill;
if (avail == 0) { if (avail == 0) {
switch (impl->protocol) { iov[0].iov_base = &header;
case PROTO_UDP: iov[0].iov_len = 12;
flush_to_udp_packet(impl); iov[1].iov_base = impl->buffer;
break; iov[1].iov_len = impl->filled;
case PROTO_TCP:
flush_to_tcp_packet(impl); stream_send_packet(impl, iov, 2);
break;
} impl->rtptime += (impl->filled / impl->stride);
impl->seq = (impl->seq + 1) & 0xffff;
impl->filled = 0; impl->filled = 0;
impl->first = false;
} }
} }
@ -1013,7 +972,7 @@ static int rtsp_record_reply(void *data, int status, const struct spa_dict *head
impl->first = true; impl->first = true;
impl->sync = 0; impl->sync = 0;
impl->sync_period = impl->info.rate / (impl->block_size / impl->frame_size); impl->sync_period = impl->info.rate / (impl->mtu / impl->stride);
impl->recording = true; impl->recording = true;
rtsp_send_volume(impl); rtsp_send_volume(impl);
@ -1327,7 +1286,7 @@ static int rtsp_do_announce(struct impl *impl)
else else
frames = FRAMES_PER_UDP_PACKET; frames = FRAMES_PER_UDP_PACKET;
impl->block_size = frames * impl->frame_size; impl->mtu = frames * impl->stride;
pw_rtsp_client_get_local_ip(impl->rtsp, &ip_version, pw_rtsp_client_get_local_ip(impl->rtsp, &ip_version,
local_ip, sizeof(local_ip)); local_ip, sizeof(local_ip));
@ -1937,7 +1896,7 @@ static void parse_audio_info(const struct pw_properties *props, struct spa_audio
parse_position(info, DEFAULT_POSITION, strlen(DEFAULT_POSITION)); parse_position(info, DEFAULT_POSITION, strlen(DEFAULT_POSITION));
} }
static int calc_frame_size(struct spa_audio_info_raw *info) static int calc_stride(struct spa_audio_info_raw *info)
{ {
int res = info->channels; int res = info->channels;
switch (info->format) { switch (info->format) {
@ -2081,8 +2040,8 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args)
parse_audio_info(impl->stream_props, &impl->info); parse_audio_info(impl->stream_props, &impl->info);
impl->frame_size = calc_frame_size(&impl->info); impl->stride = calc_stride(&impl->info);
if (impl->frame_size == 0) { if (impl->stride == 0) {
pw_log_error("unsupported audio format:%d channels:%d", pw_log_error("unsupported audio format:%d channels:%d",
impl->info.format, impl->info.channels); impl->info.format, impl->info.channels);
res = -EINVAL; res = -EINVAL;