module-rtp: move avahi in rtp-session

It needs to be there because we need to be able to relate
zeroconf entries to udp connections.
This commit is contained in:
Wim Taymans 2023-03-01 09:21:53 +01:00
parent 7da031c969
commit 3badf6f3ac
2 changed files with 314 additions and 60 deletions

View file

@ -536,15 +536,19 @@ pipewire_module_rtp_sink = shared_library('pipewire-module-rtp-sink',
dependencies : [mathlib, dl_lib, rt_lib, pipewire_dep],
)
pipewire_module_rtp_sink = shared_library('pipewire-module-rtp-session',
[ 'module-rtp/stream.c',
'module-rtp-session.c' ],
include_directories : [configinc],
install : true,
install_dir : modules_install_dir,
install_rpath: modules_install_dir,
dependencies : [mathlib, dl_lib, rt_lib, pipewire_dep],
)
build_module_rtp_session = avahi_dep.found()
if build_module_rtp_session
pipewire_module_rtp_session = shared_library('pipewire-module-rtp-session',
[ 'module-rtp/stream.c',
'module-zeroconf-discover/avahi-poll.c',
'module-rtp-session.c' ],
include_directories : [configinc],
install : true,
install_dir : modules_install_dir,
install_rpath: modules_install_dir,
dependencies : [mathlib, dl_lib, rt_lib, pipewire_dep, avahi_dep],
)
endif
build_module_roc = roc_dep.found()
if build_module_roc

View file

