diff --git a/src/modules/module-rtp-sink.c b/src/modules/module-rtp-sink.c index 2e294d63f..e0d34482b 100644 --- a/src/modules/module-rtp-sink.c +++ b/src/modules/module-rtp-sink.c @@ -15,7 +15,10 @@ #include #include +#include +#include #include +#include #include #include #include @@ -121,6 +124,51 @@ *] *\endcode * + * ## Adding and removing receivers through commands + * + * The following commands can be sent to the RTP sink node via `pw_node_send_command()`: + * + * * `add-receiver` : Adds a receiver to the sink's list. If the given + * IP address <-> port combination was already added, the command is + * logged, but otherwise ignored. Arguments: + * - `destination.ip` : IP address to send data to. Can be a uni- or + * multicast address, but must be a valid address. + * - `destination.port` : Port to send data to. Must be valid. + * - `local.ifname`, `source.ip`, `net.ttl`, `net.dscp`, `net.loop` : + * These are all optional, and work just like in the RTP sink + * module's properties. + * + * * `remove-receiver` : Removes a receiver from the sink's list. The + * receiver is identified by the given IP address. A port can optionally + * be specified as well. If it isn't, then the first receiver with that IP + * address is removed. If no matching receiver is in the sink's list, + * this command does nothing. Arguments: + * - `destination.ip` : IP address to send data to. Can be a uni- or + * multicast address, but must be a valid address. + * - `destination.port` : Port to send data to. This is optional. But, if + * it is set, it must be a valid port number. + * + * * `clear-receivers` : Removes all receivers from the sink's list. If the + * list is empty, this does nothing. This command has no arguments. + * + * If the RTP sink module is created with the `destination.ip` and + * `destination.port` properties set, it behaves as if `add-receiver` were + * called right after the module was initialized. This means that if none + * of these commands are used, the module behaves just as it did prior to + * this patch. Note that the `remove-receivers` command can remove this + * initial receiver as well. + * + * If no receivers are added, the module continues to work normally. + * Adding and removing receivers mid-operation is supported. + * + * Example pw-cli calls (56 is the ID of the RTP sink node): + * + * \code{.unparsed} + * pw-cli c 56 User '{ extra="{ \"command.id\" : \"add-receiver\" , \"destination.ip\" : \"10.42.0.1\", \"destination.port\" : 55001 }" }' + * pw-cli c 56 User '{ extra="{ \"command.id\" : \"remove-receiver\", \"destination.ip\" : \"10.42.0.1\" }" }' + * pw-cli c 56 User '{ extra="{ \"command.id\" : \"clear-receivers\" }" }' + * \endcode + * * \since 0.3.60 */ @@ -166,6 +214,36 @@ static const struct spa_dict_item module_info[] = { { PW_KEY_MODULE_VERSION, PACKAGE_VERSION }, }; +enum rtp_target_type { + RTP_TARGET_TYPE_UNICAST, + RTP_TARGET_TYPE_MULTICAST +}; + +struct rtp_target { + struct spa_list link; + + /* Common multicast and unicast fields */ + + enum rtp_target_type type; + + uint16_t dest_port; + struct sockaddr_storage dest_addr; + socklen_t dest_addrlen; + + int socket_fd; + + uint32_t ttl; + uint32_t dscp; + + /* Multicast specific fields */ + + char *ifname; + bool mcast_loop; + + struct sockaddr_storage src_addr; + socklen_t src_addrlen; +}; + struct impl { struct pw_context *context; @@ -186,19 +264,15 @@ struct impl { unsigned int do_disconnect_core:1; - char *ifname; - uint32_t ttl; - bool mcast_loop; - uint32_t dscp; + struct spa_list rtp_targets; + size_t num_rtp_targets; - struct sockaddr_storage src_addr; - socklen_t src_len; - - uint16_t dst_port; - struct sockaddr_storage dst_addr; - socklen_t dst_len; - - int rtp_fd; + /* This flag is needed to know whether on_add_receiver() shall connect + * the socket immediately, or keep the target disconnected. The latter + * case is done when the PW node is not running; then, once it does start + * running, the stream.c stream_start() call will in turn call + * stream_open_connection(), which will connect the socket. */ + bool stream_connected; }; static int make_socket(struct sockaddr_storage *src, socklen_t src_len, @@ -274,6 +348,255 @@ static void stream_destroy(void *d) impl->stream = NULL; } +static void teardown_rtp_target(struct rtp_target *target); + +static int setup_rtp_target(struct rtp_target *target, + const char *ifname, const char *destination_ip, uint16_t destination_port, + const char *source_ip, uint32_t ttl, uint32_t dscp, bool mcast_loop) +{ + int res = 0; + + memset(target, 0, sizeof(struct rtp_target)); + target->socket_fd = -1; + + if (destination_port == 0) + target->dest_port = (DEFAULT_PORT + ((uint32_t) (pw_rand32() % 512) << 1)); + else + target->dest_port = destination_port; + + if ((res = pw_net_parse_address(destination_ip, target->dest_port, + &target->dest_addr, &target->dest_addrlen)) < 0) { + pw_log_error("invalid destination IP \"%s\": %s", destination_ip, spa_strerror(res)); + goto error; + } + + if (source_ip == NULL) + source_ip = (target->dest_addr.ss_family == AF_INET) ? + DEFAULT_SOURCE_IP : DEFAULT_SOURCE_IP6; + if ((res = pw_net_parse_address(source_ip, 0, &target->src_addr, + &target->src_addrlen)) < 0) { + pw_log_error("invalid source IP \"%s\": %s", source_ip, spa_strerror(res)); + goto error; + } + + target->ttl = ttl; + target->dscp = dscp; + target->mcast_loop = mcast_loop; + target->ifname = ifname ? strdup(ifname) : NULL; + + target->type = pw_net_is_multicast(&(target->dest_addr)) + ? RTP_TARGET_TYPE_MULTICAST + : RTP_TARGET_TYPE_UNICAST; + +out: + return res; + +error: + teardown_rtp_target(target); + goto out; +} + +static int setup_rtp_target_from_props(struct rtp_target *target, struct pw_properties *props, bool *target_was_setup) +{ + const char *destination_ip; + uint16_t destination_port; + const char *ifname; + const char *source_ip; + uint32_t ttl; + uint32_t dscp; + bool mcast_loop; + int res; + + *target_was_setup = false; + + destination_ip = pw_properties_get(props, "destination.ip"); + if (destination_ip == NULL) + return 0; + + destination_port = pw_properties_get_uint32(props, "destination.port", 0); + ifname = pw_properties_get(props, "local.ifname"); + source_ip = pw_properties_get(props, "source.ip"); + ttl = pw_properties_get_uint32(props, "net.ttl", DEFAULT_TTL); + dscp = pw_properties_get_uint32(props, "net.dscp", DEFAULT_DSCP); + mcast_loop = pw_properties_get_bool(props, "net.loop", DEFAULT_LOOP); + + res = setup_rtp_target(target, ifname, destination_ip, destination_port, source_ip, ttl, dscp, mcast_loop); + + if (res == 0) + *target_was_setup = true; + + return res; +} + +static void teardown_rtp_target(struct rtp_target *target) +{ + if (target->socket_fd >= 0) { + close(target->socket_fd); + target->socket_fd = -1; + } + + free(target->ifname); + target->ifname = NULL; +} + +#define LOG_RTP_TARGET(LOGLEVEL, PREFIX, TARGET) \ + do { \ + if (SPA_UNLIKELY(pw_log_topic_enabled((LOGLEVEL), PW_LOG_TOPIC_DEFAULT))) { \ + char src_addr_str_buf[256]; \ + char dest_addr_str_buf[256]; \ + const char *type_str; \ + const char *src_addr_str; \ + const char *dest_addr_str; \ + const char *iface_name; \ + uint16_t dest_port = 0; \ + \ + type_str = ((TARGET)->type == RTP_TARGET_TYPE_UNICAST) ? "unicast" : "multicast"; \ + src_addr_str = dest_addr_str = ""; \ + iface_name = ((TARGET)->ifname != NULL) ? (TARGET)->ifname : ""; \ + \ + if (pw_net_get_ip(&((TARGET)->src_addr), src_addr_str_buf, sizeof(src_addr_str_buf), NULL, NULL) == 0) \ + src_addr_str = src_addr_str_buf; \ + if (pw_net_get_ip(&((TARGET)->dest_addr), dest_addr_str_buf, sizeof(dest_addr_str_buf), NULL, &dest_port) == 0) \ + dest_addr_str = dest_addr_str_buf; \ + \ + if ((TARGET)->socket_fd < 0) { \ + pw_log_logt((LOGLEVEL), PW_LOG_TOPIC_DEFAULT, __FILE__, __LINE__, __func__, \ + "%s: type: %s; dest addr: %s; dest port: %" PRIu16 "; " \ + "src addr: %s; TTL: %" PRIu32 "; DSCP: %" PRIu32 "; " \ + "iface name: %s; mcast loop: %d; (socket not opened yet)", \ + (PREFIX), type_str, dest_addr_str, dest_port, src_addr_str, \ + (TARGET)->ttl, (TARGET)->dscp, iface_name, (TARGET)->mcast_loop); \ + } else { \ + pw_log_logt((LOGLEVEL), PW_LOG_TOPIC_DEFAULT, __FILE__, __LINE__, __func__, \ + "%s type: %s; dest addr: %s; dest port: %" PRIu16 "; " \ + "src addr: %s; TTL: %" PRIu32 "; DSCP: %" PRIu32 "; " \ + "iface name: %s; mcast loop: %d; socket FD: %d", (PREFIX), \ + type_str, dest_addr_str, dest_port, src_addr_str, (TARGET)->ttl, \ + (TARGET)->dscp, iface_name, (TARGET)->mcast_loop, (TARGET)->socket_fd); \ + } \ + } \ + } while (0) + +static struct rtp_target *find_rtp_target_by_sockaddr(struct impl *impl, struct sockaddr_storage *address, bool compare_ports) +{ + struct rtp_target *rtp_target; + + spa_list_for_each(rtp_target, &(impl->rtp_targets), link) { + if (pw_net_are_addresses_equal(&(rtp_target->dest_addr), address, compare_ports)) { + return rtp_target; + } + } + + return NULL; +} + +static struct rtp_target *find_rtp_target(struct impl *impl, const char *destination_ip, uint16_t destination_port) +{ + int res; + struct sockaddr_storage dest_addr; + socklen_t dest_len; + + if ((res = pw_net_parse_address(destination_ip, destination_port, + &dest_addr, &dest_len)) < 0) { + pw_log_error("invalid destination IP \"%s\": %s", destination_ip, spa_strerror(res)); + return NULL; + } + + return find_rtp_target_by_sockaddr(impl, &dest_addr, destination_port != 0); +} + +static int append_to_rtp_targets(struct spa_loop *loop, bool async, uint32_t seq, + const void *data, size_t size, void *user_data) +{ + /* IMPORTANT: This must be run from within the data loop, since the rtp_targets + * list is modified here, and the stream_send_packet() function (which runs in + * the data loop thread), iterates over this same list. */ + + struct impl *impl = user_data; + struct rtp_target *rtp_target_to_add = (struct rtp_target *)data; + + spa_list_append(&(impl->rtp_targets), &(rtp_target_to_add->link)); + impl->num_rtp_targets++; + + return 0; +} + +static int add_rtp_target(struct impl *impl, struct rtp_target *rtp_target, bool append_in_data_loop) +{ + struct rtp_target *rtp_target_copy; + + LOG_RTP_TARGET(SPA_LOG_LEVEL_INFO, "Adding RTP target", rtp_target); + + /* Allocate and fill the target here, outside of the data loop + * thread, to not unnecessarily block it. */ + rtp_target_copy = malloc(sizeof(struct rtp_target)); + if (SPA_UNLIKELY(rtp_target_copy == NULL)) { + pw_log_error("could not allocate memory for new target"); + return -ENOMEM; + } + + memcpy(rtp_target_copy, rtp_target, sizeof(struct rtp_target)); + /* Ensure that nothing is stored in the spa_list link; it will + * be filled by spa_list_append(). */ + spa_zero(rtp_target_copy->link); + + if (append_in_data_loop) + rtp_stream_run_in_data_loop(impl->stream, append_to_rtp_targets, 1, rtp_target_copy, 0, impl); + else + append_to_rtp_targets(NULL, false, 0, rtp_target_copy, 0, impl); + + return 0; +} + +static int remove_from_rtp_targets(struct spa_loop *loop, bool async, uint32_t seq, + const void *data, size_t size, void *user_data) +{ + /* IMPORTANT: This must be run from within the data loop, since the rtp_targets + * list is modified here, and the stream_send_packet() function (which runs in + * the data loop thread), iterates over this same list. */ + + struct impl *impl = user_data; + struct rtp_target *rtp_target_to_remove = (struct rtp_target *)data; + + spa_list_remove(&(rtp_target_to_remove->link)); + impl->num_rtp_targets--; + + return 0; +} + +static void remove_rtp_target(struct impl *impl, struct rtp_target *rtp_target_to_remove, + bool remove_in_data_loop, const char *custom_prefix) +{ + if (custom_prefix == NULL) + custom_prefix = "Removing RTP target"; + + LOG_RTP_TARGET(SPA_LOG_LEVEL_INFO, custom_prefix, rtp_target_to_remove); + + if (remove_in_data_loop) + rtp_stream_run_in_data_loop(impl->stream, remove_from_rtp_targets, 1, rtp_target_to_remove, 0, impl); + else + remove_from_rtp_targets(NULL, false, 0, rtp_target_to_remove, 0, impl); + + teardown_rtp_target(rtp_target_to_remove); + + free(rtp_target_to_remove); +} + +static void remove_rtp_target_by_ip_and_port(struct impl *impl, const char *destination_ip, + uint16_t destination_port, bool remove_in_data_loop) +{ + struct rtp_target *rtp_target_to_remove; + + if (impl->num_rtp_targets == 0) + return; + + rtp_target_to_remove = find_rtp_target(impl, destination_ip, destination_port); + if (rtp_target_to_remove == NULL) + return; + + remove_rtp_target(impl, rtp_target_to_remove, remove_in_data_loop, NULL); +} + static inline uint64_t get_time_ns(void) { struct timespec ts; @@ -286,6 +609,7 @@ static void stream_send_packet(void *data, struct iovec *iov, size_t iovlen) struct impl *impl = data; struct msghdr msg; ssize_t n; + struct rtp_target *rtp_target; spa_zero(msg); msg.msg_iov = iov; @@ -294,11 +618,18 @@ static void stream_send_packet(void *data, struct iovec *iov, size_t iovlen) msg.msg_controllen = 0; msg.msg_flags = 0; - n = sendmsg(impl->rtp_fd, &msg, MSG_NOSIGNAL); - if (n < 0) { - int suppressed; - if ((suppressed = spa_ratelimit_test(&impl->rate_limit, get_time_ns())) >= 0) - pw_log_warn("(%d suppressed) sendmsg() failed: %m", suppressed); + spa_list_for_each(rtp_target, &(impl->rtp_targets), link) { + /* All targets are required to have open sockets + * by the time sending takes place. */ + spa_assert(rtp_target->socket_fd >= 0); + n = sendmsg(rtp_target->socket_fd, &msg, MSG_NOSIGNAL); + if (n < 0) { + int suppressed; + if ((suppressed = spa_ratelimit_test(&impl->rate_limit, get_time_ns())) >= 0) { + pw_log_warn("(%d suppressed) sendmsg() failed: %m", suppressed); + LOG_RTP_TARGET(SPA_LOG_LEVEL_WARN, "RTP target", rtp_target); + } + } } } @@ -311,40 +642,56 @@ static void stream_report_error(void *data, const char *error) } } +static void stream_close_connection(void *data, int *result); + static void stream_open_connection(void *data, int *result) { int res; struct impl *impl = data; + struct rtp_target *rtp_target; - if ((res = make_socket(&impl->src_addr, impl->src_len, - &impl->dst_addr, impl->dst_len, - impl->mcast_loop, impl->ttl, impl->dscp, - impl->ifname)) < 0) { - pw_log_error("can't make socket: %s", spa_strerror(res)); - rtp_stream_set_error(impl->stream, res, "Can't make socket"); - if (result) - *result = res; - return; + spa_list_for_each(rtp_target, &(impl->rtp_targets), link) { + if ((res = make_socket(&rtp_target->src_addr, rtp_target->src_addrlen, + &rtp_target->dest_addr, rtp_target->dest_addrlen, + rtp_target->mcast_loop, rtp_target->ttl, + rtp_target->dscp, rtp_target->ifname)) < 0) { + pw_log_error("can't make socket: %s", spa_strerror(res)); + LOG_RTP_TARGET(SPA_LOG_LEVEL_WARN, "RTP target", rtp_target); + rtp_stream_set_error(impl->stream, res, "Can't make socket"); + stream_close_connection(data, NULL); + if (result) + *result = res; + return; + } + rtp_target->socket_fd = res; } if (result) *result = 1; - impl->rtp_fd = res; + /* Now, after all sockets are opened, mark the stream as connected. */ + impl->stream_connected = true; } static void stream_close_connection(void *data, int *result) { struct impl *impl = data; + struct rtp_target *rtp_target; - if (impl->rtp_fd > 0) { - if (result) - *result = 1; - close(impl->rtp_fd); - impl->rtp_fd = -1; - } else { - if (result) - *result = 0; + /* Mark the stream as disconnected to let future on_add_receiver() + * calls know that they must not connect the socket on their own. */ + impl->stream_connected = false; + + if (result) + *result = 0; + + spa_list_for_each(rtp_target, &(impl->rtp_targets), link) { + if (rtp_target->socket_fd >= 0) { + if (result) + *result = 1; + close(rtp_target->socket_fd); + rtp_target->socket_fd = -1; + } } } @@ -385,15 +732,7 @@ static void stream_props_changed(struct impl *impl, uint32_t id, const struct sp spa_pod_get_int(pod, &value_int) < 0) continue; pw_log_info("key '%s', value '%s'/%u", key, value_str, value_int); - if (spa_streq(key, "destination.ip")) { - if (!value_str || pw_net_parse_address(value_str, impl->dst_port, &impl->dst_addr, - &impl->dst_len) < 0) { - pw_log_error("invalid destination.ip: '%s'", value_str); - break; - } - pw_properties_set(impl->stream_props, "rtp.destination.ip", value_str); - items[n_items++] = SPA_DICT_ITEM_INIT("rtp.destination.ip", value_str); - } else if (spa_streq(key, "sess.name")) { + if (spa_streq(key, "sess.name")) { if (!value_str) { pw_log_error("invalid sess.name"); break; @@ -434,6 +773,193 @@ static void stream_param_changed(void *data, uint32_t id, const struct spa_pod * } } +static void on_add_receiver(struct impl *impl, struct spa_json *command_json_iter) +{ + int res; + char key[256]; + const char *value; + struct rtp_target rtp_target; + int len; + char ifname[64]; + char destination_ip[64]; + int destination_port = 0; + char source_ip[64]; + int ttl = DEFAULT_TTL; + int dscp = DEFAULT_DSCP; + bool mcast_loop = DEFAULT_LOOP; + + ifname[0] = '\0'; + destination_ip[0] = '\0'; + source_ip[0] = '\0'; + + while ((len = spa_json_object_next(command_json_iter, key, sizeof(key), &value)) > 0) { + if (spa_streq(key, "local.ifname")) { + spa_json_parse_stringn(value, len, ifname, sizeof(ifname)); + } else if (spa_streq(key, "destination.ip")) { + spa_json_parse_stringn(value, len, destination_ip, sizeof(destination_ip)); + } else if (spa_streq(key, "destination.port")) { + spa_json_parse_int(value, len, &destination_port); + } else if (spa_streq(key, "source.ip")) { + spa_json_parse_stringn(value, len, source_ip, sizeof(source_ip)); + } else if (spa_streq(key, "net.ttl")) { + spa_json_parse_int(value, len, &ttl); + } else if (spa_streq(key, "net.dscp")) { + spa_json_parse_int(value, len, &dscp); + } else if (spa_streq(key, "net.loop")) { + spa_json_parse_bool(value, len, &mcast_loop); + } + } + + if (destination_ip[0] == '\0') { + pw_log_error("Cannot add receiver without a destination.ip value"); + return; + } + + if ((res = setup_rtp_target(&rtp_target, (ifname[0] != '\0') ? ifname : NULL, + destination_ip, destination_port, (source_ip[0] != '\0') ? source_ip : NULL, + ttl, dscp, mcast_loop)) != 0) + return; + + if (find_rtp_target_by_sockaddr(impl, &(rtp_target.dest_addr), true) != NULL) { + LOG_RTP_TARGET(SPA_LOG_LEVEL_WARN, "Not adding RTP target because it is already added", &rtp_target); + goto duplicate_add; + } + + /* Only create the socket when the stream is connected. Sockets are + * supposed to be disconnected otherwise. If it is disconnected, adding + * the RTP target to the list is enough - stream_open_connection() will + * connect the target's socket then. */ + if (impl->stream_connected) { + if ((res = make_socket(&rtp_target.src_addr, rtp_target.src_addrlen, + &rtp_target.dest_addr, rtp_target.dest_addrlen, + rtp_target.mcast_loop, rtp_target.ttl, + rtp_target.dscp, rtp_target.ifname)) < 0) { + pw_log_error("Couldn't make socket for new receiver: %s", spa_strerror(res)); + goto error; + } + pw_log_debug("Created socket for new receiver, socket FD: %d", res); + + rtp_target.socket_fd = res; + } + + if (add_rtp_target(impl, &rtp_target, true) != 0) + goto error; + +finish: + return; + +duplicate_add: + teardown_rtp_target(&rtp_target); + goto finish; + +error: + teardown_rtp_target(&rtp_target); + goto finish; +} + +static void on_remove_receiver(struct impl *impl, struct spa_json *command_json_iter) +{ + char key[256]; + const char *value; + int len; + char destination_ip[64]; + int destination_port = 0; + + destination_ip[0] = '\0'; + + while ((len = spa_json_object_next(command_json_iter, key, sizeof(key), &value)) > 0) { + if (spa_streq(key, "destination.ip")) { + spa_json_parse_stringn(value, len, destination_ip, sizeof(destination_ip)); + } else if (spa_streq(key, "destination.port")) { + spa_json_parse_int(value, len, &destination_port); + } + } + + if (destination_ip[0] == '\0') { + pw_log_error("Cannot remove receiver without a destination.ip value"); + return; + } + + remove_rtp_target_by_ip_and_port(impl, destination_ip, destination_port, true); +} + +static void on_clear_receivers(struct impl *impl) +{ + struct rtp_target *rtp_target; + + pw_log_info("Clearing all receivers"); + + spa_list_consume(rtp_target, &(impl->rtp_targets), link) + remove_rtp_target(impl, rtp_target, true, NULL); +} + +static void parse_rtp_command(struct impl *impl, const char *command_json_str) +{ + int res; + struct spa_json iter; + char rtp_command_id[64]; + + if ((res = spa_json_str_object_find(command_json_str, strlen(command_json_str), + "command.id", rtp_command_id, sizeof(rtp_command_id))) <= 0) { + if (res == -ENOENT) { + pw_log_error("Command JSON string \"%s\" has no command.id field", + command_json_str); + } else { + pw_log_error("Error while parsing JSON string \"%s\": %s", command_json_str, + spa_strerror(res)); + } + + return; + } + + if ((res = spa_json_begin_object(&iter, command_json_str, strlen(command_json_str))) <= 0) { + pw_log_error("Error while parsing JSON string \"%s\": %s", command_json_str, + spa_strerror(res)); + return; + } + + if (spa_streq(rtp_command_id, "add-receiver")) { + on_add_receiver(impl, &iter); + } else if (spa_streq(rtp_command_id, "remove-receiver")) { + on_remove_receiver(impl, &iter); + } else if (spa_streq(rtp_command_id, "clear-receivers")) { + on_clear_receivers(impl); + } else { + pw_log_error("Command JSON string \"%s\" has unrecognized command ID \"%s\"", + command_json_str, rtp_command_id); + } +} + +static void stream_command(void *data, const struct spa_command *command) +{ + struct impl *impl = data; + + if (SPA_UNLIKELY(SPA_COMMAND_TYPE(command) != SPA_TYPE_COMMAND_Node)) + return; + + switch (SPA_NODE_COMMAND_ID(command)) { + case SPA_NODE_COMMAND_User: { + const struct spa_pod_object *pod_object = (const struct spa_pod_object *)command; + struct spa_pod_prop *prop; + + SPA_POD_OBJECT_FOREACH(pod_object, prop) { + if (prop->key != SPA_COMMAND_NODE_extra) + continue; + + if (prop->value.type != SPA_TYPE_String) + continue; + + const char *json_str = SPA_POD_CONTENTS(struct spa_pod, &prop->value); + parse_rtp_command(impl, json_str); + } + + break; + } + default: + break; + } +} + static const struct rtp_stream_events stream_events = { RTP_VERSION_STREAM_EVENTS, .destroy = stream_destroy, @@ -442,6 +968,7 @@ static const struct rtp_stream_events stream_events = { .close_connection = stream_close_connection, .param_changed = stream_param_changed, .send_packet = stream_send_packet, + .command = stream_command, }; static void core_destroy(void *d) @@ -458,21 +985,20 @@ static const struct pw_proxy_events core_proxy_events = { static void impl_destroy(struct impl *impl) { + struct rtp_target *rtp_target; + if (impl->stream) rtp_stream_destroy(impl->stream); if (impl->core && impl->do_disconnect_core) pw_core_disconnect(impl->core); - if (impl->rtp_fd != -1) { - pw_log_info("closing socket with FD %d as part of shutdown", impl->rtp_fd); - close(impl->rtp_fd); - } + spa_list_consume(rtp_target, &(impl->rtp_targets), link) + remove_rtp_target(impl, rtp_target, false, "Removing RTP target as part of shutdown"); pw_properties_free(impl->stream_props); pw_properties_free(impl->props); - free(impl->ifname); free(impl); } @@ -524,6 +1050,8 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args) int64_t ts_offset; int res = 0; uint32_t header_size; + struct rtp_target initial_target; + bool initial_target_valid = false; PW_LOG_TOPIC_INIT(mod_topic); @@ -531,8 +1059,6 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args) if (impl == NULL) return -errno; - impl->rtp_fd = -1; - if (args == NULL) args = ""; @@ -552,6 +1078,8 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args) } impl->stream_props = stream_props; + spa_list_init(&impl->rtp_targets); + impl->rate_limit.interval = 2 * SPA_NSEC_PER_SEC; impl->rate_limit.burst = 1; @@ -598,45 +1126,43 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args) copy_props(impl, props, "sess.ts-refclk"); copy_props(impl, props, "aes67.driver-group"); - str = pw_properties_get(props, "local.ifname"); - impl->ifname = str ? strdup(str) : NULL; - - impl->dst_port = DEFAULT_PORT + ((uint32_t) (pw_rand32() % 512) << 1); - impl->dst_port = pw_properties_get_uint32(props, "destination.port", impl->dst_port); - if ((str = pw_properties_get(props, "destination.ip")) == NULL) - str = DEFAULT_DESTINATION_IP; - if ((res = pw_net_parse_address(str, impl->dst_port, &impl->dst_addr, &impl->dst_len)) < 0) { - pw_log_error("invalid destination.ip %s: %s", str, spa_strerror(res)); + if ((res = setup_rtp_target_from_props(&initial_target, props, &initial_target_valid)) < 0) { + pw_log_error("could not setup initial destination: %s", spa_strerror(res)); goto out; } - if ((str = pw_properties_get(props, "source.ip")) == NULL) - str = impl->dst_addr.ss_family == AF_INET ? - DEFAULT_SOURCE_IP : DEFAULT_SOURCE_IP6; - if ((res = pw_net_parse_address(str, 0, &impl->src_addr, &impl->src_len)) < 0) { - pw_log_error("invalid source.ip %s: %s", str, spa_strerror(res)); - goto out; - } - - impl->ttl = pw_properties_get_uint32(props, "net.ttl", DEFAULT_TTL); - impl->mcast_loop = pw_properties_get_bool(props, "net.loop", DEFAULT_LOOP); - impl->dscp = pw_properties_get_uint32(props, "net.dscp", DEFAULT_DSCP); ts_offset = pw_properties_get_int64(props, "sess.ts-offset", DEFAULT_TS_OFFSET); if (ts_offset == -1) ts_offset = pw_rand32(); pw_properties_setf(stream_props, "rtp.sender-ts-offset", "%u", (uint32_t)ts_offset); - header_size = impl->dst_addr.ss_family == AF_INET ? - IP4_HEADER_SIZE : IP6_HEADER_SIZE; + /* Assume IPv6 header size. This is necessary, since it + * is possible to add a mixture of IPv4 and IPv6 targets + * to the RTP sink. Having different headers sizes for + * both IPv4 and IPv6 would require producing packets + * separately for both protocols, which adds significant + * complexity and runtime overhead. Instead, accept the + * small waste (20 bytes) by always assuming an IPv6 + * header size, even when IPv4 is used. */ + header_size = IP6_HEADER_SIZE; header_size += UDP_HEADER_SIZE; pw_properties_setf(stream_props, "net.header", "%u", header_size); - pw_net_get_ip(&impl->src_addr, addr, sizeof(addr), NULL, NULL); - pw_properties_set(stream_props, "rtp.source.ip", addr); - pw_net_get_ip(&impl->dst_addr, addr, sizeof(addr), NULL, NULL); - pw_properties_set(stream_props, "rtp.destination.ip", addr); - pw_properties_setf(stream_props, "rtp.destination.port", "%u", impl->dst_port); - pw_properties_setf(stream_props, "rtp.ttl", "%u", impl->ttl); - pw_properties_setf(stream_props, "rtp.dscp", "%u", impl->dscp); + + if (initial_target_valid) { + res = add_rtp_target(impl, &initial_target, false); + if (res != 0) { + teardown_rtp_target(&initial_target); + goto out; + } + + pw_net_get_ip(&(initial_target.src_addr), addr, sizeof(addr), NULL, NULL); + pw_properties_set(stream_props, "rtp.source.ip", addr); + pw_net_get_ip(&(initial_target.dest_addr), addr, sizeof(addr), NULL, NULL); + pw_properties_set(stream_props, "rtp.destination.ip", addr); + pw_properties_setf(stream_props, "rtp.destination.port", "%u", initial_target.dest_port); + pw_properties_setf(stream_props, "rtp.ttl", "%u", initial_target.ttl); + pw_properties_setf(stream_props, "rtp.dscp", "%u", initial_target.dscp); + } impl->core = pw_context_get_object(impl->context, PW_TYPE_INTERFACE_Core); if (impl->core == NULL) { diff --git a/src/modules/module-rtp/stream.c b/src/modules/module-rtp/stream.c index 0f7d18b38..c61950978 100644 --- a/src/modules/module-rtp/stream.c +++ b/src/modules/module-rtp/stream.c @@ -45,6 +45,7 @@ PW_LOG_TOPIC_EXTERN(mod_topic); #define rtp_stream_emit_open_connection(s,r) rtp_stream_emit(s, open_connection, 0,r) #define rtp_stream_emit_close_connection(s,r) rtp_stream_emit(s, close_connection, 0,r) #define rtp_stream_emit_param_changed(s,i,p) rtp_stream_emit(s, param_changed,0,i,p) +#define rtp_stream_emit_command(s,cmd) rtp_stream_emit(s, command,0,cmd) #define rtp_stream_call(s,m,v,...) spa_callbacks_call_fast(&s->rtp_callbacks, \ struct rtp_stream_events, m, v, ##__VA_ARGS__) @@ -584,11 +585,18 @@ static void on_stream_param_changed (void *d, uint32_t id, const struct spa_pod } }; +static void on_stream_command(void *d, const struct spa_command *command) +{ + struct impl *impl = d; + rtp_stream_emit_command(impl, command); +} + static const struct pw_stream_events stream_events = { PW_VERSION_STREAM_EVENTS, .destroy = stream_destroy, .state_changed = on_stream_state_changed, .param_changed = on_stream_param_changed, + .command = on_stream_command, .io_changed = stream_io_changed, }; @@ -1225,3 +1233,10 @@ void rtp_stream_update_process_latency(struct rtp_stream *s, update_latency_params(impl); } + +int rtp_stream_run_in_data_loop(struct rtp_stream *s, spa_invoke_func_t func, + uint32_t seq, const void *data, size_t size, void *user_data) +{ + struct impl *impl = (struct impl*)s; + return pw_loop_locked(impl->data_loop, func, seq, data, size, user_data); +} diff --git a/src/modules/module-rtp/stream.h b/src/modules/module-rtp/stream.h index f7f061fc9..37d041b06 100644 --- a/src/modules/module-rtp/stream.h +++ b/src/modules/module-rtp/stream.h @@ -50,6 +50,8 @@ struct rtp_stream_events { void (*param_changed) (void *data, uint32_t id, const struct spa_pod *param); + void (*command) (void *data, const struct spa_command *command); + void (*send_packet) (void *data, struct iovec *iov, size_t iovlen); void (*send_feedback) (void *data, uint32_t seqnum); @@ -91,6 +93,9 @@ int rtp_stream_update_params(struct rtp_stream *stream, void rtp_stream_update_process_latency(struct rtp_stream *stream, const struct spa_process_latency_info *process_latency); +int rtp_stream_run_in_data_loop(struct rtp_stream *s, spa_invoke_func_t func, + uint32_t seq, const void *data, size_t size, void *user_data); + #ifdef __cplusplus } #endif