diff --git a/src/modules/module-rtp-source.c b/src/modules/module-rtp-source.c index 3b5b95fac..c34fe99fc 100644 --- a/src/modules/module-rtp-source.c +++ b/src/modules/module-rtp-source.c @@ -25,16 +25,20 @@ #include "config.h" #include -#include #include +#include +#include +#include #include #include +#include #include #include #include -#include #include +#include +#include #include #include @@ -70,10 +74,11 @@ * context.modules = [ * { name = libpipewire-module-rtp-source * args = { - * local.ip = 224.0.0.56 + * #sap.address = 224.0.0.56 + * #sap.port = 9875 + * #local.ifname = eth0 * sess.latency.msec = 200 - * source.name = "RTP Source" - * source.props = { + * stream.props = { * node.name = "rtp-source" * } * } @@ -85,6 +90,18 @@ #define NAME "rtp-source" +static const struct spa_dict_item module_info[] = { + { PW_KEY_MODULE_AUTHOR, "Wim Taymans " }, + { PW_KEY_MODULE_DESCRIPTION, "RTP Source" }, + { PW_KEY_MODULE_USAGE, "sap.address= " + "sap.port= " + "local.ifname= " + "sess.latency.msec= " + "source.props= { key=value ... }" }, + { PW_KEY_MODULE_VERSION, PACKAGE_VERSION }, +}; + + PW_LOG_TOPIC_STATIC(mod_topic, "mod." NAME); #define PW_LOG_TOPIC_DEFAULT mod_topic @@ -92,6 +109,10 @@ PW_LOG_TOPIC_STATIC(mod_topic, "mod." NAME); #define SAP_DEFAULT_PORT 9875 #define DEFAULT_SESS_LATENCY 200 +#define ERROR_MSEC 2 +#define MAX_SESSIONS 16 +#define MTU 1500 + struct impl { struct pw_impl_module *module; struct spa_hook module_listener; @@ -106,15 +127,17 @@ struct impl { struct spa_source *sap_source; - struct pw_properties *playback_props; + struct pw_properties *stream_props; unsigned int do_disconnect:1; - char *local_ip; - int local_port; + char *ifname; + char *sap_address; + int sap_port; int sess_latency_msec; struct spa_list sessions; + uint32_t n_sessions; }; struct sdp_info { @@ -144,21 +167,32 @@ struct session { struct spa_source *source; - struct pw_stream *playback; - struct spa_hook playback_listener; + struct pw_stream *stream; + struct spa_hook stream_listener; + + uint32_t expected_ssrc; + uint16_t expected_seq; + unsigned have_ssrc:1; + unsigned have_seq:1; + unsigned have_sync:1; struct spa_ringbuffer ring; uint8_t buffer[BUFFER_SIZE]; + + struct spa_dll dll; + uint32_t target_buffer; + float max_error; + unsigned buffering:1; }; static void stream_destroy(void *d) { struct session *sess = d; - spa_hook_remove(&sess->playback_listener); - sess->playback = NULL; + spa_hook_remove(&sess->stream_listener); + sess->stream = NULL; } -static void playback_process(void *data) +static void stream_process(void *data) { struct session *sess = data; struct pw_buffer *buf; @@ -166,8 +200,8 @@ static void playback_process(void *data) uint32_t index; int32_t avail, wanted; - if ((buf = pw_stream_dequeue_buffer(sess->playback)) == NULL) { - pw_log_debug("Out of playback buffers: %m"); + if ((buf = pw_stream_dequeue_buffer(sess->stream)) == NULL) { + pw_log_debug("Out of stream buffers: %m"); return; } d = buf->buffer->datas; @@ -178,15 +212,34 @@ static void playback_process(void *data) avail = spa_ringbuffer_get_read_index(&sess->ring, &index); - if (avail < wanted) { - pw_log_warn("capture underrun %d < %d", avail, wanted); + if (avail < wanted || sess->buffering) { memset(d[0].data, 0, wanted); + if (!sess->buffering) + pw_log_warn("underrun %u < %u", avail, wanted); } else { - spa_ringbuffer_read_data(&sess->ring, - sess->buffer, + float error, corr; + if (avail > (int32_t)BUFFER_SIZE) { + index += avail - sess->target_buffer; + avail = sess->target_buffer; + pw_log_warn("overrun %u > %u", avail, BUFFER_SIZE); + } else { + error = (float)sess->target_buffer - (float)avail; + error = SPA_CLAMP(error, -sess->max_error, sess->max_error); + + corr = spa_dll_update(&sess->dll, error); + + pw_log_debug("avail:%u target:%u error:%f corr:%f", avail, + sess->target_buffer, error, corr); + + pw_stream_set_control(sess->stream, + SPA_PROP_rate, 1, &corr, NULL); + } + spa_ringbuffer_read_data(&sess->ring, + sess->buffer, BUFFER_SIZE, index & BUFFER_MASK, d[0].data, wanted); + index += wanted; spa_ringbuffer_read_update(&sess->ring, index); } @@ -195,7 +248,7 @@ static void playback_process(void *data) d[0].chunk->offset = 0; buf->size = wanted / sess->info.stride; - pw_stream_queue_buffer(sess->playback, buf); + pw_stream_queue_buffer(sess->stream, buf); } static void on_stream_state_changed(void *d, enum pw_stream_state old, @@ -221,44 +274,72 @@ static const struct pw_stream_events out_stream_events = { PW_VERSION_STREAM_EVENTS, .destroy = stream_destroy, .state_changed = on_stream_state_changed, - .process = playback_process + .process = stream_process }; static void on_rtp_io(void *data, int fd, uint32_t mask) { struct session *sess = data; + struct rtp_header *hdr; + ssize_t len, hlen; + uint8_t buffer[2048], *payload; if (mask & SPA_IO_IN) { - uint8_t buffer[2048], *payload; - ssize_t len, hlen; - struct rtp_header *hdr; - uint32_t index; + uint32_t index, expected_index, timestamp; + uint16_t seq; int32_t filled; pw_log_debug("got rtp"); - if ((len = recv(fd, buffer, sizeof(buffer), 0)) < 0) { - pw_log_warn("recv error: %m"); - return; - } + if ((len = recv(fd, buffer, sizeof(buffer), 0)) < 0) + goto receive_error; + if (len < 12) - return; + goto short_packet; hdr = (struct rtp_header*)buffer; if (hdr->v != 2) - return; + goto invalid_version; hlen = 12 + hdr->cc * 4; if (hlen > len) - return; + goto invalid_len; - len -= hlen; + if (sess->have_ssrc && sess->expected_ssrc != hdr->ssrc) + goto unexpected_ssrc; + sess->expected_ssrc = hdr->ssrc; + sess->have_ssrc = true; + + seq = ntohs(hdr->sequence_number); + if (sess->have_seq && sess->expected_seq != seq) { + pw_log_warn("unexpected seq (%d != %d)", seq, sess->expected_seq); + } + sess->expected_seq = seq + 1; + sess->have_seq = true; + + len = SPA_ROUND_DOWN(len - hlen, sess->info.stride); payload = &buffer[hlen]; filled = spa_ringbuffer_get_write_index(&sess->ring, &index); + timestamp = ntohl(hdr->timestamp); + expected_index = timestamp * sess->info.stride; + + if (!sess->have_sync) { + sess->ring.readindex = sess->ring.writeindex = + index = expected_index; + sess->have_sync = true; + sess->buffering = true; + pw_log_info("sync to timestamp %u", index); + } else if (expected_index != index) { + pw_log_warn("unexpected timestamp (%u != %u)", + index / sess->info.stride, + expected_index / sess->info.stride); + index = expected_index; + } + if (filled + len > BUFFER_SIZE) { - pw_log_warn("capture overrun"); + pw_log_warn("capture overrun %u %zd", filled, len); } else { spa_ringbuffer_write_data(&sess->ring, sess->buffer, @@ -266,14 +347,37 @@ on_rtp_io(void *data, int fd, uint32_t mask) index & BUFFER_MASK, payload, len); index += len; + filled += len; spa_ringbuffer_write_update(&sess->ring, index); + + if ((uint32_t)filled > sess->target_buffer) + sess->buffering = false; } } + return; + +receive_error: + pw_log_warn("recv error: %m"); + return; +short_packet: + pw_log_warn("short packet received"); + return; +invalid_version: + pw_log_warn("invalid RTP version"); + return; +invalid_len: + pw_log_warn("invalid RTP length"); + return; +unexpected_ssrc: + pw_log_warn("unexpected SSRC (expected %u != %u)", + sess->expected_ssrc, hdr->ssrc); + return; } -static int make_multicast_socket(const struct sockaddr* sa, socklen_t salen) +static int make_multicast_socket(const struct sockaddr* sa, socklen_t salen, char *ifname) { int af, fd, val, res; + struct ifreq req; af = sa->sa_family; if ((fd = socket(af, SOCK_DGRAM | SOCK_CLOEXEC | SOCK_NONBLOCK, 0)) < 0) { @@ -294,14 +398,23 @@ static int make_multicast_socket(const struct sockaddr* sa, socklen_t salen) pw_log_error("setsockopt failed: %m"); goto error; } + + spa_zero(req); + if (ifname) { + snprintf(req.ifr_name, sizeof(req.ifr_name), "%s", ifname); + res = ioctl(fd, SIOCGIFINDEX, &req); + if (res < 0) + pw_log_warn("SIOCGIFINDEX %s failed: %m", ifname); + } res = 0; if (af == AF_INET) { static const uint32_t ipv4_mcast_mask = 0xe0000000; const struct sockaddr_in *sa4 = (struct sockaddr_in*)sa; if ((ntohl(sa4->sin_addr.s_addr) & ipv4_mcast_mask) == ipv4_mcast_mask) { - struct ip_mreq mr4; + struct ip_mreqn mr4; memset(&mr4, 0, sizeof(mr4)); mr4.imr_multiaddr = sa4->sin_addr; + mr4.imr_ifindex = req.ifr_ifindex; res = setsockopt(fd, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mr4, sizeof(mr4)); } } else if (af == AF_INET6) { @@ -310,6 +423,7 @@ static int make_multicast_socket(const struct sockaddr* sa, socklen_t salen) struct ipv6_mreq mr6; memset(&mr6, 0, sizeof(mr6)); mr6.ipv6mr_multiaddr = sa6->sin6_addr; + mr6.ipv6mr_interface = req.ifr_ifindex; res = setsockopt(fd, IPPROTO_IPV6, IPV6_JOIN_GROUP, &mr6, sizeof(mr6)); } } else { @@ -333,7 +447,12 @@ error: return res; } -static int session_new(struct impl *impl, struct sdp_info *sdp) +static uint32_t msec_to_bytes(struct sdp_info *info, uint32_t msec) +{ + return msec * info->stride * info->info.rate / 1000; +} + +static int session_new(struct impl *impl, struct sdp_info *info) { struct session *session; const struct spa_pod *params[1]; @@ -343,39 +462,59 @@ static int session_new(struct impl *impl, struct sdp_info *sdp) struct pw_properties *props; int res, fd; + if (impl->n_sessions >= MAX_SESSIONS) { + pw_log_warn("too many sessions (%u >= %u)", impl->n_sessions, MAX_SESSIONS); + return -EMFILE; + } + session = calloc(1, sizeof(struct session)); if (session == NULL) return -errno; session->impl = impl; - session->info = *sdp; + session->info = *info; - props = pw_properties_copy(impl->playback_props); + session->target_buffer = msec_to_bytes(info, impl->sess_latency_msec); + session->max_error = msec_to_bytes(info, ERROR_MSEC); + + spa_dll_init(&session->dll); + spa_dll_set_bw(&session->dll, SPA_DLL_BW_MIN, 128, session->info.info.rate); + + props = pw_properties_copy(impl->stream_props); if (props == NULL) return -errno; - pw_properties_setf(props, PW_KEY_NODE_RATE, "1/%d", sdp->info.rate); - pw_properties_set(props, "rtp.origin", sdp->origin); - pw_properties_set(props, "rtp.session", sdp->session); - pw_properties_setf(props, "rtp.payload", "%u", sdp->payload); + pw_properties_setf(props, PW_KEY_NODE_RATE, "1/%d", info->info.rate); + pw_properties_setf(props, PW_KEY_NODE_LATENCY, "%d/%d", + session->target_buffer / info->stride, info->info.rate); + pw_properties_set(props, "rtp.origin", info->origin); + pw_properties_setf(props, "rtp.payload", "%u", info->payload); + if (info->session[0]) { + pw_properties_set(props, "rtp.session", info->session); + pw_properties_setf(props, PW_KEY_MEDIA_NAME, "RTP Stream (%s)", + info->session); + } else { + pw_properties_set(props, PW_KEY_MEDIA_NAME, "RTP Stream"); + } - pw_log_info("new session %s %s", sdp->origin, sdp->session); + pw_log_info("new session %s %s", info->origin, info->session); - session->playback = pw_stream_new(impl->core, + session->stream = pw_stream_new(impl->core, "rtp-source playback", props); - if (session->playback == NULL) + if (session->stream == NULL) return -errno; - pw_stream_add_listener(session->playback, - &session->playback_listener, + pw_stream_add_listener(session->stream, + &session->stream_listener, &out_stream_events, session); n_params = 0; spa_pod_builder_init(&b, buffer, sizeof(buffer)); params[n_params++] = spa_format_audio_raw_build(&b, SPA_PARAM_EnumFormat, - &sdp->info); + &info->info); - if ((res = pw_stream_connect(session->playback, + + if ((res = pw_stream_connect(session->stream, PW_DIRECTION_OUTPUT, PW_ID_ANY, PW_STREAM_FLAG_MAP_BUFFERS | @@ -384,7 +523,8 @@ static int session_new(struct impl *impl, struct sdp_info *sdp) params, n_params)) < 0) return res; - if ((fd = make_multicast_socket((const struct sockaddr *)&sdp->sa, sdp->salen)) < 0) + if ((fd = make_multicast_socket((const struct sockaddr *)&info->sa, + info->salen, impl->ifname)) < 0) return fd; pw_log_info("starting RTP listener"); @@ -394,15 +534,17 @@ static int session_new(struct impl *impl, struct sdp_info *sdp) return -errno; spa_list_append(&impl->sessions, &session->link); + impl->n_sessions++; return 0; } static void session_free(struct session *sess) { + sess->impl->n_sessions--; spa_list_remove(&sess->link); - if (sess->playback) - pw_stream_destroy(sess->playback); + if (sess->stream) + pw_stream_destroy(sess->stream); if (sess->source) pw_loop_destroy_source(sess->impl->loop, sess->source); free(sess); @@ -642,20 +784,20 @@ static int start_sap_listener(struct impl *impl) socklen_t salen; int fd, res; - if (inet_pton(AF_INET, impl->local_ip, &sa4.sin_addr) > 0) { + if (inet_pton(AF_INET, impl->sap_address, &sa4.sin_addr) > 0) { sa4.sin_family = AF_INET; - sa4.sin_port = htons(impl->local_port); + sa4.sin_port = htons(impl->sap_port); sa = (struct sockaddr*) &sa4; salen = sizeof(sa4); - } else if (inet_pton(AF_INET6, impl->local_ip, &sa6.sin6_addr) > 0) { + } else if (inet_pton(AF_INET6, impl->sap_address, &sa6.sin6_addr) > 0) { sa6.sin6_family = AF_INET6; - sa6.sin6_port = htons(impl->local_port); + sa6.sin6_port = htons(impl->sap_port); sa = (struct sockaddr*) &sa6; salen = sizeof(sa6); } else return -EINVAL; - if ((fd = make_multicast_socket(sa, salen)) < 0) + if ((fd = make_multicast_socket(sa, salen, impl->ifname)) < 0) return fd; pw_log_info("starting SAP listener"); @@ -693,10 +835,11 @@ static void impl_destroy(struct impl *impl) if (impl->core && impl->do_disconnect) pw_core_disconnect(impl->core); - pw_properties_free(impl->playback_props); + pw_properties_free(impl->stream_props); pw_properties_free(impl->props); - free(impl->local_ip); + free(impl->ifname); + free(impl->sap_address); free(impl); } @@ -728,22 +871,12 @@ static const struct pw_core_events core_events = { .error = on_core_error, }; -static const struct spa_dict_item module_info[] = { - { PW_KEY_MODULE_AUTHOR, "Wim Taymans " }, - { PW_KEY_MODULE_DESCRIPTION, "rtp source" }, - { PW_KEY_MODULE_USAGE, "source.name= " - "sess.latency.msec= " - "local.ip= " - "source.props= { key=value ... }" }, - { PW_KEY_MODULE_VERSION, PACKAGE_VERSION }, -}; - SPA_EXPORT int pipewire__module_init(struct pw_impl_module *module, const char *args) { struct pw_context *context = pw_impl_module_get_context(module); struct impl *impl; - struct pw_properties *props = NULL, *playback_props = NULL; + struct pw_properties *props = NULL, *stream_props = NULL; const char *str; int res = 0; @@ -765,49 +898,39 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args) spa_list_init(&impl->sessions); impl->props = props; - playback_props = pw_properties_new(NULL, NULL); - if (playback_props == NULL) { + stream_props = pw_properties_new(NULL, NULL); + if (stream_props == NULL) { res = -errno; pw_log_error( "can't create properties: %m"); goto out; } - impl->playback_props = playback_props; + impl->stream_props = stream_props; impl->module = module; impl->module_context = context; impl->loop = pw_context_get_main_loop(context); - if ((str = pw_properties_get(props, "source.name")) != NULL) { - pw_properties_set(playback_props, PW_KEY_NODE_NAME, str); - pw_properties_set(props, "source.name", NULL); - } + if ((str = pw_properties_get(props, "stream.props")) != NULL) + pw_properties_update_string(stream_props, str, strlen(str)); - if ((str = pw_properties_get(props, "source.props")) != NULL) - pw_properties_update_string(playback_props, str, strlen(str)); + if (pw_properties_get(stream_props, PW_KEY_NODE_NAME) == NULL) + pw_properties_set(stream_props, PW_KEY_NODE_NAME, "rtp-source"); + if (pw_properties_get(stream_props, PW_KEY_NODE_DESCRIPTION) == NULL) + pw_properties_set(stream_props, PW_KEY_NODE_DESCRIPTION, "RTP Source"); + if (pw_properties_get(stream_props, PW_KEY_NODE_VIRTUAL) == NULL) + pw_properties_set(stream_props, PW_KEY_NODE_VIRTUAL, "true"); + if (pw_properties_get(stream_props, PW_KEY_NODE_NETWORK) == NULL) + pw_properties_set(stream_props, PW_KEY_NODE_NETWORK, "true"); - if (pw_properties_get(playback_props, PW_KEY_NODE_NAME) == NULL) - pw_properties_set(playback_props, PW_KEY_NODE_NAME, "rtp-source"); - if (pw_properties_get(playback_props, PW_KEY_NODE_DESCRIPTION) == NULL) - pw_properties_set(playback_props, PW_KEY_NODE_DESCRIPTION, "RTP Source"); - if (pw_properties_get(playback_props, PW_KEY_NODE_VIRTUAL) == NULL) - pw_properties_set(playback_props, PW_KEY_NODE_VIRTUAL, "true"); - if (pw_properties_get(playback_props, PW_KEY_NODE_NETWORK) == NULL) - pw_properties_set(playback_props, PW_KEY_NODE_NETWORK, "true"); + str = pw_properties_get(props, "local.ifname"); + impl->ifname = str ? strdup(str) : NULL; - if ((str = pw_properties_get(props, "local.ip")) != NULL) { - impl->local_ip = strdup(str); - pw_properties_set(props, "local.ip", NULL); - } else { - impl->local_ip = strdup(SAP_DEFAULT_IP); - } - impl->local_port = SAP_DEFAULT_PORT; - - if ((str = pw_properties_get(props, "sess.latency.msec")) != NULL) { - impl->sess_latency_msec = pw_properties_parse_int(str); - pw_properties_set(props, "sess.latency.msec", NULL); - } else { - impl->sess_latency_msec = DEFAULT_SESS_LATENCY; - } + str = pw_properties_get(props, "sap.address"); + impl->sap_address = strdup(str ? str : SAP_DEFAULT_IP); + impl->sap_port = pw_properties_get_uint32(props, + "sap.port", SAP_DEFAULT_PORT); + impl->sess_latency_msec = pw_properties_get_uint32(props, + "sess.latency.msec", DEFAULT_SESS_LATENCY); impl->core = pw_context_get_object(impl->module_context, PW_TYPE_INTERFACE_Core); if (impl->core == NULL) {