@ -26,6 +26,13 @@
#include <pipewire/pipewire.h>
#include <pipewire/impl.h>
#include <avahi-client/publish.h>
#include <avahi-client/lookup.h>
#include <avahi-common/error.h>
#include <avahi-common/malloc.h>
#include "module-zeroconf-discover/avahi-poll.h"
#include <module-rtp/rtp.h>
#include <module-rtp/apple-midi.h>
#include <module-rtp/stream.h>
@ -39,10 +46,9 @@
*
* Options specific to the behavior of this module
*
* - `source.ip =<str>`: source IP address, default "0.0.0.0"
* - `destination.ip =<str>`: destination IP address, default "224.0.0.56"
* - `destination.port =<int>`: destination port, default random beteen 46000 and 47024
* - `local.ifname = <str>`: interface name to use
* - `control.ip =<str>`: control IP address, default "0.0.0.0"
* - `control.port =<int>`: control port, default "0"
* - `net.mtu = <int>`: MTU to use, default 1280
* - `net.ttl = <int>`: TTL to use, default 1
* - `net.loop = <bool>`: loopback multicast, default false
@ -77,9 +83,8 @@
* { name = libpipewire-module-rtp-sink
* args = {
* #local.ifname = "eth0"
* #source.ip = "0.0.0.0"
* #destination.ip = "224.0.0.56"
* #destination.port = 46000
* #control.ip = "0.0.0.0"
* #control.port = 0
* #net.mtu = 1280
* #net.ttl = 1
* #net.loop = false
@ -87,12 +92,12 @@
* #sess.max-ptime = 20
* #sess.name = "PipeWire RTP stream"
* #rtp.media = "audio"
* #audio.format = "S16BE"
* #audio.rate = 48000
* #audio.channels = 2
* #audio.position = [ FL FR ]
* stream.props = {
* node.name = "rtp-sink"
* #audio.format = "S16BE"
* #audio.rate = 48000
* #audio.channels = 2
* #audio.position = [ FL FR ]
* }
* }
*}
@ -115,19 +120,16 @@ PW_LOG_TOPIC_STATIC(mod_topic, "mod." NAME);
#define DEFAULT_CHANNELS 2
#define DEFAULT_POSITION "[ FL FR ]"
#define DEFAULT_SOURCE_IP "0.0.0.0"
#define DEFAULT_SOURCE_PORT 0
#define DEFAULT_DESTINATION_IP "224.0.0.56"
#define DEFAULT_DESTINATION_PORT 46000
#define DEFAULT_CONTROL_IP "0.0.0.0"
#define DEFAULT_CONTROL_PORT 0
#define DEFAULT_TTL 1
#define DEFAULT_MTU 1280
#define DEFAULT_LOOP false
#define DEFAULT_TS_OFFSET -1
#define USAGE "source.ip=<source IP address, default:"DEFAULT_SOURCE_IP"> " \
"destination.ip=<destination IP address, default:"DEFAULT_DESTINATION_IP"> " \
"destination.port=<int, default random beteen 46000 and 47024> " \
#define USAGE "control.ip=<destination IP address, default:"DEFAULT_CONTROL_IP"> " \
"control.port=<int, default:"SPA_STRINGIFY(DEFAULT_CONTROL_PORT)"> " \
"local.ifname=<local interface name to use> " \
"net.mtu=<desired MTU, default:"SPA_STRINGIFY(DEFAULT_MTU)"> " \
"net.ttl=<desired TTL, default:"SPA_STRINGIFY(DEFAULT_TTL)"> " \
@ -149,6 +151,28 @@ static const struct spa_dict_item module_info[] = {
{ PW_KEY_MODULE_VERSION, PACKAGE_VERSION },
};
struct service_info {
AvahiIfIndex interface;
AvahiProtocol protocol;
const char *name;
const char *type;
const char *domain;
const char *host_name;
AvahiAddress address;
uint16_t port;
};
#define SERVICE_INFO(...) ((struct service_info){ __VA_ARGS__ })
struct service {
struct service_info info;
struct spa_list link;
struct impl *impl;
struct session *sess;
};
struct session {
struct impl *impl;
struct spa_list link;
@ -182,6 +206,12 @@ struct impl {
struct spa_hook module_listener;
struct pw_properties *props;
AvahiPoll *avahi_poll;
AvahiClient *client;
AvahiServiceBrowser *browser;
AvahiEntryGroup *group;
struct spa_list service_list;
struct pw_properties *stream_props;
struct pw_loop *loop;
@ -206,10 +236,11 @@ struct impl {
char *ts_refclk;
int payload;
uint16_t ctrl_port;
struct sockaddr_storage ctrl_addr;
socklen_t ctrl_len;
struct sockaddr_storage src_addr;
socklen_t src_len;
struct sockaddr_storage data_addr;
socklen_t data_len;
struct spa_list sessions;
uint32_t n_sessions;
@ -831,7 +862,7 @@ static int setup_apple_session(struct impl *impl)
if (impl->ctrl_source == NULL)
return -errno;
if ((fd = make_socket(&impl->src_addr, impl->src_len,
if ((fd = make_socket(&impl->data_addr, impl->data_len,
impl->mcast_loop, impl->ttl, impl->ifname)) < 0)
return fd;
@ -929,6 +960,239 @@ static int parse_address(const char *address, uint16_t port,
return 0;
}
static struct service *make_service(struct impl *impl, const struct service_info *info,
AvahiStringList *txt)
{
struct service *s;
char at[AVAHI_ADDRESS_STR_MAX];
struct session *sess;
int res;
s = calloc(1, sizeof(*s));
if (s == NULL)
return NULL;
s->info.interface = info->interface;
s->info.protocol = info->protocol;
s->info.name = strdup(info->name);
s->info.type = strdup(info->type);
s->info.domain = strdup(info->domain);
s->info.host_name = strdup(info->host_name);
s->info.address = info->address;
s->info.port = info->port;
s->impl = impl;
spa_list_append(&impl->service_list, &s->link);
avahi_address_snprint(at, sizeof(at), &s->info.address);
pw_log_info("create session: %s %s:%u", s->info.name, at, s->info.port);
sess = make_session(impl, s->info.name);
if (sess == NULL) {
res = -errno;
pw_log_error("can't create session: %m");
return s;
}
s->sess = sess;
if ((res = parse_address(at, s->info.port, &sess->ctrl_addr, &sess->ctrl_len)) < 0) {
pw_log_error("invalid address %s: %s", at, spa_strerror(res));
}
if ((res = parse_address(at, s->info.port+1, &sess->data_addr, &sess->data_len)) < 0) {
pw_log_error("invalid address %s: %s", at, spa_strerror(res));
}
return s;
}
static struct service *find_service(struct impl *impl, const struct service_info *info)
{
struct service *s;
spa_list_for_each(s, &impl->service_list, link) {
if (s->info.interface == info->interface &&
s->info.protocol == info->protocol &&
spa_streq(s->info.name, info->name) &&
spa_streq(s->info.type, info->type) &&
spa_streq(s->info.domain, info->domain))
return s;
}
return NULL;
}
static void free_service(struct service *s)
{
spa_list_remove(&s->link);
free_session(s->sess);
free((char *) s->info.name);
free((char *) s->info.type);
free((char *) s->info.domain);
free((char *) s->info.host_name);
free(s);
}
static void resolver_cb(AvahiServiceResolver *r, AvahiIfIndex interface, AvahiProtocol protocol,
AvahiResolverEvent event, const char *name, const char *type, const char *domain,
const char *host_name, const AvahiAddress *a, uint16_t port, AvahiStringList *txt,
AvahiLookupResultFlags flags, void *userdata)
{
struct impl *impl = userdata;
struct service *s;
struct service_info sinfo;
if (event != AVAHI_RESOLVER_FOUND) {
pw_log_error("Resolving of '%s' failed: %s", name,
avahi_strerror(avahi_client_errno(impl->client)));
goto done;
}
sinfo = SERVICE_INFO(.interface = interface,
.protocol = protocol,
.name = name,
.type = type,
.domain = domain,
.host_name = host_name,
.address = *a,
.port = port);
s = make_service(impl, &sinfo, txt);
if (s == NULL) {
pw_log_error("Can't make service: %m");
goto done;
}
done:
avahi_service_resolver_free(r);
}
static void browser_cb(AvahiServiceBrowser *b, AvahiIfIndex interface, AvahiProtocol protocol,
AvahiBrowserEvent event, const char *name, const char *type, const char *domain,
AvahiLookupResultFlags flags, void *userdata)
{
struct impl *impl = userdata;
struct service_info info;
struct service *s;
if (flags & AVAHI_LOOKUP_RESULT_LOCAL)
return;
info = SERVICE_INFO(.interface = interface,
.protocol = protocol,
.name = name,
.type = type,
.domain = domain);
s = find_service(impl, &info);
switch (event) {
case AVAHI_BROWSER_NEW:
if (s != NULL)
return;
if (!(avahi_service_resolver_new(impl->client,
interface, protocol,
name, type, domain,
AVAHI_PROTO_UNSPEC, 0,
resolver_cb, impl)))
pw_log_error("can't make service resolver: %s",
avahi_strerror(avahi_client_errno(impl->client)));
break;
case AVAHI_BROWSER_REMOVE:
if (s == NULL)
return;
free_service(s);
break;
default:
break;
}
}
static int make_browser(struct impl *impl)
{
if (impl->browser == NULL) {
impl->browser = avahi_service_browser_new(impl->client,
AVAHI_IF_UNSPEC, AVAHI_PROTO_UNSPEC,
"_apple-midi._udp", NULL, 0,
browser_cb, impl);
}
if (impl->browser == NULL) {
pw_log_error("can't make browser: %s",
avahi_strerror(avahi_client_errno(impl->client)));
return -EIO;
}
return 0;
}
static void entry_group_callback(AvahiEntryGroup *g, AvahiEntryGroupState state, void *userdata)
{
switch (state) {
case AVAHI_ENTRY_GROUP_ESTABLISHED:
pw_log_info("Service successfully established");
break;
case AVAHI_ENTRY_GROUP_COLLISION:
pw_log_error("Service name collision");
break;
case AVAHI_ENTRY_GROUP_FAILURE:
pw_log_error("Entry group failure: %s",
avahi_strerror(avahi_client_errno(avahi_entry_group_get_client(g))));
break;
case AVAHI_ENTRY_GROUP_UNCOMMITED:
case AVAHI_ENTRY_GROUP_REGISTERING:;
break;
}
}
static int make_announce(struct impl *impl)
{
int res;
if (impl->group == NULL) {
impl->group = avahi_entry_group_new(impl->client,
entry_group_callback, impl);
}
if (impl->group == NULL) {
pw_log_error("can't make group: %s",
avahi_strerror(avahi_client_errno(impl->client)));
return -EIO;
}
avahi_entry_group_reset(impl->group);
res = avahi_entry_group_add_service(impl->group,
AVAHI_IF_UNSPEC, AVAHI_PROTO_UNSPEC,
(AvahiPublishFlags)0, impl->session_name,
"_apple-midi._udp", NULL, NULL,
impl->ctrl_port, NULL);
if (res < 0) {
pw_log_error("can't add service: %s",
avahi_strerror(avahi_client_errno(impl->client)));
return -EIO;
}
if ((res = avahi_entry_group_commit(impl->group)) < 0) {
pw_log_error("can't commit group: %s",
avahi_strerror(avahi_client_errno(impl->client)));
return -EIO;
}
return 0;
}
static void client_callback(AvahiClient *c, AvahiClientState state, void *userdata)
{
struct impl *impl = userdata;
impl->client = c;
switch (state) {
case AVAHI_CLIENT_S_REGISTERING:
case AVAHI_CLIENT_S_RUNNING:
case AVAHI_CLIENT_S_COLLISION:
make_browser(impl);
make_announce(impl);
break;
case AVAHI_CLIENT_FAILURE:
case AVAHI_CLIENT_CONNECTING:
break;
default:
break;
}
}
static void copy_props(struct impl *impl, struct pw_properties *props, const char *key)
{
const char *str;
@ -961,6 +1225,7 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args)
args = "";
spa_list_init(&impl->sessions);
spa_list_init(&impl->service_list);
props = pw_properties_new_string(args);
if (props == NULL) {
@ -1015,16 +1280,17 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args)
str = pw_properties_get(props, "local.ifname");
impl->ifname = str ? strdup(str) : NULL;
port = DEFAULT_SOURCE_PORT;
port = pw_properties_get_uint32(props, "control.port", port);
port = pw_properties_get_uint32(props, "control.port", DEFAULT_CONTROL_PORT);
if ((str = pw_properties_get(props, "control.ip")) == NULL)
str = DEFAULT_SOURCE_IP;
str = DEFAULT_CONTROL_IP;
impl->ctrl_port = port;
if ((res = parse_address(str, port, &impl->ctrl_addr, &impl->ctrl_len)) < 0) {
pw_log_error("invalid control.ip %s: %s", str, spa_strerror(res));
goto out;
}
if ((res = parse_address(str, port ? port+1 : 0, &impl->src_addr, &impl->src_len)) < 0) {
if ((res = parse_address(str, port ? port+1 : 0, &impl->data_addr, &impl->data_len)) < 0) {
pw_log_error("invalid data.ip %s: %s", str, spa_strerror(res));
goto out;
}
@ -1046,9 +1312,9 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args)
impl->session_name = str ? strdup(str) : NULL;
pw_properties_set(stream_props, "rtp.session", impl->session_name);
get_ip(&impl->src_addr, addr, sizeof(addr), &port);
pw_properties_set(stream_props, "rtp.source.ip", addr);
pw_properties_setf(stream_props, "rtp.source.port", "%u", port);
get_ip(&impl->ctrl_addr, addr, sizeof(addr), &impl->ctrl_port);
pw_properties_set(stream_props, "rtp.control.ip", addr);
pw_properties_setf(stream_props, "rtp.control.port", "%u", impl->ctrl_port);
pw_properties_setf(stream_props, "rtp.mtu", "%u", impl->mtu);
pw_properties_setf(stream_props, "rtp.ttl", "%u", impl->ttl);
@ -1084,33 +1350,17 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args)
&impl->core_listener,
&core_events, impl);
str = pw_properties_get(props, "destination.ip");
if (str != NULL) {
struct session *sess;
port = pw_properties_get_uint32(props, "destination.port", 0);
if (port == 0) {
pw_log_error("invalid destination.port");
goto out;
}
sess = make_session(impl, impl->session_name);
if (sess == NULL) {
res = -errno;
pw_log_error("can't create session: %m");
goto out;
}
if ((res = parse_address(str, port, &sess->ctrl_addr, &sess->ctrl_len)) < 0) {
pw_log_error("invalid destination.ip %s: %s", str, spa_strerror(res));
goto out;
}
if ((res = parse_address(str, port+1, &sess->data_addr, &sess->data_len)) < 0) {
pw_log_error("invalid destination.ip %s: %s", str, spa_strerror(res));
goto out;
}
}
if ((res = setup_apple_session(impl)) < 0)
goto out;
impl->avahi_poll = pw_avahi_poll_new(impl->loop);
if ((impl->client = avahi_client_new(impl->avahi_poll,
AVAHI_CLIENT_NO_FAIL,
client_callback, impl,
&res)) == NULL) {
pw_log_error("can't create avahi client: %s", avahi_strerror(res));
goto out;
}
pw_impl_module_add_listener(module, &impl->module_listener, &module_events, impl);