module-rtp: make sender work

Small cleanups
This commit is contained in:
Wim Taymans 2022-10-06 11:41:01 +02:00
parent 84c666db20
commit be1159eb66
3 changed files with 141 additions and 81 deletions

View file

@ -113,7 +113,7 @@
* #raop.password = "****" * #raop.password = "****"
* #audio.format = "S16" * #audio.format = "S16"
* #audio.rate = 44100 * #audio.rate = 44100
* #audio.channels = 22 * #audio.channels = 2
* #audio.position = [ FL FR ] * #audio.position = [ FL FR ]
* stream.props = { * stream.props = {
* # extra sink properties * # extra sink properties

View file

@ -56,34 +56,57 @@
* *
* Options specific to the behavior of this module * Options specific to the behavior of this module
* *
* - `stream.props = {}`: properties to be passed to the stream * - `sap.ip = <str>`: IP address of the SAP messages, default "224.0.0.56"
* - `sap.ip = <str>`: IP address of the SAP messages * - `sap.port = <int>`: port of the SAP messages, default 9875
* - `sap.port = <str>`: port of the SAP messages * - `source.ip =<str>`: source IP address, default "0.0.0.0"
* - `destination.ip =<str>`: destination IP address, default "224.0.0.56"
* - `local.ifname = <str>`: interface name to use * - `local.ifname = <str>`: interface name to use
* - `sess.latency.msec = <str>`: target network latency in milliseconds * - `net.mtu = <int>`: MTU to use, default 1280
* - `net.ttl = <int>`: TTL to use, default 1
* - `net.loop = <bool>`: loopback multicast, default false
* - `sess.name = <str>`: a session name
* - `stream.props = {}`: properties to be passed to the stream
* *
* ## General options * ## General options
* *
* Options with well-known behavior: * Options with well-known behavior:
* *
* - \ref PW_KEY_REMOTE_NAME
* - \ref PW_KEY_AUDIO_FORMAT
* - \ref PW_KEY_AUDIO_RATE
* - \ref PW_KEY_AUDIO_CHANNELS
* - \ref SPA_KEY_AUDIO_POSITION
* - \ref PW_KEY_NODE_NAME * - \ref PW_KEY_NODE_NAME
* - \ref PW_KEY_NODE_DESCRIPTION * - \ref PW_KEY_NODE_DESCRIPTION
* - \ref PW_KEY_MEDIA_NAME * - \ref PW_KEY_MEDIA_NAME
* - \ref PW_KEY_NODE_GROUP
* - \ref PW_KEY_NODE_LATENCY
* - \ref PW_KEY_NODE_VIRTUAL
* - \ref PW_KEY_MEDIA_CLASS
* *
* ## Example configuration * ## Example configuration
*\code{.unparsed} *\code{.unparsed}
* context.modules = [ * context.modules = [
* { name = libpipewire-module-rtp-sink * { name = libpipewire-module-rtp-sink
* args = { * args = {
* #sap.ip = 224.0.0.56 * #sap.ip = "224.0.0.56"
* #sap.port = 9875 * #sap.port = 9875
* #local.ifname = eth0 * #source.ip = "0.0.0.0"
* sess.latency.msec = 200 * #destination.ip = "224.0.0.56"
* stream.props = { * #local.ifname = "eth0"
* #net.mtu = 1280
* #net.ttl = 1
* #net.loop = false
* #sess.name = "PipeWire RTP stream"
* #audio.format = "S16BE"
* #audio.rate = 48000
* #audio.channels = 2
* #audio.position = [ FL FR ]
* stream.props = {
* node.name = "rtp-sink" * node.name = "rtp-sink"
* } * }
* } * }
* } *}
*] *]
*\endcode *\endcode
* *
@ -91,41 +114,54 @@
#define NAME "rtp-sink" #define NAME "rtp-sink"
static const struct spa_dict_item module_info[] = {
{ PW_KEY_MODULE_AUTHOR, "Wim Taymans <wim.taymans@gmail.com>" },
{ PW_KEY_MODULE_DESCRIPTION, "RTP Source" },
{ PW_KEY_MODULE_USAGE, "sap.ip=<SAP IP address to listen on> "
"sap.port=<SAP port to listen on> "
"source.ip=<source IP address> "
"destination.ip=<destination IP address> "
"local.ifname=<local interface name to use> "
"sess.latency.msec=<target network latency in milliseconds> "
"stream.props= { key=value ... }" },
{ PW_KEY_MODULE_VERSION, PACKAGE_VERSION },
};
PW_LOG_TOPIC_STATIC(mod_topic, "mod." NAME); PW_LOG_TOPIC_STATIC(mod_topic, "mod." NAME);
#define PW_LOG_TOPIC_DEFAULT mod_topic #define PW_LOG_TOPIC_DEFAULT mod_topic
#define SAP_INTERVAL_SEC 5 #define SAP_INTERVAL_SEC 5
#define SAP_MIME_TYPE "application/sdp" #define SAP_MIME_TYPE "application/sdp"
#define BUFFER_SIZE (1u<<16) #define BUFFER_SIZE (1u<<20)
#define BUFFER_MASK (BUFFER_SIZE-1) #define BUFFER_MASK (BUFFER_SIZE-1)
#define DEFAULT_SAP_IP "224.0.0.56" #define DEFAULT_SAP_IP "224.0.0.56"
#define DEFAULT_SAP_PORT 9875 #define DEFAULT_SAP_PORT 9875
#define DEFAULT_FORMAT "S16BE" #define DEFAULT_FORMAT "S16BE"
#define DEFAULT_RATE 44100 #define DEFAULT_RATE 48000
#define DEFAULT_CHANNELS 2 #define DEFAULT_CHANNELS 2
#define DEFAULT_POSITION "[ FL FR ]" #define DEFAULT_POSITION "[ FL FR ]"
#define DEFAULT_PORT 46000 #define DEFAULT_PORT 46000
#define DEFAULT_SOURCE_IP "0.0.0.0" #define DEFAULT_SOURCE_IP "0.0.0.0"
#define DEFAULT_DESTINATION_IP "224.0.0.56" #define DEFAULT_DESTINATION_IP "224.0.0.56"
#define DEFAULT_TTL 1 #define DEFAULT_TTL 1
#define DEFAULT_MTU 1280 #define DEFAULT_MTU 1280
#define DEFAULT_LOOP false
#define DEFAULT_MIN_PTIME 2
#define DEFAULT_MAX_PTIME 20
#define USAGE "sap.ip=<SAP IP address to send announce, default:"DEFAULT_SAP_IP"> " \
"sap.port=<SAP port to send on, default:"SPA_STRINGIFY(DEFAULT_SAP_PORT)"> " \
"source.ip=<source IP address, default:"DEFAULT_SOURCE_IP"> " \
"destination.ip=<destination IP address, default:"DEFAULT_DESTINATION_IP"> " \
"local.ifname=<local interface name to use> " \
"net.mtu=<desired MTU, default:"SPA_STRINGIFY(DEFAULT_MTU)"> " \
"net.ttl=<desired TTL, default:"SPA_STRINGIFY(DEFAULT_TTL)"> " \
"net.loop=<desired loopback, default:"SPA_STRINGIFY(DEFAULT_LOOP)"> " \
"sess.name=<a name for the session> " \
"audio.format=<format, default:"DEFAULT_FORMAT"> " \
"audio.rate=<sample rate, default:"SPA_STRINGIFY(DEFAULT_RATE)"> " \
"audio.channels=<number of channels, default:"SPA_STRINGIFY(DEFAULT_CHANNELS)"> "\
"audio.position=<channel map, default:"DEFAULT_POSITION"> " \
"stream.props= { key=value ... }"
static const struct spa_dict_item module_info[] = {
{ PW_KEY_MODULE_AUTHOR, "Wim Taymans <wim.taymans@gmail.com>" },
{ PW_KEY_MODULE_DESCRIPTION, "RTP Sink" },
{ PW_KEY_MODULE_USAGE, USAGE },
{ PW_KEY_MODULE_VERSION, PACKAGE_VERSION },
};
struct impl { struct impl {
struct pw_impl_module *module; struct pw_impl_module *module;
@ -150,9 +186,11 @@ struct impl {
char *ifname; char *ifname;
char *session_name; char *session_name;
int sess_latency_msec; int sess_latency_msec;
bool mcast_loop;
bool ttl;
int mtu; int mtu;
bool ttl;
bool mcast_loop;
uint32_t min_ptime;
uint32_t max_ptime;
struct sockaddr_storage src_addr; struct sockaddr_storage src_addr;
socklen_t src_len; socklen_t src_len;
@ -166,6 +204,7 @@ struct impl {
socklen_t sap_len; socklen_t sap_len;
uint16_t msg_id_hash; uint16_t msg_id_hash;
uint32_t ntp;
struct spa_audio_info_raw info; struct spa_audio_info_raw info;
uint32_t frame_size; uint32_t frame_size;
@ -206,6 +245,14 @@ static void flush_packets(struct impl *impl)
struct msghdr msg; struct msghdr msg;
ssize_t n; ssize_t n;
struct rtp_header header; struct rtp_header header;
int32_t tosend;
avail = spa_ringbuffer_get_read_index(&impl->ring, &index);
tosend = SPA_ROUND_DOWN(impl->mtu, impl->frame_size);
if (avail < tosend)
return;
spa_zero(header); spa_zero(header);
header.v = 2; header.v = 2;
@ -223,26 +270,24 @@ static void flush_packets(struct impl *impl)
msg.msg_controllen = 0; msg.msg_controllen = 0;
msg.msg_flags = 0; msg.msg_flags = 0;
avail = spa_ringbuffer_get_read_index(&impl->ring, &index); while (avail >= tosend) {
while (avail >= impl->mtu) {
header.sequence_number = htons(impl->seq); header.sequence_number = htons(impl->seq);
header.timestamp = htonl(impl->timestamp); header.timestamp = htonl(impl->timestamp);
set_iovec(&impl->ring, set_iovec(&impl->ring,
impl->buffer, BUFFER_SIZE, impl->buffer, BUFFER_SIZE,
index % BUFFER_MASK, index & BUFFER_MASK,
&iov[1], impl->mtu); &iov[1], tosend);
n = sendmsg(impl->rtp_fd, &msg, MSG_NOSIGNAL); n = sendmsg(impl->rtp_fd, &msg, MSG_NOSIGNAL);
if (n < 0) if (n < 0)
pw_log_warn("sendmsg() failed: %m"); pw_log_warn("sendmsg() failed: %m");
impl->seq++; impl->seq++;
impl->timestamp += impl->mtu / impl->frame_size; impl->timestamp += tosend / impl->frame_size;
index += impl->mtu; index += tosend;
avail -= impl->mtu; avail -= tosend;
} }
spa_ringbuffer_read_update(&impl->ring, index); spa_ringbuffer_read_update(&impl->ring, index);
} }
@ -265,8 +310,8 @@ static void stream_process(void *data)
filled = spa_ringbuffer_get_write_index(&impl->ring, &index); filled = spa_ringbuffer_get_write_index(&impl->ring, &index);
if (filled > (int32_t)BUFFER_SIZE) { if (filled + wanted > (int32_t)BUFFER_SIZE) {
pw_log_warn("overrun %u > %u", filled, BUFFER_SIZE); pw_log_warn("overrun %u + %u > %u", filled, wanted, BUFFER_SIZE);
} else { } else {
spa_ringbuffer_write_data(&impl->ring, spa_ringbuffer_write_data(&impl->ring,
impl->buffer, impl->buffer,
@ -426,8 +471,7 @@ static int get_ip(const struct sockaddr_storage *sa, char *ip, size_t len)
static void send_sap(struct impl *impl, bool bye) static void send_sap(struct impl *impl, bool bye)
{ {
char buffer[2048], src_addr[64], dst_addr[64]; char buffer[2048], src_addr[64], dst_addr[64];
const char *user_name = "-", *af, *fmt; const char *user_name, *af, *fmt;
uint32_t ntp;
struct sockaddr *sa = (struct sockaddr*)&impl->src_addr; struct sockaddr *sa = (struct sockaddr*)&impl->src_addr;
struct sap_header header; struct sap_header header;
struct iovec iov[4]; struct iovec iov[4];
@ -454,12 +498,12 @@ static void send_sap(struct impl *impl, bool bye)
iov[2].iov_base = SAP_MIME_TYPE; iov[2].iov_base = SAP_MIME_TYPE;
iov[2].iov_len = sizeof(SAP_MIME_TYPE); iov[2].iov_len = sizeof(SAP_MIME_TYPE);
ntp = (uint32_t) time(NULL) + 2208988800U;
get_ip(&impl->src_addr, src_addr, sizeof(src_addr)); get_ip(&impl->src_addr, src_addr, sizeof(src_addr));
get_ip(&impl->dst_addr, dst_addr, sizeof(dst_addr)); get_ip(&impl->dst_addr, dst_addr, sizeof(dst_addr));
fmt = "L16"; fmt = "L16";
if ((user_name = pw_get_user_name()) == NULL)
user_name = "-";
snprintf(buffer, sizeof(buffer), snprintf(buffer, sizeof(buffer),
"v=0\n" "v=0\n"
@ -471,10 +515,10 @@ static void send_sap(struct impl *impl, bool bye)
"m=audio %u RTP/AVP %i\n" "m=audio %u RTP/AVP %i\n"
"a=rtpmap:%i %s/%u/%u\n" "a=rtpmap:%i %s/%u/%u\n"
"a=type:broadcast\n", "a=type:broadcast\n",
user_name, ntp, af, src_addr, user_name, impl->ntp, af, src_addr,
impl->session_name, impl->session_name,
af, dst_addr, af, dst_addr,
ntp, impl->ntp,
impl->port, impl->payload, impl->port, impl->payload,
impl->payload, fmt, impl->info.rate, impl->info.channels); impl->payload, fmt, impl->info.rate, impl->info.channels);
@ -564,6 +608,7 @@ static void impl_destroy(struct impl *impl)
pw_properties_free(impl->props); pw_properties_free(impl->props);
free(impl->ifname); free(impl->ifname);
free(impl->session_name);
free(impl); free(impl);
} }
@ -775,13 +820,13 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args)
res = -EINVAL; res = -EINVAL;
goto out; goto out;
} }
impl->payload = 127;
impl->mtu = DEFAULT_MTU;
impl->ttl = DEFAULT_TTL;
impl->ssrc = rand();
impl->timestamp = rand();
impl->seq = rand();
impl->msg_id_hash = rand(); impl->msg_id_hash = rand();
impl->ntp = (uint32_t) time(NULL) + 2208988800U;
impl->payload = 127;
impl->seq = rand();
impl->timestamp = rand();
impl->ssrc = rand();
str = pw_properties_get(props, "local.ifname"); str = pw_properties_get(props, "local.ifname");
impl->ifname = str ? strdup(str) : NULL; impl->ifname = str ? strdup(str) : NULL;
@ -789,7 +834,6 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args)
if ((str = pw_properties_get(props, "sap.ip")) == NULL) if ((str = pw_properties_get(props, "sap.ip")) == NULL)
str = DEFAULT_SAP_IP; str = DEFAULT_SAP_IP;
port = pw_properties_get_uint32(props, "sap.port", DEFAULT_SAP_PORT); port = pw_properties_get_uint32(props, "sap.port", DEFAULT_SAP_PORT);
if ((res = parse_address(str, port, &impl->sap_addr, &impl->sap_len)) < 0) { if ((res = parse_address(str, port, &impl->sap_addr, &impl->sap_len)) < 0) {
pw_log_error("invalid sap.ip %s: %s", str, spa_strerror(res)); pw_log_error("invalid sap.ip %s: %s", str, spa_strerror(res));
goto out; goto out;
@ -810,6 +854,18 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args)
pw_log_error("invalid destination.ip %s: %s", str, spa_strerror(res)); pw_log_error("invalid destination.ip %s: %s", str, spa_strerror(res));
goto out; goto out;
} }
impl->mtu = pw_properties_get_uint32(props, "net.mtu", DEFAULT_MTU);
impl->ttl = pw_properties_get_uint32(props, "net.ttl", DEFAULT_TTL);
impl->mcast_loop = pw_properties_get_bool(props, "net.loop", DEFAULT_LOOP);
impl->min_ptime = pw_properties_get_uint32(props, "sess.min-ptime", DEFAULT_MIN_PTIME);
impl->max_ptime = pw_properties_get_uint32(props, "sess.max-ptime", DEFAULT_MAX_PTIME);
if ((str = pw_properties_get(props, "sess.name")) == NULL)
pw_properties_setf(props, "sess.name", "PipeWire RTP Stream on %s",
pw_get_host_name());
str = pw_properties_get(props, "sess.name");
impl->session_name = str ? strdup(str) : NULL;
impl->core = pw_context_get_object(impl->module_context, PW_TYPE_INTERFACE_Core); impl->core = pw_context_get_object(impl->module_context, PW_TYPE_INTERFACE_Core);
if (impl->core == NULL) { if (impl->core == NULL) {

View file

@ -56,11 +56,11 @@
* *
* Options specific to the behavior of this module * Options specific to the behavior of this module
* *
* - `stream.props = {}`: properties to be passed to the stream * - `sap.ip = <str>`: IP address of the SAP messages, default "224.0.0.56"
* - `sap.ip = <str>`: IP address of the SAP messages * - `sap.port = <str>`: port of the SAP messages, default 9875
* - `sap.port = <str>`: port of the SAP messages
* - `local.ifname = <str>`: interface name to use * - `local.ifname = <str>`: interface name to use
* - `sess.latency.msec = <str>`: target network latency in milliseconds * - `sess.latency.msec = <str>`: target network latency in milliseconds, default 100
* - `stream.props = {}`: properties to be passed to the stream
* *
* ## General options * ## General options
* *
@ -69,6 +69,7 @@
* - \ref PW_KEY_NODE_NAME * - \ref PW_KEY_NODE_NAME
* - \ref PW_KEY_NODE_DESCRIPTION * - \ref PW_KEY_NODE_DESCRIPTION
* - \ref PW_KEY_MEDIA_NAME * - \ref PW_KEY_MEDIA_NAME
* - \ref PW_KEY_MEDIA_CLASS
* *
* ## Example configuration * ## Example configuration
*\code{.unparsed} *\code{.unparsed}
@ -78,9 +79,10 @@
* #sap.ip = 224.0.0.56 * #sap.ip = 224.0.0.56
* #sap.port = 9875 * #sap.port = 9875
* #local.ifname = eth0 * #local.ifname = eth0
* sess.latency.msec = 200 * sess.latency.msec = 100
* stream.props = { * stream.props = {
* node.name = "rtp-source" * node.name = "rtp-source"
* #media.class = "Audio/Source"
* } * }
* } * }
* } * }
@ -91,29 +93,34 @@
#define NAME "rtp-source" #define NAME "rtp-source"
static const struct spa_dict_item module_info[] = {
{ PW_KEY_MODULE_AUTHOR, "Wim Taymans <wim.taymans@gmail.com>" },
{ PW_KEY_MODULE_DESCRIPTION, "RTP Source" },
{ PW_KEY_MODULE_USAGE, "sap.ip=<SAP IP address to listen on> "
"sap.port=<SAP port to listen on> "
"local.ifname=<local interface name to use> "
"sess.latency.msec=<target network latency in milliseconds> "
"stream.props= { key=value ... }" },
{ PW_KEY_MODULE_VERSION, PACKAGE_VERSION },
};
PW_LOG_TOPIC_STATIC(mod_topic, "mod." NAME); PW_LOG_TOPIC_STATIC(mod_topic, "mod." NAME);
#define PW_LOG_TOPIC_DEFAULT mod_topic #define PW_LOG_TOPIC_DEFAULT mod_topic
#define SAP_MIME_TYPE "application/sdp"
#define ERROR_MSEC 2 #define ERROR_MSEC 2
#define MAX_SESSIONS 16 #define MAX_SESSIONS 16
#define MTU 1280
#define CLEANUP_INTERVAL_SEC 20 #define CLEANUP_INTERVAL_SEC 20
#define DEFAULT_SAP_IP "224.0.0.56" #define DEFAULT_SAP_IP "224.0.0.56"
#define DEFAULT_SAP_PORT 9875 #define DEFAULT_SAP_PORT 9875
#define DEFAULT_SESS_LATENCY 200 #define DEFAULT_SESS_LATENCY 100
#define BUFFER_SIZE (1u<<22)
#define BUFFER_MASK (BUFFER_SIZE-1)
#define USAGE "sap.ip=<SAP IP address to listen on, default "DEFAULT_SAP_IP"> " \
"sap.port=<SAP port to listen on, default "SPA_STRINGIFY(DEFAULT_SAP_PORT)"> " \
"local.ifname=<local interface name to use> " \
"sess.latency.msec=<target network latency, default "SPA_STRINGIFY(DEFAULT_SESS_LATENCY)"> " \
"stream.props= { key=value ... }"
static const struct spa_dict_item module_info[] = {
{ PW_KEY_MODULE_AUTHOR, "Wim Taymans <wim.taymans@gmail.com>" },
{ PW_KEY_MODULE_DESCRIPTION, "RTP Source" },
{ PW_KEY_MODULE_USAGE, USAGE },
{ PW_KEY_MODULE_VERSION, PACKAGE_VERSION },
};
struct impl { struct impl {
struct pw_impl_module *module; struct pw_impl_module *module;
@ -159,9 +166,6 @@ struct sdp_info {
uint32_t stride; uint32_t stride;
}; };
#define BUFFER_SIZE (1u<<16)
#define BUFFER_MASK (BUFFER_SIZE-1)
struct session { struct session {
struct impl *impl; struct impl *impl;
struct spa_list link; struct spa_list link;
@ -219,7 +223,7 @@ static void stream_process(void *data)
if (avail < wanted || sess->buffering) { if (avail < wanted || sess->buffering) {
memset(d[0].data, 0, wanted); memset(d[0].data, 0, wanted);
if (!sess->buffering) if (!sess->buffering && sess->have_sync)
pw_log_warn("underrun %u/%u < %u", avail, sess->target_buffer, wanted); pw_log_warn("underrun %u/%u < %u", avail, sess->target_buffer, wanted);
} else { } else {
float error, corr; float error, corr;
@ -767,8 +771,8 @@ static int parse_sap(struct impl *impl, void *data, size_t len)
mime = SPA_PTROFF(data, offs, char); mime = SPA_PTROFF(data, offs, char);
if (spa_strstartswith(mime, "v=0")) { if (spa_strstartswith(mime, "v=0")) {
sdp = mime; sdp = mime;
mime = "application/sdp"; mime = SAP_MIME_TYPE;
} else if (spa_streq(mime, "application/sdp")) } else if (spa_streq(mime, SAP_MIME_TYPE))
sdp = SPA_PTROFF(mime, strlen(mime)+1, char); sdp = SPA_PTROFF(mime, strlen(mime)+1, char);
else else
return -EINVAL; return -EINVAL;