module-rtp-sink: Add ability to add / remove receivers through commands

This makes it possible to dynamically add / remove receivers, which is
necesary for sending to multiple receivers. Mixed multi- and unicast
receivers are possible. Example pw-cli calls (56 is the ID of the RTP
sink node):

pw-cli c 56 User '{ extra="{ \"command.id\" : \"add-receiver\" , \"destination.ip\" : \"10.42.0.1\", \"destination.port\" : 55001 }" }'
pw-cli c 56 User '{ extra="{ \"command.id\" : \"remove-receiver\", \"destination.ip\" : \"10.42.0.1\" }" }'
pw-cli c 56 User '{ extra="{ \"command.id\" : \"clear-receivers\" }" }'

Commands and their arguments:

* "add-receiver" : Adds a receiver to the sink's list. If the given
  IP address <-> port combination was already added, the command is
  logged, but otherwise ignored. Arguments:
  - "destination.ip" : IP address to send data to. Can be a uni- or
    multicast address, but must be a valid address.
  - "destination.port" : Port to send data to. Must be valid.
  - "local.ifname", "source.ip", "net.ttl", "net.dscp", "net.loop" :
    These are all optional, and work just like in the RTP sink
    module's properties.

* "remove-receiver" : Removes a receiver from the sink's list. The
  receiver is identified by the given IP address. A port can optionally
  be specified as well. If it isn't, then the first receiver with that IP
  address is removed. If no matching receiver is in the sink's list,
  this command does nothing. Arguments:
  - "destination.ip" : IP address to send data to. Can be a uni- or
    multicast address, but must be a valid address.
  - "destination.port" : Port to send data to. This is optional. But, if
    it is set, it must be a valid port number.

* "clear-receivers" : Removes all receivers from the sink's list. If the
  list is empty, this does nothing. This command has no arguments.

If the RTP sink module is created with the "destination.ip" and
"destination.port" properties set, it behaves as if "add-receiver" were
called right after the module was initialized. This means that if none
of these commands are used, the module behaves just as it did prior to
this patch. Note that the "remove-receivers" command can remove this
initial receiver as well.

If no receivers are added, the module continues to work normally.
Adding and removing receivers mid-operation is supported.

NOTE: "destination.ip") handling in stream_props_changed() is removed,
since it never really did anything other than change the param value.
This commit is contained in:
Carlos Rafael Giani 2026-06-18 22:07:30 +02:00 committed by Wim Taymans
parent f2ccfe12c2
commit d8f5ed0c13
3 changed files with 626 additions and 80 deletions

View file

