From 4715fa1775a2537c71bc6bb47b085d5d313eebcd Mon Sep 17 00:00:00 2001 From: Jonas Holmberg Date: Tue, 6 Feb 2024 15:04:43 +0100 Subject: [PATCH] module-rtp: Add source/destination.ip to props Make it possible to change source.ip in module-rtp-source and destination.ip in module-rtp-sink. --- src/modules/module-rtp-sink.c | 167 ++++++++++++++++++++++---------- src/modules/module-rtp-source.c | 62 ++++++++++++ src/modules/module-rtp/stream.c | 6 ++ src/modules/module-rtp/stream.h | 2 + 4 files changed, 187 insertions(+), 50 deletions(-) diff --git a/src/modules/module-rtp-sink.c b/src/modules/module-rtp-sink.c index 84b280c6d..9e2b2e6ca 100644 --- a/src/modules/module-rtp-sink.c +++ b/src/modules/module-rtp-sink.c @@ -186,47 +186,6 @@ struct impl { int rtp_fd; }; -static void stream_destroy(void *d) -{ - struct impl *impl = d; - impl->stream = NULL; -} - -static void stream_send_packet(void *data, struct iovec *iov, size_t iovlen) -{ - struct impl *impl = data; - struct msghdr msg; - ssize_t n; - - spa_zero(msg); - msg.msg_iov = iov; - msg.msg_iovlen = iovlen; - msg.msg_control = NULL; - msg.msg_controllen = 0; - msg.msg_flags = 0; - - n = sendmsg(impl->rtp_fd, &msg, MSG_NOSIGNAL); - if (n < 0) - pw_log_warn("sendmsg() failed: %m"); -} - -static void stream_state_changed(void *data, bool started, const char *error) -{ - struct impl *impl = data; - - if (error) { - pw_log_error("stream error: %s", error); - pw_impl_module_schedule_destroy(impl->module); - } -} - -static const struct rtp_stream_events stream_events = { - RTP_VERSION_STREAM_EVENTS, - .destroy = stream_destroy, - .state_changed = stream_state_changed, - .send_packet = stream_send_packet, -}; - static int parse_address(const char *address, uint16_t port, struct sockaddr_storage *addr, socklen_t *len) { @@ -315,6 +274,123 @@ error: return res; } +static void stream_destroy(void *d) +{ + struct impl *impl = d; + impl->stream = NULL; +} + +static void stream_send_packet(void *data, struct iovec *iov, size_t iovlen) +{ + struct impl *impl = data; + struct msghdr msg; + ssize_t n; + + spa_zero(msg); + msg.msg_iov = iov; + msg.msg_iovlen = iovlen; + msg.msg_control = NULL; + msg.msg_controllen = 0; + msg.msg_flags = 0; + + n = sendmsg(impl->rtp_fd, &msg, MSG_NOSIGNAL); + if (n < 0) + pw_log_warn("sendmsg() failed: %m"); +} + +static void stream_state_changed(void *data, bool started, const char *error) +{ + struct impl *impl = data; + + if (error) { + pw_log_error("stream error: %s", error); + pw_impl_module_schedule_destroy(impl->module); + } else if (started) { + int res; + + if ((res = make_socket(&impl->src_addr, impl->src_len, + &impl->dst_addr, impl->dst_len, + impl->mcast_loop, impl->ttl, impl->dscp, + impl->ifname)) < 0) { + pw_log_error("can't make socket: %s", spa_strerror(res)); + return; + } + impl->rtp_fd = res; + } else { + close(impl->rtp_fd); + impl->rtp_fd = -1; + } +} + +static void stream_props_changed(struct impl *impl, uint32_t id, const struct spa_pod *param) +{ + struct spa_pod_object *obj = (struct spa_pod_object *)param; + struct spa_pod_prop *prop; + + if (param == NULL) + return; + + SPA_POD_OBJECT_FOREACH(obj, prop) { + if (prop->key == SPA_PROP_params) { + struct spa_pod *params = NULL; + struct spa_pod_parser prs; + struct spa_pod_frame f; + const char *key; + struct spa_pod *pod; + const char *value; + + if (spa_pod_parse_object(param, SPA_TYPE_OBJECT_Props, NULL, SPA_PROP_params, + SPA_POD_OPT_Pod(¶ms)) < 0) + return; + spa_pod_parser_pod(&prs, params); + if (spa_pod_parser_push_struct(&prs, &f) < 0) + return; + + while (true) { + if (spa_pod_parser_get_string(&prs, &key) < 0) + break; + if (spa_pod_parser_get_pod(&prs, &pod) < 0) + break; + if (spa_pod_get_string(pod, &value) < 0) + continue; + pw_log_info("key '%s', value '%s'", key, value); + if (!spa_streq(key, "destination.ip")) + continue; + if (parse_address(value, impl->dst_port, &impl->dst_addr, + &impl->dst_len) < 0) { + pw_log_error("invalid destination.ip: '%s'", value); + break; + } + pw_properties_set(impl->stream_props, "rtp.destination.ip", value); + struct spa_dict_item item[1]; + item[0] = SPA_DICT_ITEM_INIT("rtp.destination.ip", value); + rtp_stream_update_properties(impl->stream, &SPA_DICT_INIT(item, 1)); + break; + } + } + } +} + +static void stream_param_changed(void *data, uint32_t id, const struct spa_pod *param) +{ + struct impl *impl = data; + + switch (id) { + case SPA_PARAM_Props: + if (param != NULL) + stream_props_changed(impl, id, param); + break; + } +} + +static const struct rtp_stream_events stream_events = { + RTP_VERSION_STREAM_EVENTS, + .destroy = stream_destroy, + .state_changed = stream_state_changed, + .param_changed = stream_param_changed, + .send_packet = stream_send_packet, +}; + static int get_ip(const struct sockaddr_storage *sa, char *ip, size_t len) { if (sa->ss_family == AF_INET) { @@ -531,15 +607,6 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args) &impl->core_listener, &core_events, impl); - if ((res = make_socket(&impl->src_addr, impl->src_len, - &impl->dst_addr, impl->dst_len, - impl->mcast_loop, impl->ttl, impl->dscp, - impl->ifname)) < 0) { - pw_log_error("can't make socket: %s", spa_strerror(res)); - goto out; - } - impl->rtp_fd = res; - impl->stream = rtp_stream_new(impl->core, PW_DIRECTION_INPUT, pw_properties_copy(stream_props), &stream_events, impl); diff --git a/src/modules/module-rtp-source.c b/src/modules/module-rtp-source.c index 62c8eb7d2..bbab67228 100644 --- a/src/modules/module-rtp-source.c +++ b/src/modules/module-rtp-source.c @@ -378,10 +378,72 @@ static void stream_state_changed(void *data, bool started, const char *error) } } +static void stream_props_changed(struct impl *impl, uint32_t id, const struct spa_pod *param) +{ + struct spa_pod_object *obj = (struct spa_pod_object *)param; + struct spa_pod_prop *prop; + + if (param == NULL) + return; + + SPA_POD_OBJECT_FOREACH(obj, prop) { + if (prop->key == SPA_PROP_params) { + struct spa_pod *params = NULL; + struct spa_pod_parser prs; + struct spa_pod_frame f; + const char *key; + struct spa_pod *pod; + const char *value; + + if (spa_pod_parse_object(param, SPA_TYPE_OBJECT_Props, NULL, SPA_PROP_params, + SPA_POD_OPT_Pod(¶ms)) < 0) + return; + spa_pod_parser_pod(&prs, params); + if (spa_pod_parser_push_struct(&prs, &f) < 0) + return; + + while (true) { + if (spa_pod_parser_get_string(&prs, &key) < 0) + break; + if (spa_pod_parser_get_pod(&prs, &pod) < 0) + break; + if (spa_pod_get_string(pod, &value) < 0) + continue; + pw_log_info("key '%s', value '%s'", key, value); + if (!spa_streq(key, "source.ip")) + continue; + if (parse_address(value, impl->src_port, &impl->src_addr, + &impl->src_len) < 0) { + pw_log_error("invalid source.ip: '%s'", value); + break; + } + pw_properties_set(impl->stream_props, "rtp.source.ip", value); + struct spa_dict_item item[1]; + item[0] = SPA_DICT_ITEM_INIT("rtp.source.ip", value); + rtp_stream_update_properties(impl->stream, &SPA_DICT_INIT(item, 1)); + break; + } + } + } +} + +static void stream_param_changed(void *data, uint32_t id, const struct spa_pod *param) +{ + struct impl *impl = data; + + switch (id) { + case SPA_PARAM_Props: + if (param != NULL) + stream_props_changed(impl, id, param); + break; + } +} + static const struct rtp_stream_events stream_events = { RTP_VERSION_STREAM_EVENTS, .destroy = stream_destroy, .state_changed = stream_state_changed, + .param_changed = stream_param_changed, }; static void on_timer_event(void *data, uint64_t expirations) diff --git a/src/modules/module-rtp/stream.c b/src/modules/module-rtp/stream.c index 3ef0fe931..7bc2a9e51 100644 --- a/src/modules/module-rtp/stream.c +++ b/src/modules/module-rtp/stream.c @@ -602,6 +602,12 @@ void rtp_stream_destroy(struct rtp_stream *s) free(impl); } +int rtp_stream_update_properties(struct rtp_stream *s, const struct spa_dict *dict) +{ + struct impl *impl = (struct impl*)s; + return pw_stream_update_properties(impl->stream, dict); +} + int rtp_stream_receive_packet(struct rtp_stream *s, uint8_t *buffer, size_t len) { struct impl *impl = (struct impl*)s; diff --git a/src/modules/module-rtp/stream.h b/src/modules/module-rtp/stream.h index 96f6416c2..e81422392 100644 --- a/src/modules/module-rtp/stream.h +++ b/src/modules/module-rtp/stream.h @@ -44,6 +44,8 @@ struct rtp_stream *rtp_stream_new(struct pw_core *core, void rtp_stream_destroy(struct rtp_stream *s); +int rtp_stream_update_properties(struct rtp_stream *s, const struct spa_dict *dict); + int rtp_stream_receive_packet(struct rtp_stream *s, uint8_t *buffer, size_t len); uint64_t rtp_stream_get_time(struct rtp_stream *s, uint64_t *rate);