Merge branch 'rtp-sink-multiple-receivers' into 'master'

module-rtp-sink: Add ability to add / remove receivers through commands

See merge request pipewire/pipewire!2863
This commit is contained in:
Carlos Rafael Giani 2026-06-23 10:48:32 +00:00
commit c2eff23bad
3 changed files with 626 additions and 80 deletions

View file

@ -15,7 +15,10 @@
#include <net/if.h>
#include <ctype.h>
#include <spa/utils/atomic.h>
#include <spa/utils/defs.h>
#include <spa/utils/hook.h>
#include <spa/utils/list.h>
#include <spa/utils/result.h>
#include <spa/utils/ringbuffer.h>
#include <spa/utils/json.h>
@ -121,6 +124,51 @@
*]
*\endcode
*
* ## Adding and removing receivers through commands
*
* The following commands can be sent to the RTP sink node via `pw_node_send_command()`:
*
* * `add-receiver` : Adds a receiver to the sink's list. If the given
* IP address <-> port combination was already added, the command is
* logged, but otherwise ignored. Arguments:
* - `destination.ip` : IP address to send data to. Can be a uni- or
* multicast address, but must be a valid address.
* - `destination.port` : Port to send data to. Must be valid.
* - `local.ifname`, `source.ip`, `net.ttl`, `net.dscp`, `net.loop` :
* These are all optional, and work just like in the RTP sink
* module's properties.
*
* * `remove-receiver` : Removes a receiver from the sink's list. The
* receiver is identified by the given IP address. A port can optionally
* be specified as well. If it isn't, then the first receiver with that IP
* address is removed. If no matching receiver is in the sink's list,
* this command does nothing. Arguments:
* - `destination.ip` : IP address to send data to. Can be a uni- or
* multicast address, but must be a valid address.
* - `destination.port` : Port to send data to. This is optional. But, if
* it is set, it must be a valid port number.
*
* * `clear-receivers` : Removes all receivers from the sink's list. If the
* list is empty, this does nothing. This command has no arguments.
*
* If the RTP sink module is created with the `destination.ip` and
* `destination.port` properties set, it behaves as if `add-receiver` were
* called right after the module was initialized. This means that if none
* of these commands are used, the module behaves just as it did prior to
* this patch. Note that the `remove-receivers` command can remove this
* initial receiver as well.
*
* If no receivers are added, the module continues to work normally.
* Adding and removing receivers mid-operation is supported.
*
* Example pw-cli calls (56 is the ID of the RTP sink node):
*
* \code{.unparsed}
* pw-cli c 56 User '{ extra="{ \"command.id\" : \"add-receiver\" , \"destination.ip\" : \"10.42.0.1\", \"destination.port\" : 55001 }" }'
* pw-cli c 56 User '{ extra="{ \"command.id\" : \"remove-receiver\", \"destination.ip\" : \"10.42.0.1\" }" }'
* pw-cli c 56 User '{ extra="{ \"command.id\" : \"clear-receivers\" }" }'
* \endcode
*
* \since 0.3.60
*/
@ -166,6 +214,36 @@ static const struct spa_dict_item module_info[] = {
{ PW_KEY_MODULE_VERSION, PACKAGE_VERSION },
};
enum rtp_target_type {
RTP_TARGET_TYPE_UNICAST,
RTP_TARGET_TYPE_MULTICAST
};
struct rtp_target {
struct spa_list link;
/* Common multicast and unicast fields */
enum rtp_target_type type;
uint16_t dest_port;
struct sockaddr_storage dest_addr;
socklen_t dest_addrlen;
int socket_fd;
uint32_t ttl;
uint32_t dscp;
/* Multicast specific fields */
char *ifname;
bool mcast_loop;
struct sockaddr_storage src_addr;
socklen_t src_addrlen;
};
struct impl {
struct pw_context *context;
@ -186,19 +264,15 @@ struct impl {
unsigned int do_disconnect_core:1;
char *ifname;
uint32_t ttl;
bool mcast_loop;
uint32_t dscp;
struct spa_list rtp_targets;
size_t num_rtp_targets;
struct sockaddr_storage src_addr;
socklen_t src_len;
uint16_t dst_port;
struct sockaddr_storage dst_addr;
socklen_t dst_len;
int rtp_fd;
/* This flag is needed to know whether on_add_receiver() shall connect
* the socket immediately, or keep the target disconnected. The latter
* case is done when the PW node is not running; then, once it does start
* running, the stream.c stream_start() call will in turn call
* stream_open_connection(), which will connect the socket. */
bool stream_connected;
};
static int make_socket(struct sockaddr_storage *src, socklen_t src_len,
@ -274,6 +348,255 @@ static void stream_destroy(void *d)
impl->stream = NULL;
}
static void teardown_rtp_target(struct rtp_target *target);
static int setup_rtp_target(struct rtp_target *target,
const char *ifname, const char *destination_ip, uint16_t destination_port,
const char *source_ip, uint32_t ttl, uint32_t dscp, bool mcast_loop)
{
int res = 0;
memset(target, 0, sizeof(struct rtp_target));
target->socket_fd = -1;
if (destination_port == 0)
target->dest_port = (DEFAULT_PORT + ((uint32_t) (pw_rand32() % 512) << 1));
else
target->dest_port = destination_port;
if ((res = pw_net_parse_address(destination_ip, target->dest_port,
&target->dest_addr, &target->dest_addrlen)) < 0) {
pw_log_error("invalid destination IP \"%s\": %s", destination_ip, spa_strerror(res));
goto error;
}
if (source_ip == NULL)
source_ip = (target->dest_addr.ss_family == AF_INET) ?
DEFAULT_SOURCE_IP : DEFAULT_SOURCE_IP6;
if ((res = pw_net_parse_address(source_ip, 0, &target->src_addr,
&target->src_addrlen)) < 0) {
pw_log_error("invalid source IP \"%s\": %s", source_ip, spa_strerror(res));
goto error;
}
target->ttl = ttl;
target->dscp = dscp;
target->mcast_loop = mcast_loop;
target->ifname = ifname ? strdup(ifname) : NULL;
target->type = pw_net_is_multicast(&(target->dest_addr))
? RTP_TARGET_TYPE_MULTICAST
: RTP_TARGET_TYPE_UNICAST;
out:
return res;
error:
teardown_rtp_target(target);
goto out;
}
static int setup_rtp_target_from_props(struct rtp_target *target, struct pw_properties *props, bool *target_was_setup)
{
const char *destination_ip;
uint16_t destination_port;
const char *ifname;
const char *source_ip;
uint32_t ttl;
uint32_t dscp;
bool mcast_loop;
int res;
*target_was_setup = false;
destination_ip = pw_properties_get(props, "destination.ip");
if (destination_ip == NULL)
return 0;
destination_port = pw_properties_get_uint32(props, "destination.port", 0);
ifname = pw_properties_get(props, "local.ifname");
source_ip = pw_properties_get(props, "source.ip");
ttl = pw_properties_get_uint32(props, "net.ttl", DEFAULT_TTL);
dscp = pw_properties_get_uint32(props, "net.dscp", DEFAULT_DSCP);
mcast_loop = pw_properties_get_bool(props, "net.loop", DEFAULT_LOOP);
res = setup_rtp_target(target, ifname, destination_ip, destination_port, source_ip, ttl, dscp, mcast_loop);
if (res == 0)
*target_was_setup = true;
return res;
}
static void teardown_rtp_target(struct rtp_target *target)
{
if (target->socket_fd >= 0) {
close(target->socket_fd);
target->socket_fd = -1;
}
free(target->ifname);
target->ifname = NULL;
}
#define LOG_RTP_TARGET(LOGLEVEL, PREFIX, TARGET) \
do { \
if (SPA_UNLIKELY(pw_log_topic_enabled((LOGLEVEL), PW_LOG_TOPIC_DEFAULT))) { \
char src_addr_str_buf[256]; \
char dest_addr_str_buf[256]; \
const char *type_str; \
const char *src_addr_str; \
const char *dest_addr_str; \
const char *iface_name; \
uint16_t dest_port = 0; \
\
type_str = ((TARGET)->type == RTP_TARGET_TYPE_UNICAST) ? "unicast" : "multicast"; \
src_addr_str = dest_addr_str = "<could not get address>"; \
iface_name = ((TARGET)->ifname != NULL) ? (TARGET)->ifname : "<none>"; \
\
if (pw_net_get_ip(&((TARGET)->src_addr), src_addr_str_buf, sizeof(src_addr_str_buf), NULL, NULL) == 0) \
src_addr_str = src_addr_str_buf; \
if (pw_net_get_ip(&((TARGET)->dest_addr), dest_addr_str_buf, sizeof(dest_addr_str_buf), NULL, &dest_port) == 0) \
dest_addr_str = dest_addr_str_buf; \
\
if ((TARGET)->socket_fd < 0) { \
pw_log_logt((LOGLEVEL), PW_LOG_TOPIC_DEFAULT, __FILE__, __LINE__, __func__, \
"%s: type: %s; dest addr: %s; dest port: %" PRIu16 "; " \
"src addr: %s; TTL: %" PRIu32 "; DSCP: %" PRIu32 "; " \
"iface name: %s; mcast loop: %d; (socket not opened yet)", \
(PREFIX), type_str, dest_addr_str, dest_port, src_addr_str, \
(TARGET)->ttl, (TARGET)->dscp, iface_name, (TARGET)->mcast_loop); \
} else { \
pw_log_logt((LOGLEVEL), PW_LOG_TOPIC_DEFAULT, __FILE__, __LINE__, __func__, \
"%s type: %s; dest addr: %s; dest port: %" PRIu16 "; " \
"src addr: %s; TTL: %" PRIu32 "; DSCP: %" PRIu32 "; " \
"iface name: %s; mcast loop: %d; socket FD: %d", (PREFIX), \
type_str, dest_addr_str, dest_port, src_addr_str, (TARGET)->ttl, \
(TARGET)->dscp, iface_name, (TARGET)->mcast_loop, (TARGET)->socket_fd); \
} \
} \
} while (0)
static struct rtp_target *find_rtp_target_by_sockaddr(struct impl *impl, struct sockaddr_storage *address, bool compare_ports)
{
struct rtp_target *rtp_target;
spa_list_for_each(rtp_target, &(impl->rtp_targets), link) {
if (pw_net_are_addresses_equal(&(rtp_target->dest_addr), address, compare_ports)) {
return rtp_target;
}
}
return NULL;
}
static struct rtp_target *find_rtp_target(struct impl *impl, const char *destination_ip, uint16_t destination_port)
{
int res;
struct sockaddr_storage dest_addr;
socklen_t dest_len;
if ((res = pw_net_parse_address(destination_ip, destination_port,
&dest_addr, &dest_len)) < 0) {
pw_log_error("invalid destination IP \"%s\": %s", destination_ip, spa_strerror(res));
return NULL;
}
return find_rtp_target_by_sockaddr(impl, &dest_addr, destination_port != 0);
}
static int append_to_rtp_targets(struct spa_loop *loop, bool async, uint32_t seq,
const void *data, size_t size, void *user_data)
{
/* IMPORTANT: This must be run from within the data loop, since the rtp_targets
* list is modified here, and the stream_send_packet() function (which runs in
* the data loop thread), iterates over this same list. */
struct impl *impl = user_data;
struct rtp_target *rtp_target_to_add = (struct rtp_target *)data;
spa_list_append(&(impl->rtp_targets), &(rtp_target_to_add->link));
impl->num_rtp_targets++;
return 0;
}
static int add_rtp_target(struct impl *impl, struct rtp_target *rtp_target, bool append_in_data_loop)
{
struct rtp_target *rtp_target_copy;
LOG_RTP_TARGET(SPA_LOG_LEVEL_INFO, "Adding RTP target", rtp_target);
/* Allocate and fill the target here, outside of the data loop
* thread, to not unnecessarily block it. */
rtp_target_copy = malloc(sizeof(struct rtp_target));
if (SPA_UNLIKELY(rtp_target_copy == NULL)) {
pw_log_error("could not allocate memory for new target");
return -ENOMEM;
}
memcpy(rtp_target_copy, rtp_target, sizeof(struct rtp_target));
/* Ensure that nothing is stored in the spa_list link; it will
* be filled by spa_list_append(). */
spa_zero(rtp_target_copy->link);
if (append_in_data_loop)
rtp_stream_run_in_data_loop(impl->stream, append_to_rtp_targets, 1, rtp_target_copy, 0, impl);
else
append_to_rtp_targets(NULL, false, 0, rtp_target_copy, 0, impl);
return 0;
}
static int remove_from_rtp_targets(struct spa_loop *loop, bool async, uint32_t seq,
const void *data, size_t size, void *user_data)
{
/* IMPORTANT: This must be run from within the data loop, since the rtp_targets
* list is modified here, and the stream_send_packet() function (which runs in
* the data loop thread), iterates over this same list. */
struct impl *impl = user_data;
struct rtp_target *rtp_target_to_remove = (struct rtp_target *)data;
spa_list_remove(&(rtp_target_to_remove->link));
impl->num_rtp_targets--;
return 0;
}
static void remove_rtp_target(struct impl *impl, struct rtp_target *rtp_target_to_remove,
bool remove_in_data_loop, const char *custom_prefix)
{
if (custom_prefix == NULL)
custom_prefix = "Removing RTP target";
LOG_RTP_TARGET(SPA_LOG_LEVEL_INFO, custom_prefix, rtp_target_to_remove);
if (remove_in_data_loop)
rtp_stream_run_in_data_loop(impl->stream, remove_from_rtp_targets, 1, rtp_target_to_remove, 0, impl);
else
remove_from_rtp_targets(NULL, false, 0, rtp_target_to_remove, 0, impl);
teardown_rtp_target(rtp_target_to_remove);
free(rtp_target_to_remove);
}
static void remove_rtp_target_by_ip_and_port(struct impl *impl, const char *destination_ip,
uint16_t destination_port, bool remove_in_data_loop)
{
struct rtp_target *rtp_target_to_remove;
if (impl->num_rtp_targets == 0)
return;
rtp_target_to_remove = find_rtp_target(impl, destination_ip, destination_port);
if (rtp_target_to_remove == NULL)
return;
remove_rtp_target(impl, rtp_target_to_remove, remove_in_data_loop, NULL);
}
static inline uint64_t get_time_ns(void)
{
struct timespec ts;
@ -286,6 +609,7 @@ static void stream_send_packet(void *data, struct iovec *iov, size_t iovlen)
struct impl *impl = data;
struct msghdr msg;
ssize_t n;
struct rtp_target *rtp_target;
spa_zero(msg);
msg.msg_iov = iov;
@ -294,11 +618,18 @@ static void stream_send_packet(void *data, struct iovec *iov, size_t iovlen)
msg.msg_controllen = 0;
msg.msg_flags = 0;
n = sendmsg(impl->rtp_fd, &msg, MSG_NOSIGNAL);
spa_list_for_each(rtp_target, &(impl->rtp_targets), link) {
/* All targets are required to have open sockets
* by the time sending takes place. */
spa_assert(rtp_target->socket_fd >= 0);
n = sendmsg(rtp_target->socket_fd, &msg, MSG_NOSIGNAL);
if (n < 0) {
int suppressed;
if ((suppressed = spa_ratelimit_test(&impl->rate_limit, get_time_ns())) >= 0)
if ((suppressed = spa_ratelimit_test(&impl->rate_limit, get_time_ns())) >= 0) {
pw_log_warn("(%d suppressed) sendmsg() failed: %m", suppressed);
LOG_RTP_TARGET(SPA_LOG_LEVEL_WARN, "RTP target", rtp_target);
}
}
}
}
@ -311,40 +642,56 @@ static void stream_report_error(void *data, const char *error)
}
}
static void stream_close_connection(void *data, int *result);
static void stream_open_connection(void *data, int *result)
{
int res;
struct impl *impl = data;
struct rtp_target *rtp_target;
if ((res = make_socket(&impl->src_addr, impl->src_len,
&impl->dst_addr, impl->dst_len,
impl->mcast_loop, impl->ttl, impl->dscp,
impl->ifname)) < 0) {
spa_list_for_each(rtp_target, &(impl->rtp_targets), link) {
if ((res = make_socket(&rtp_target->src_addr, rtp_target->src_addrlen,
&rtp_target->dest_addr, rtp_target->dest_addrlen,
rtp_target->mcast_loop, rtp_target->ttl,
rtp_target->dscp, rtp_target->ifname)) < 0) {
pw_log_error("can't make socket: %s", spa_strerror(res));
LOG_RTP_TARGET(SPA_LOG_LEVEL_WARN, "RTP target", rtp_target);
rtp_stream_set_error(impl->stream, res, "Can't make socket");
stream_close_connection(data, NULL);
if (result)
*result = res;
return;
}
rtp_target->socket_fd = res;
}
if (result)
*result = 1;
impl->rtp_fd = res;
/* Now, after all sockets are opened, mark the stream as connected. */
impl->stream_connected = true;
}
static void stream_close_connection(void *data, int *result)
{
struct impl *impl = data;
struct rtp_target *rtp_target;
/* Mark the stream as disconnected to let future on_add_receiver()
* calls know that they must not connect the socket on their own. */
impl->stream_connected = false;
if (impl->rtp_fd > 0) {
if (result)
*result = 1;
close(impl->rtp_fd);
impl->rtp_fd = -1;
} else {
if (result)
*result = 0;
spa_list_for_each(rtp_target, &(impl->rtp_targets), link) {
if (rtp_target->socket_fd >= 0) {
if (result)
*result = 1;
close(rtp_target->socket_fd);
rtp_target->socket_fd = -1;
}
}
}
@ -385,15 +732,7 @@ static void stream_props_changed(struct impl *impl, uint32_t id, const struct sp
spa_pod_get_int(pod, &value_int) < 0)
continue;
pw_log_info("key '%s', value '%s'/%u", key, value_str, value_int);
if (spa_streq(key, "destination.ip")) {
if (!value_str || pw_net_parse_address(value_str, impl->dst_port, &impl->dst_addr,
&impl->dst_len) < 0) {
pw_log_error("invalid destination.ip: '%s'", value_str);
break;
}
pw_properties_set(impl->stream_props, "rtp.destination.ip", value_str);
items[n_items++] = SPA_DICT_ITEM_INIT("rtp.destination.ip", value_str);
} else if (spa_streq(key, "sess.name")) {
if (spa_streq(key, "sess.name")) {
if (!value_str) {
pw_log_error("invalid sess.name");
break;
@ -434,6 +773,193 @@ static void stream_param_changed(void *data, uint32_t id, const struct spa_pod *
}
}
static void on_add_receiver(struct impl *impl, struct spa_json *command_json_iter)
{
int res;
char key[256];
const char *value;
struct rtp_target rtp_target;
int len;
char ifname[64];
char destination_ip[64];
int destination_port = 0;
char source_ip[64];
int ttl = DEFAULT_TTL;
int dscp = DEFAULT_DSCP;
bool mcast_loop = DEFAULT_LOOP;
ifname[0] = '\0';
destination_ip[0] = '\0';
source_ip[0] = '\0';
while ((len = spa_json_object_next(command_json_iter, key, sizeof(key), &value)) > 0) {
if (spa_streq(key, "local.ifname")) {
spa_json_parse_stringn(value, len, ifname, sizeof(ifname));
} else if (spa_streq(key, "destination.ip")) {
spa_json_parse_stringn(value, len, destination_ip, sizeof(destination_ip));
} else if (spa_streq(key, "destination.port")) {
spa_json_parse_int(value, len, &destination_port);
} else if (spa_streq(key, "source.ip")) {
spa_json_parse_stringn(value, len, source_ip, sizeof(source_ip));
} else if (spa_streq(key, "net.ttl")) {
spa_json_parse_int(value, len, &ttl);
} else if (spa_streq(key, "net.dscp")) {
spa_json_parse_int(value, len, &dscp);
} else if (spa_streq(key, "net.loop")) {
spa_json_parse_bool(value, len, &mcast_loop);
}
}
if (destination_ip[0] == '\0') {
pw_log_error("Cannot add receiver without a destination.ip value");
return;
}
if ((res = setup_rtp_target(&rtp_target, (ifname[0] != '\0') ? ifname : NULL,
destination_ip, destination_port, (source_ip[0] != '\0') ? source_ip : NULL,
ttl, dscp, mcast_loop)) != 0)
return;
if (find_rtp_target_by_sockaddr(impl, &(rtp_target.dest_addr), true) != NULL) {
LOG_RTP_TARGET(SPA_LOG_LEVEL_WARN, "Not adding RTP target because it is already added", &rtp_target);
goto duplicate_add;
}
/* Only create the socket when the stream is connected. Sockets are
* supposed to be disconnected otherwise. If it is disconnected, adding
* the RTP target to the list is enough - stream_open_connection() will
* connect the target's socket then. */
if (impl->stream_connected) {
if ((res = make_socket(&rtp_target.src_addr, rtp_target.src_addrlen,
&rtp_target.dest_addr, rtp_target.dest_addrlen,
rtp_target.mcast_loop, rtp_target.ttl,
rtp_target.dscp, rtp_target.ifname)) < 0) {
pw_log_error("Couldn't make socket for new receiver: %s", spa_strerror(res));
goto error;
}
pw_log_debug("Created socket for new receiver, socket FD: %d", res);
rtp_target.socket_fd = res;
}
if (add_rtp_target(impl, &rtp_target, true) != 0)
goto error;
finish:
return;
duplicate_add:
teardown_rtp_target(&rtp_target);
goto finish;
error:
teardown_rtp_target(&rtp_target);
goto finish;
}
static void on_remove_receiver(struct impl *impl, struct spa_json *command_json_iter)
{
char key[256];
const char *value;
int len;
char destination_ip[64];
int destination_port = 0;
destination_ip[0] = '\0';
while ((len = spa_json_object_next(command_json_iter, key, sizeof(key), &value)) > 0) {
if (spa_streq(key, "destination.ip")) {
spa_json_parse_stringn(value, len, destination_ip, sizeof(destination_ip));
} else if (spa_streq(key, "destination.port")) {
spa_json_parse_int(value, len, &destination_port);
}
}
if (destination_ip[0] == '\0') {
pw_log_error("Cannot remove receiver without a destination.ip value");
return;
}
remove_rtp_target_by_ip_and_port(impl, destination_ip, destination_port, true);
}
static void on_clear_receivers(struct impl *impl)
{
struct rtp_target *rtp_target;
pw_log_info("Clearing all receivers");
spa_list_consume(rtp_target, &(impl->rtp_targets), link)
remove_rtp_target(impl, rtp_target, true, NULL);
}
static void parse_rtp_command(struct impl *impl, const char *command_json_str)
{
int res;
struct spa_json iter;
char rtp_command_id[64];
if ((res = spa_json_str_object_find(command_json_str, strlen(command_json_str),
"command.id", rtp_command_id, sizeof(rtp_command_id))) <= 0) {
if (res == -ENOENT) {
pw_log_error("Command JSON string \"%s\" has no command.id field",
command_json_str);
} else {
pw_log_error("Error while parsing JSON string \"%s\": %s", command_json_str,
spa_strerror(res));
}
return;
}
if ((res = spa_json_begin_object(&iter, command_json_str, strlen(command_json_str))) <= 0) {
pw_log_error("Error while parsing JSON string \"%s\": %s", command_json_str,
spa_strerror(res));
return;
}
if (spa_streq(rtp_command_id, "add-receiver")) {
on_add_receiver(impl, &iter);
} else if (spa_streq(rtp_command_id, "remove-receiver")) {
on_remove_receiver(impl, &iter);
} else if (spa_streq(rtp_command_id, "clear-receivers")) {
on_clear_receivers(impl);
} else {
pw_log_error("Command JSON string \"%s\" has unrecognized command ID \"%s\"",
command_json_str, rtp_command_id);
}
}
static void stream_command(void *data, const struct spa_command *command)
{
struct impl *impl = data;
if (SPA_UNLIKELY(SPA_COMMAND_TYPE(command) != SPA_TYPE_COMMAND_Node))
return;
switch (SPA_NODE_COMMAND_ID(command)) {
case SPA_NODE_COMMAND_User: {
const struct spa_pod_object *pod_object = (const struct spa_pod_object *)command;
struct spa_pod_prop *prop;
SPA_POD_OBJECT_FOREACH(pod_object, prop) {
if (prop->key != SPA_COMMAND_NODE_extra)
continue;
if (prop->value.type != SPA_TYPE_String)
continue;
const char *json_str = SPA_POD_CONTENTS(struct spa_pod, &prop->value);
parse_rtp_command(impl, json_str);
}
break;
}
default:
break;
}
}
static const struct rtp_stream_events stream_events = {
RTP_VERSION_STREAM_EVENTS,
.destroy = stream_destroy,
@ -442,6 +968,7 @@ static const struct rtp_stream_events stream_events = {
.close_connection = stream_close_connection,
.param_changed = stream_param_changed,
.send_packet = stream_send_packet,
.command = stream_command,
};
static void core_destroy(void *d)
@ -458,21 +985,20 @@ static const struct pw_proxy_events core_proxy_events = {
static void impl_destroy(struct impl *impl)
{
struct rtp_target *rtp_target;
if (impl->stream)
rtp_stream_destroy(impl->stream);
if (impl->core && impl->do_disconnect_core)
pw_core_disconnect(impl->core);
if (impl->rtp_fd != -1) {
pw_log_info("closing socket with FD %d as part of shutdown", impl->rtp_fd);
close(impl->rtp_fd);
}
spa_list_consume(rtp_target, &(impl->rtp_targets), link)
remove_rtp_target(impl, rtp_target, false, "Removing RTP target as part of shutdown");
pw_properties_free(impl->stream_props);
pw_properties_free(impl->props);
free(impl->ifname);
free(impl);
}
@ -524,6 +1050,8 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args)
int64_t ts_offset;
int res = 0;
uint32_t header_size;
struct rtp_target initial_target;
bool initial_target_valid = false;
PW_LOG_TOPIC_INIT(mod_topic);
@ -531,8 +1059,6 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args)
if (impl == NULL)
return -errno;
impl->rtp_fd = -1;
if (args == NULL)
args = "";
@ -552,6 +1078,8 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args)
}
impl->stream_props = stream_props;
spa_list_init(&impl->rtp_targets);
impl->rate_limit.interval = 2 * SPA_NSEC_PER_SEC;
impl->rate_limit.burst = 1;
@ -598,45 +1126,43 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args)
copy_props(impl, props, "sess.ts-refclk");
copy_props(impl, props, "aes67.driver-group");
str = pw_properties_get(props, "local.ifname");
impl->ifname = str ? strdup(str) : NULL;
impl->dst_port = DEFAULT_PORT + ((uint32_t) (pw_rand32() % 512) << 1);
impl->dst_port = pw_properties_get_uint32(props, "destination.port", impl->dst_port);
if ((str = pw_properties_get(props, "destination.ip")) == NULL)
str = DEFAULT_DESTINATION_IP;
if ((res = pw_net_parse_address(str, impl->dst_port, &impl->dst_addr, &impl->dst_len)) < 0) {
pw_log_error("invalid destination.ip %s: %s", str, spa_strerror(res));
if ((res = setup_rtp_target_from_props(&initial_target, props, &initial_target_valid)) < 0) {
pw_log_error("could not setup initial destination: %s", spa_strerror(res));
goto out;
}
if ((str = pw_properties_get(props, "source.ip")) == NULL)
str = impl->dst_addr.ss_family == AF_INET ?
DEFAULT_SOURCE_IP : DEFAULT_SOURCE_IP6;
if ((res = pw_net_parse_address(str, 0, &impl->src_addr, &impl->src_len)) < 0) {
pw_log_error("invalid source.ip %s: %s", str, spa_strerror(res));
goto out;
}
impl->ttl = pw_properties_get_uint32(props, "net.ttl", DEFAULT_TTL);
impl->mcast_loop = pw_properties_get_bool(props, "net.loop", DEFAULT_LOOP);
impl->dscp = pw_properties_get_uint32(props, "net.dscp", DEFAULT_DSCP);
ts_offset = pw_properties_get_int64(props, "sess.ts-offset", DEFAULT_TS_OFFSET);
if (ts_offset == -1)
ts_offset = pw_rand32();
pw_properties_setf(stream_props, "rtp.sender-ts-offset", "%u", (uint32_t)ts_offset);
header_size = impl->dst_addr.ss_family == AF_INET ?
IP4_HEADER_SIZE : IP6_HEADER_SIZE;
/* Assume IPv6 header size. This is necessary, since it
* is possible to add a mixture of IPv4 and IPv6 targets
* to the RTP sink. Having different headers sizes for
* both IPv4 and IPv6 would require producing packets
* separately for both protocols, which adds significant
* complexity and runtime overhead. Instead, accept the
* small waste (20 bytes) by always assuming an IPv6
* header size, even when IPv4 is used. */
header_size = IP6_HEADER_SIZE;
header_size += UDP_HEADER_SIZE;
pw_properties_setf(stream_props, "net.header", "%u", header_size);
pw_net_get_ip(&impl->src_addr, addr, sizeof(addr), NULL, NULL);
if (initial_target_valid) {
res = add_rtp_target(impl, &initial_target, false);
if (res != 0) {
teardown_rtp_target(&initial_target);
goto out;
}
pw_net_get_ip(&(initial_target.src_addr), addr, sizeof(addr), NULL, NULL);
pw_properties_set(stream_props, "rtp.source.ip", addr);
pw_net_get_ip(&impl->dst_addr, addr, sizeof(addr), NULL, NULL);
pw_net_get_ip(&(initial_target.dest_addr), addr, sizeof(addr), NULL, NULL);
pw_properties_set(stream_props, "rtp.destination.ip", addr);
pw_properties_setf(stream_props, "rtp.destination.port", "%u", impl->dst_port);
pw_properties_setf(stream_props, "rtp.ttl", "%u", impl->ttl);
pw_properties_setf(stream_props, "rtp.dscp", "%u", impl->dscp);
pw_properties_setf(stream_props, "rtp.destination.port", "%u", initial_target.dest_port);
pw_properties_setf(stream_props, "rtp.ttl", "%u", initial_target.ttl);
pw_properties_setf(stream_props, "rtp.dscp", "%u", initial_target.dscp);
}
impl->core = pw_context_get_object(impl->context, PW_TYPE_INTERFACE_Core);
if (impl->core == NULL) {

View file

@ -45,6 +45,7 @@ PW_LOG_TOPIC_EXTERN(mod_topic);
#define rtp_stream_emit_open_connection(s,r) rtp_stream_emit(s, open_connection, 0,r)
#define rtp_stream_emit_close_connection(s,r) rtp_stream_emit(s, close_connection, 0,r)
#define rtp_stream_emit_param_changed(s,i,p) rtp_stream_emit(s, param_changed,0,i,p)
#define rtp_stream_emit_command(s,cmd) rtp_stream_emit(s, command,0,cmd)
#define rtp_stream_call(s,m,v,...) spa_callbacks_call_fast(&s->rtp_callbacks, \
struct rtp_stream_events, m, v, ##__VA_ARGS__)
@ -584,11 +585,18 @@ static void on_stream_param_changed (void *d, uint32_t id, const struct spa_pod
}
};
static void on_stream_command(void *d, const struct spa_command *command)
{
struct impl *impl = d;
rtp_stream_emit_command(impl, command);
}
static const struct pw_stream_events stream_events = {
PW_VERSION_STREAM_EVENTS,
.destroy = stream_destroy,
.state_changed = on_stream_state_changed,
.param_changed = on_stream_param_changed,
.command = on_stream_command,
.io_changed = stream_io_changed,
};
@ -1225,3 +1233,10 @@ void rtp_stream_update_process_latency(struct rtp_stream *s,
update_latency_params(impl);
}
int rtp_stream_run_in_data_loop(struct rtp_stream *s, spa_invoke_func_t func,
uint32_t seq, const void *data, size_t size, void *user_data)
{
struct impl *impl = (struct impl*)s;
return pw_loop_locked(impl->data_loop, func, seq, data, size, user_data);
}

View file

@ -50,6 +50,8 @@ struct rtp_stream_events {
void (*param_changed) (void *data, uint32_t id, const struct spa_pod *param);
void (*command) (void *data, const struct spa_command *command);
void (*send_packet) (void *data, struct iovec *iov, size_t iovlen);
void (*send_feedback) (void *data, uint32_t seqnum);
@ -91,6 +93,9 @@ int rtp_stream_update_params(struct rtp_stream *stream,
void rtp_stream_update_process_latency(struct rtp_stream *stream,
const struct spa_process_latency_info *process_latency);
int rtp_stream_run_in_data_loop(struct rtp_stream *s, spa_invoke_func_t func,
uint32_t seq, const void *data, size_t size, void *user_data);
#ifdef __cplusplus
}
#endif