@ -15,7 +15,10 @@
#include <net/if.h>
#include <ctype.h>
#include <spa/utils/atomic.h>
#include <spa/utils/defs.h>
#include <spa/utils/hook.h>
#include <spa/utils/list.h>
#include <spa/utils/result.h>
#include <spa/utils/ringbuffer.h>
#include <spa/utils/json.h>
@ -121,6 +124,51 @@
*]
*\endcode
*
* ## Adding and removing receivers through commands
*
* The following commands can be sent to the RTP sink node via `pw_node_send_command()`:
*
* * `add-receiver` : Adds a receiver to the sink's list. If the given
* IP address <-> port combination was already added, the command is
* logged, but otherwise ignored. Arguments:
* - `destination.ip` : IP address to send data to. Can be a uni- or
* multicast address, but must be a valid address.
* - `destination.port` : Port to send data to. Must be valid.
* - `local.ifname`, `source.ip`, `net.ttl`, `net.dscp`, `net.loop` :
* These are all optional, and work just like in the RTP sink
* module's properties.
*
* * `remove-receiver` : Removes a receiver from the sink's list. The
* receiver is identified by the given IP address. A port can optionally
* be specified as well. If it isn't, then the first receiver with that IP
* address is removed. If no matching receiver is in the sink's list,
* this command does nothing. Arguments:
* - `destination.ip` : IP address to send data to. Can be a uni- or
* multicast address, but must be a valid address.
* - `destination.port` : Port to send data to. This is optional. But, if
* it is set, it must be a valid port number.
*
* * `clear-receivers` : Removes all receivers from the sink's list. If the
* list is empty, this does nothing. This command has no arguments.
*
* If the RTP sink module is created with the `destination.ip` and
* `destination.port` properties set, it behaves as if `add-receiver` were
* called right after the module was initialized. This means that if none
* of these commands are used, the module behaves just as it did prior to
* this patch. Note that the `remove-receivers` command can remove this
* initial receiver as well.
*
* If no receivers are added, the module continues to work normally.
* Adding and removing receivers mid-operation is supported.
*
* Example pw-cli calls (56 is the ID of the RTP sink node):
*
* \code{.unparsed}
* pw-cli c 56 User '{ extra="{ \"command.id\" : \"add-receiver\" , \"destination.ip\" : \"10.42.0.1\", \"destination.port\" : 55001 }" }'
* pw-cli c 56 User '{ extra="{ \"command.id\" : \"remove-receiver\", \"destination.ip\" : \"10.42.0.1\" }" }'
* pw-cli c 56 User '{ extra="{ \"command.id\" : \"clear-receivers\" }" }'
* \endcode
*
* \since 0.3.60
*/
@ -166,6 +214,36 @@ static const struct spa_dict_item module_info[] = {
{ PW_KEY_MODULE_VERSION, PACKAGE_VERSION },
};
enum rtp_target_type {
RTP_TARGET_TYPE_UNICAST,
RTP_TARGET_TYPE_MULTICAST
};
struct rtp_target {
struct spa_list link;
/* Common multicast and unicast fields */
enum rtp_target_type type;
uint16_t dest_port;
struct sockaddr_storage dest_addr;
socklen_t dest_addrlen;
int socket_fd;
uint32_t ttl;
uint32_t dscp;
/* Multicast specific fields */
char *ifname;
bool mcast_loop;
struct sockaddr_storage src_addr;
socklen_t src_addrlen;
};
struct impl {
struct pw_context *context;
@ -186,19 +264,15 @@ struct impl {
unsigned int do_disconnect_core:1;
char *ifname;
uint32_t ttl;
bool mcast_loop;
uint32_t dscp;
struct spa_list rtp_targets;
size_t num_rtp_targets;
struct sockaddr_storage src_addr;
socklen_t src_len;
uint16_t dst_port;
struct sockaddr_storage dst_addr;
socklen_t dst_len;
int rtp_fd;
/* This flag is needed to know whether on_add_receiver() shall connect
* the socket immediately, or keep the target disconnected. The latter
* case is done when the PW node is not running; then, once it does start
* running, the stream.c stream_start() call will in turn call
* stream_open_connection(), which will connect the socket. */
bool stream_connected;
};
static int make_socket(struct sockaddr_storage *src, socklen_t src_len,
@ -274,6 +348,255 @@ static void stream_destroy(void *d)
impl->stream = NULL;
}
static void teardown_rtp_target(struct rtp_target *target);
static int setup_rtp_target(struct rtp_target *target,
const char *ifname, const char *destination_ip, uint16_t destination_port,
const char *source_ip, uint32_t ttl, uint32_t dscp, bool mcast_loop)
{
int res = 0;
memset(target, 0, sizeof(struct rtp_target));
target->socket_fd = -1;
if (destination_port == 0)
target->dest_port = (DEFAULT_PORT + ((uint32_t) (pw_rand32() % 512) << 1));
else
target->dest_port = destination_port;
if ((res = pw_net_parse_address(destination_ip, target->dest_port,
&target->dest_addr, &target->dest_addrlen)) < 0) {
pw_log_error("invalid destination IP \"%s\": %s", destination_ip, spa_strerror(res));
goto error;
}
if (source_ip == NULL)
source_ip = (target->dest_addr.ss_family == AF_INET) ?
DEFAULT_SOURCE_IP : DEFAULT_SOURCE_IP6;
if ((res = pw_net_parse_address(source_ip, 0, &target->src_addr,
&target->src_addrlen)) < 0) {
pw_log_error("invalid source IP \"%s\": %s", source_ip, spa_strerror(res));
goto error;
}
target->ttl = ttl;
target->dscp = dscp;
target->mcast_loop = mcast_loop;
target->ifname = ifname ? strdup(ifname) : NULL;
target->type = pw_net_is_multicast(&(target->dest_addr))
? RTP_TARGET_TYPE_MULTICAST
: RTP_TARGET_TYPE_UNICAST;
out:
return res;
error:
teardown_rtp_target(target);
goto out;
}
static int setup_rtp_target_from_props(struct rtp_target *target, struct pw_properties *props, bool *target_was_setup)
{
const char *destination_ip;
uint16_t destination_port;
const char *ifname;
const char *source_ip;
uint32_t ttl;
uint32_t dscp;
bool mcast_loop;
int res;
*target_was_setup = false;
destination_ip = pw_properties_get(props, "destination.ip");
if (destination_ip == NULL)
return 0;
destination_port = pw_properties_get_uint32(props, "destination.port", 0);
ifname = pw_properties_get(props, "local.ifname");
source_ip = pw_properties_get(props, "source.ip");
ttl = pw_properties_get_uint32(props, "net.ttl", DEFAULT_TTL);
dscp = pw_properties_get_uint32(props, "net.dscp", DEFAULT_DSCP);
mcast_loop = pw_properties_get_bool(props, "net.loop", DEFAULT_LOOP);
res = setup_rtp_target(target, ifname, destination_ip, destination_port, source_ip, ttl, dscp, mcast_loop);
if (res == 0)
*target_was_setup = true;
return res;
}
static void teardown_rtp_target(struct rtp_target *target)
{
if (target->socket_fd >= 0) {
close(target->socket_fd);
target->socket_fd = -1;
}
free(target->ifname);
target->ifname = NULL;
}
#define LOG_RTP_TARGET(LOGLEVEL, PREFIX, TARGET) \
do { \
if (SPA_UNLIKELY(pw_log_topic_enabled((LOGLEVEL), PW_LOG_TOPIC_DEFAULT))) { \
char src_addr_str_buf[256]; \
char dest_addr_str_buf[256]; \
const char *type_str; \
const char *src_addr_str; \
const char *dest_addr_str; \
const char *iface_name; \
uint16_t dest_port = 0; \
\
type_str = ((TARGET)->type == RTP_TARGET_TYPE_UNICAST) ? "unicast" : "multicast"; \
src_addr_str = dest_addr_str = "<could not get address>"; \
iface_name = ((TARGET)->ifname != NULL) ? (TARGET)->ifname : "<none>"; \
\
if (pw_net_get_ip(&((TARGET)->src_addr), src_addr_str_buf, sizeof(src_addr_str_buf), NULL, NULL) == 0) \
src_addr_str = src_addr_str_buf; \
if (pw_net_get_ip(&((TARGET)->dest_addr), dest_addr_str_buf, sizeof(dest_addr_str_buf), NULL, &dest_port) == 0) \
dest_addr_str = dest_addr_str_buf; \
\
if ((TARGET)->socket_fd < 0) { \
pw_log_logt((LOGLEVEL), PW_LOG_TOPIC_DEFAULT, __FILE__, __LINE__, __func__, \
"%s: type: %s; dest addr: %s; dest port: %" PRIu16 "; " \
"src addr: %s; TTL: %" PRIu32 "; DSCP: %" PRIu32 "; " \
"iface name: %s; mcast loop: %d; (socket not opened yet)", \
(PREFIX), type_str, dest_addr_str, dest_port, src_addr_str, \
(TARGET)->ttl, (TARGET)->dscp, iface_name, (TARGET)->mcast_loop); \
} else { \
pw_log_logt((LOGLEVEL), PW_LOG_TOPIC_DEFAULT, __FILE__, __LINE__, __func__, \
"%s type: %s; dest addr: %s; dest port: %" PRIu16 "; " \
"src addr: %s; TTL: %" PRIu32 "; DSCP: %" PRIu32 "; " \
"iface name: %s; mcast loop: %d; socket FD: %d", (PREFIX), \
type_str, dest_addr_str, dest_port, src_addr_str, (TARGET)->ttl, \
(TARGET)->dscp, iface_name, (TARGET)->mcast_loop, (TARGET)->socket_fd); \
} \
} \
} while (0)
static struct rtp_target *find_rtp_target_by_sockaddr(struct impl *impl, struct sockaddr_storage *address, bool compare_ports)
{
struct rtp_target *rtp_target;
spa_list_for_each(rtp_target, &(impl->rtp_targets), link) {
if (pw_net_are_addresses_equal(&(rtp_target->dest_addr), address, compare_ports)) {
return rtp_target;
}
}
return NULL;
}
static struct rtp_target *find_rtp_target(struct impl *impl, const char *destination_ip, uint16_t destination_port)
{
int res;
struct sockaddr_storage dest_addr;
socklen_t dest_len;
if ((res = pw_net_parse_address(destination_ip, destination_port,
&dest_addr, &dest_len)) < 0) {
pw_log_error("invalid destination IP \"%s\": %s", destination_ip, spa_strerror(res));
return NULL;
}
return find_rtp_target_by_sockaddr(impl, &dest_addr, destination_port != 0);
}
static int append_to_rtp_targets(struct spa_loop *loop, bool async, uint32_t seq,
const void *data, size_t size, void *user_data)
{
/* IMPORTANT: This must be run from within the data loop, since the rtp_targets
* list is modified here, and the stream_send_packet() function (which runs in
* the data loop thread), iterates over this same list. */
struct impl *impl = user_data;
struct rtp_target *rtp_target_to_add = (struct rtp_target *)data;
spa_list_append(&(impl->rtp_targets), &(rtp_target_to_add->link));
impl->num_rtp_targets++;
return 0;
}
static int add_rtp_target(struct impl *impl, struct rtp_target *rtp_target, bool append_in_data_loop)
{
struct rtp_target *rtp_target_copy;
LOG_RTP_TARGET(SPA_LOG_LEVEL_INFO, "Adding RTP target", rtp_target);
/* Allocate and fill the target here, outside of the data loop
* thread, to not unnecessarily block it. */
rtp_target_copy = malloc(sizeof(struct rtp_target));
if (SPA_UNLIKELY(rtp_target_copy == NULL)) {
pw_log_error("could not allocate memory for new target");
return -ENOMEM;
}
memcpy(rtp_target_copy, rtp_target, sizeof(struct rtp_target));
/* Ensure that nothing is stored in the spa_list link; it will
* be filled by spa_list_append(). */
spa_zero(rtp_target_copy->link);
if (append_in_data_loop)
rtp_stream_run_in_data_loop(impl->stream, append_to_rtp_targets, 1, rtp_target_copy, 0, impl);
else
append_to_rtp_targets(NULL, false, 0, rtp_target_copy, 0, impl);
return 0;
}
static int remove_from_rtp_targets(struct spa_loop *loop, bool async, uint32_t seq,
const void *data, size_t size, void *user_data)
{
/* IMPORTANT: This must be run from within the data loop, since the rtp_targets
* list is modified here, and the stream_send_packet() function (which runs in
* the data loop thread), iterates over this same list. */
struct impl *impl = user_data;
struct rtp_target *rtp_target_to_remove = (struct rtp_target *)data;
spa_list_remove(&(rtp_target_to_remove->link));
impl->num_rtp_targets--;
return 0;
}
static void remove_rtp_target(struct impl *impl, struct rtp_target *rtp_target_to_remove,
bool remove_in_data_loop, const char *custom_prefix)
{
if (custom_prefix == NULL)
custom_prefix = "Removing RTP target";
LOG_RTP_TARGET(SPA_LOG_LEVEL_INFO, custom_prefix, rtp_target_to_remove);
if (remove_in_data_loop)
rtp_stream_run_in_data_loop(impl->stream, remove_from_rtp_targets, 1, rtp_target_to_remove, 0, impl);
else
remove_from_rtp_targets(NULL, false, 0, rtp_target_to_remove, 0, impl);
teardown_rtp_target(rtp_target_to_remove);
free(rtp_target_to_remove);
}
static void remove_rtp_target_by_ip_and_port(struct impl *impl, const char *destination_ip,
uint16_t destination_port, bool remove_in_data_loop)
{
struct rtp_target *rtp_target_to_remove;
if (impl->num_rtp_targets == 0)
return;
rtp_target_to_remove = find_rtp_target(impl, destination_ip, destination_port);
if (rtp_target_to_remove == NULL)
return;
remove_rtp_target(impl, rtp_target_to_remove, remove_in_data_loop, NULL);
}
static inline uint64_t get_time_ns(void)
{
struct timespec ts;
@ -286,6 +609,7 @@ static void stream_send_packet(void *data, struct iovec *iov, size_t iovlen)
struct impl *impl = data;
struct msghdr msg;
ssize_t n;
struct rtp_target *rtp_target;
spa_zero(msg);
msg.msg_iov = iov;
@ -294,11 +618,18 @@ static void stream_send_packet(void *data, struct iovec *iov, size_t iovlen)
msg.msg_controllen = 0;
msg.msg_flags = 0;
n = sendmsg(impl->rtp_fd, &msg, MSG_NOSIGNAL);
if (n < 0) {
int suppressed;
if ((suppressed = spa_ratelimit_test(&impl->rate_limit, get_time_ns())) >= 0)
pw_log_warn("(%d suppressed) sendmsg() failed: %m", suppressed);
spa_list_for_each(rtp_target, &(impl->rtp_targets), link) {
/* All targets are required to have open sockets
* by the time sending takes place. */
spa_assert(rtp_target->socket_fd >= 0);
n = sendmsg(rtp_target->socket_fd, &msg, MSG_NOSIGNAL);
if (n < 0) {
int suppressed;
if ((suppressed = spa_ratelimit_test(&impl->rate_limit, get_time_ns())) >= 0) {
pw_log_warn("(%d suppressed) sendmsg() failed: %m", suppressed);
LOG_RTP_TARGET(SPA_LOG_LEVEL_WARN, "RTP target", rtp_target);
}
}
}
}
@ -311,40 +642,56 @@ static void stream_report_error(void *data, const char *error)
}
}
static void stream_close_connection(void *data, int *result);
static void stream_open_connection(void *data, int *result)
{
int res;
struct impl *impl = data;
struct rtp_target *rtp_target;
if ((res = make_socket(&impl->src_addr, impl->src_len,
&impl->dst_addr, impl->dst_len,
impl->mcast_loop, impl->ttl, impl->dscp,
impl->ifname)) < 0) {
pw_log_error("can't make socket: %s", spa_strerror(res));
rtp_stream_set_error(impl->stream, res, "Can't make socket");
if (result)
*result = res;
return;
spa_list_for_each(rtp_target, &(impl->rtp_targets), link) {
if ((res = make_socket(&rtp_target->src_addr, rtp_target->src_addrlen,
&rtp_target->dest_addr, rtp_target->dest_addrlen,
rtp_target->mcast_loop, rtp_target->ttl,
rtp_target->dscp, rtp_target->ifname)) < 0) {
pw_log_error("can't make socket: %s", spa_strerror(res));
LOG_RTP_TARGET(SPA_LOG_LEVEL_WARN, "RTP target", rtp_target);
rtp_stream_set_error(impl->stream, res, "Can't make socket");
stream_close_connection(data, NULL);
if (result)
*result = res;
return;
}
rtp_target->socket_fd = res;
}
if (result)
*result = 1;
impl->rtp_fd = res;
/* Now, after all sockets are opened, mark the stream as connected. */
impl->stream_connected = true;
}
static void stream_close_connection(void *data, int *result)
{
struct impl *impl = data;
struct rtp_target *rtp_target;
if (impl->rtp_fd > 0) {
if (result)
*result = 1;
close(impl->rtp_fd);
impl->rtp_fd = -1;
} else {
if (result)
*result = 0;
/* Mark the stream as disconnected to let future on_add_receiver()
* calls know that they must not connect the socket on their own. */
impl->stream_connected = false;
if (result)
*result = 0;
spa_list_for_each(rtp_target, &(impl->rtp_targets), link) {
if (rtp_target->socket_fd >= 0) {
if (result)
*result = 1;
close(rtp_target->socket_fd);
rtp_target->socket_fd = -1;
}
}
}
@ -385,15 +732,7 @@ static void stream_props_changed(struct impl *impl, uint32_t id, const struct sp
spa_pod_get_int(pod, &value_int) < 0)
continue;
pw_log_info("key '%s', value '%s'/%u", key, value_str, value_int);
if (spa_streq(key, "destination.ip")) {
if (!value_str || pw_net_parse_address(value_str, impl->dst_port, &impl->dst_addr,
&impl->dst_len) < 0) {
pw_log_error("invalid destination.ip: '%s'", value_str);
break;
}
pw_properties_set(impl->stream_props, "rtp.destination.ip", value_str);
items[n_items++] = SPA_DICT_ITEM_INIT("rtp.destination.ip", value_str);
} else if (spa_streq(key, "sess.name")) {
if (spa_streq(key, "sess.name")) {
if (!value_str) {
pw_log_error("invalid sess.name");
break;
@ -434,6 +773,193 @@ static void stream_param_changed(void *data, uint32_t id, const struct spa_pod *
}
}
static void on_add_receiver(struct impl *impl, struct spa_json *command_json_iter)
{
int res;
char key[256];
const char *value;
struct rtp_target rtp_target;
int len;
char ifname[64];
char destination_ip[64];
int destination_port = 0;
char source_ip[64];
int ttl = DEFAULT_TTL;
int dscp = DEFAULT_DSCP;
bool mcast_loop = DEFAULT_LOOP;
ifname[0] = '\0';
destination_ip[0] = '\0';
source_ip[0] = '\0';
while ((len = spa_json_object_next(command_json_iter, key, sizeof(key), &value)) > 0) {
if (spa_streq(key, "local.ifname")) {
spa_json_parse_stringn(value, len, ifname, sizeof(ifname));
} else if (spa_streq(key, "destination.ip")) {
spa_json_parse_stringn(value, len, destination_ip, sizeof(destination_ip));
} else if (spa_streq(key, "destination.port")) {
spa_json_parse_int(value, len, &destination_port);
} else if (spa_streq(key, "source.ip")) {
spa_json_parse_stringn(value, len, source_ip, sizeof(source_ip));
} else if (spa_streq(key, "net.ttl")) {
spa_json_parse_int(value, len, &ttl);
} else if (spa_streq(key, "net.dscp")) {
spa_json_parse_int(value, len, &dscp);
} else if (spa_streq(key, "net.loop")) {
spa_json_parse_bool(value, len, &mcast_loop);
}
}
if (destination_ip[0] == '\0') {
pw_log_error("Cannot add receiver without a destination.ip value");
return;
}
if ((res = setup_rtp_target(&rtp_target, (ifname[0] != '\0') ? ifname : NULL,
destination_ip, destination_port, (source_ip[0] != '\0') ? source_ip : NULL,
ttl, dscp, mcast_loop)) != 0)
return;
if (find_rtp_target_by_sockaddr(impl, &(rtp_target.dest_addr), true) != NULL) {
LOG_RTP_TARGET(SPA_LOG_LEVEL_WARN, "Not adding RTP target because it is already added", &rtp_target);
goto duplicate_add;
}
/* Only create the socket when the stream is connected. Sockets are
* supposed to be disconnected otherwise. If it is disconnected, adding
* the RTP target to the list is enough - stream_open_connection() will
* connect the target's socket then. */
if (impl->stream_connected) {
if ((res = make_socket(&rtp_target.src_addr, rtp_target.src_addrlen,
&rtp_target.dest_addr, rtp_target.dest_addrlen,
rtp_target.mcast_loop, rtp_target.ttl,
rtp_target.dscp, rtp_target.ifname)) < 0) {
pw_log_error("Couldn't make socket for new receiver: %s", spa_strerror(res));
goto error;
}
pw_log_debug("Created socket for new receiver, socket FD: %d", res);
rtp_target.socket_fd = res;
}
if (add_rtp_target(impl, &rtp_target, true) != 0)
goto error;
finish:
return;
duplicate_add:
teardown_rtp_target(&rtp_target);
goto finish;
error:
teardown_rtp_target(&rtp_target);
goto finish;
}
static void on_remove_receiver(struct impl *impl, struct spa_json *command_json_iter)
{
char key[256];
const char *value;
int len;
char destination_ip[64];
int destination_port = 0;
destination_ip[0] = '\0';
while ((len = spa_json_object_next(command_json_iter, key, sizeof(key), &value)) > 0) {
if (spa_streq(key, "destination.ip")) {
spa_json_parse_stringn(value, len, destination_ip, sizeof(destination_ip));
} else if (spa_streq(key, "destination.port")) {
spa_json_parse_int(value, len, &destination_port);
}
}
if (destination_ip[0] == '\0') {
pw_log_error("Cannot remove receiver without a destination.ip value");
return;
}
remove_rtp_target_by_ip_and_port(impl, destination_ip, destination_port, true);
}
static void on_clear_receivers(struct impl *impl)
{
struct rtp_target *rtp_target;
pw_log_info("Clearing all receivers");
spa_list_consume(rtp_target, &(impl->rtp_targets), link)
remove_rtp_target(impl, rtp_target, true, NULL);
}
static void parse_rtp_command(struct impl *impl, const char *command_json_str)
{
int res;
struct spa_json iter;
char rtp_command_id[64];
if ((res = spa_json_str_object_find(command_json_str, strlen(command_json_str),
"command.id", rtp_command_id, sizeof(rtp_command_id))) <= 0) {
if (res == -ENOENT) {
pw_log_error("Command JSON string \"%s\" has no command.id field",
command_json_str);
} else {
pw_log_error("Error while parsing JSON string \"%s\": %s", command_json_str,
spa_strerror(res));
}
return;
}
if ((res = spa_json_begin_object(&iter, command_json_str, strlen(command_json_str))) <= 0) {
pw_log_error("Error while parsing JSON string \"%s\": %s", command_json_str,
spa_strerror(res));
return;
}
if (spa_streq(rtp_command_id, "add-receiver")) {
on_add_receiver(impl, &iter);
} else if (spa_streq(rtp_command_id, "remove-receiver")) {
on_remove_receiver(impl, &iter);
} else if (spa_streq(rtp_command_id, "clear-receivers")) {
on_clear_receivers(impl);
} else {
pw_log_error("Command JSON string \"%s\" has unrecognized command ID \"%s\"",
command_json_str, rtp_command_id);
}
}
static void stream_command(void *data, const struct spa_command *command)
{
struct impl *impl = data;
if (SPA_UNLIKELY(SPA_COMMAND_TYPE(command) != SPA_TYPE_COMMAND_Node))
return;
switch (SPA_NODE_COMMAND_ID(command)) {
case SPA_NODE_COMMAND_User: {
const struct spa_pod_object *pod_object = (const struct spa_pod_object *)command;
struct spa_pod_prop *prop;
SPA_POD_OBJECT_FOREACH(pod_object, prop) {
if (prop->key != SPA_COMMAND_NODE_extra)
continue;
if (prop->value.type != SPA_TYPE_String)
continue;
const char *json_str = SPA_POD_CONTENTS(struct spa_pod, &prop->value);
parse_rtp_command(impl, json_str);
}
break;
}
default:
break;
}
}
static const struct rtp_stream_events stream_events = {
RTP_VERSION_STREAM_EVENTS,
.destroy = stream_destroy,
@ -442,6 +968,7 @@ static const struct rtp_stream_events stream_events = {
.close_connection = stream_close_connection,
.param_changed = stream_param_changed,
.send_packet = stream_send_packet,
.command = stream_command,
};
static void core_destroy(void *d)
@ -458,21 +985,20 @@ static const struct pw_proxy_events core_proxy_events = {
static void impl_destroy(struct impl *impl)
{
struct rtp_target *rtp_target;
if (impl->stream)
rtp_stream_destroy(impl->stream);
if (impl->core && impl->do_disconnect_core)
pw_core_disconnect(impl->core);
if (impl->rtp_fd != -1) {
pw_log_info("closing socket with FD %d as part of shutdown", impl->rtp_fd);
close(impl->rtp_fd);
}
spa_list_consume(rtp_target, &(impl->rtp_targets), link)
remove_rtp_target(impl, rtp_target, false, "Removing RTP target as part of shutdown");
pw_properties_free(impl->stream_props);
pw_properties_free(impl->props);
free(impl->ifname);
free(impl);
}
@ -524,6 +1050,8 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args)
int64_t ts_offset;
int res = 0;
uint32_t header_size;
struct rtp_target initial_target;
bool initial_target_valid = false;
PW_LOG_TOPIC_INIT(mod_topic);
@ -531,8 +1059,6 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args)
if (impl == NULL)
return -errno;
impl->rtp_fd = -1;
if (args == NULL)
args = "";
@ -552,6 +1078,8 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args)
}
impl->stream_props = stream_props;
spa_list_init(&impl->rtp_targets);
impl->rate_limit.interval = 2 * SPA_NSEC_PER_SEC;
impl->rate_limit.burst = 1;
@ -598,45 +1126,43 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args)
copy_props(impl, props, "sess.ts-refclk");
copy_props(impl, props, "aes67.driver-group");
str = pw_properties_get(props, "local.ifname");
impl->ifname = str ? strdup(str) : NULL;
impl->dst_port = DEFAULT_PORT + ((uint32_t) (pw_rand32() % 512) << 1);
impl->dst_port = pw_properties_get_uint32(props, "destination.port", impl->dst_port);
if ((str = pw_properties_get(props, "destination.ip")) == NULL)
str = DEFAULT_DESTINATION_IP;
if ((res = pw_net_parse_address(str, impl->dst_port, &impl->dst_addr, &impl->dst_len)) < 0) {
pw_log_error("invalid destination.ip %s: %s", str, spa_strerror(res));
if ((res = setup_rtp_target_from_props(&initial_target, props, &initial_target_valid)) < 0) {
pw_log_error("could not setup initial destination: %s", spa_strerror(res));
goto out;
}
if ((str = pw_properties_get(props, "source.ip")) == NULL)
str = impl->dst_addr.ss_family == AF_INET ?
DEFAULT_SOURCE_IP : DEFAULT_SOURCE_IP6;
if ((res = pw_net_parse_address(str, 0, &impl->src_addr, &impl->src_len)) < 0) {
pw_log_error("invalid source.ip %s: %s", str, spa_strerror(res));
goto out;
}
impl->ttl = pw_properties_get_uint32(props, "net.ttl", DEFAULT_TTL);
impl->mcast_loop = pw_properties_get_bool(props, "net.loop", DEFAULT_LOOP);
impl->dscp = pw_properties_get_uint32(props, "net.dscp", DEFAULT_DSCP);
ts_offset = pw_properties_get_int64(props, "sess.ts-offset", DEFAULT_TS_OFFSET);
if (ts_offset == -1)
ts_offset = pw_rand32();
pw_properties_setf(stream_props, "rtp.sender-ts-offset", "%u", (uint32_t)ts_offset);
header_size = impl->dst_addr.ss_family == AF_INET ?
IP4_HEADER_SIZE : IP6_HEADER_SIZE;
/* Assume IPv6 header size. This is necessary, since it
* is possible to add a mixture of IPv4 and IPv6 targets
* to the RTP sink. Having different headers sizes for
* both IPv4 and IPv6 would require producing packets
* separately for both protocols, which adds significant
* complexity and runtime overhead. Instead, accept the
* small waste (20 bytes) by always assuming an IPv6
* header size, even when IPv4 is used. */
header_size = IP6_HEADER_SIZE;
header_size += UDP_HEADER_SIZE;
pw_properties_setf(stream_props, "net.header", "%u", header_size);
pw_net_get_ip(&impl->src_addr, addr, sizeof(addr), NULL, NULL);
pw_properties_set(stream_props, "rtp.source.ip", addr);
pw_net_get_ip(&impl->dst_addr, addr, sizeof(addr), NULL, NULL);
pw_properties_set(stream_props, "rtp.destination.ip", addr);
pw_properties_setf(stream_props, "rtp.destination.port", "%u", impl->dst_port);
pw_properties_setf(stream_props, "rtp.ttl", "%u", impl->ttl);
pw_properties_setf(stream_props, "rtp.dscp", "%u", impl->dscp);
if (initial_target_valid) {
res = add_rtp_target(impl, &initial_target, false);
if (res != 0) {
teardown_rtp_target(&initial_target);
goto out;
}
pw_net_get_ip(&(initial_target.src_addr), addr, sizeof(addr), NULL, NULL);
pw_properties_set(stream_props, "rtp.source.ip", addr);
pw_net_get_ip(&(initial_target.dest_addr), addr, sizeof(addr), NULL, NULL);
pw_properties_set(stream_props, "rtp.destination.ip", addr);
pw_properties_setf(stream_props, "rtp.destination.port", "%u", initial_target.dest_port);
pw_properties_setf(stream_props, "rtp.ttl", "%u", initial_target.ttl);
pw_properties_setf(stream_props, "rtp.dscp", "%u", initial_target.dscp);
}
impl->core = pw_context_get_object(impl->context, PW_TYPE_INTERFACE_Core);
if (impl->core == NULL) {

View file

@ -45,6 +45,7 @@ PW_LOG_TOPIC_EXTERN(mod_topic);
#define rtp_stream_emit_open_connection(s,r) rtp_stream_emit(s, open_connection, 0,r)
#define rtp_stream_emit_close_connection(s,r) rtp_stream_emit(s, close_connection, 0,r)
#define rtp_stream_emit_param_changed(s,i,p) rtp_stream_emit(s, param_changed,0,i,p)
#define rtp_stream_emit_command(s,cmd) rtp_stream_emit(s, command,0,cmd)
#define rtp_stream_call(s,m,v,...) spa_callbacks_call_fast(&s->rtp_callbacks, \
struct rtp_stream_events, m, v, ##__VA_ARGS__)
@ -584,11 +585,18 @@ static void on_stream_param_changed (void *d, uint32_t id, const struct spa_pod
}
};
static void on_stream_command(void *d, const struct spa_command *command)
{
struct impl *impl = d;
rtp_stream_emit_command(impl, command);
}
static const struct pw_stream_events stream_events = {
PW_VERSION_STREAM_EVENTS,
.destroy = stream_destroy,
.state_changed = on_stream_state_changed,
.param_changed = on_stream_param_changed,
.command = on_stream_command,
.io_changed = stream_io_changed,
};
@ -1225,3 +1233,10 @@ void rtp_stream_update_process_latency(struct rtp_stream *s,
update_latency_params(impl);
}
int rtp_stream_run_in_data_loop(struct rtp_stream *s, spa_invoke_func_t func,
uint32_t seq, const void *data, size_t size, void *user_data)
{
struct impl *impl = (struct impl*)s;
return pw_loop_locked(impl->data_loop, func, seq, data, size, user_data);
}

View file

@ -50,6 +50,8 @@ struct rtp_stream_events {
void (*param_changed) (void *data, uint32_t id, const struct spa_pod *param);
void (*command) (void *data, const struct spa_command *command);
void (*send_packet) (void *data, struct iovec *iov, size_t iovlen);
void (*send_feedback) (void *data, uint32_t seqnum);
@ -91,6 +93,9 @@ int rtp_stream_update_params(struct rtp_stream *stream,
void rtp_stream_update_process_latency(struct rtp_stream *stream,
const struct spa_process_latency_info *process_latency);
int rtp_stream_run_in_data_loop(struct rtp_stream *s, spa_invoke_func_t func,
uint32_t seq, const void *data, size_t size, void *user_data);
#ifdef __cplusplus
}
#endif