modules: port rtp-session to new zeroconf helper

This commit is contained in:
Wim Taymans 2026-02-27 13:50:49 +01:00
parent a1db2b8d35
commit db713c8264

View file

@ -27,12 +27,7 @@
#include <pipewire/pipewire.h>
#include <pipewire/impl.h>
#include <avahi-client/publish.h>
#include <avahi-client/lookup.h>
#include <avahi-common/error.h>
#include <avahi-common/malloc.h>
#include "zeroconf-utils/avahi-poll.h"
#include "zeroconf-utils/zeroconf.h"
#include <module-rtp/rtp.h>
#include <module-rtp/apple-midi.h>
@ -160,31 +155,21 @@ static const struct spa_dict_item module_info[] = {
};
struct service_info {
AvahiIfIndex interface;
AvahiProtocol protocol;
int ifindex;
int protocol;
const char *name;
const char *type;
const char *domain;
const char *host_name;
AvahiAddress address;
uint16_t port;
};
#define SERVICE_INFO(...) ((struct service_info){ __VA_ARGS__ })
struct service {
struct service_info info;
struct spa_list link;
struct impl *impl;
struct session *sess;
};
struct session {
struct impl *impl;
struct spa_list link;
struct service_info info;
struct sockaddr_storage ctrl_addr;
socklen_t ctrl_len;
struct sockaddr_storage data_addr;
@ -229,11 +214,8 @@ struct impl {
struct pw_properties *props;
bool discover_local;
AvahiPoll *avahi_poll;
AvahiClient *client;
AvahiServiceBrowser *browser;
AvahiEntryGroup *group;
struct spa_list service_list;
struct pw_zeroconf *zeroconf;
struct spa_hook zeroconf_listener;
struct pw_properties *stream_props;
@ -595,6 +577,9 @@ static void free_session(struct session *sess)
if (sess->recv)
rtp_stream_destroy(sess->recv);
free(sess->name);
free((char *) sess->info.name);
free((char *) sess->info.type);
free((char *) sess->info.domain);
free(sess);
}
@ -612,7 +597,8 @@ static bool cmp_ip(const struct sockaddr_storage *sa, const struct sockaddr_stor
return false;
}
static struct session *make_session(struct impl *impl, struct pw_properties *props)
static struct session *make_session(struct impl *impl, struct service_info *info,
struct pw_properties *props)
{
struct session *sess;
const char *str;
@ -627,6 +613,11 @@ static struct session *make_session(struct impl *impl, struct pw_properties *pro
sess->impl = impl;
sess->ssrc = pw_rand32();
sess->info.ifindex = info->ifindex;
sess->info.protocol = info->protocol;
sess->info.name = strdup(info->name);
sess->info.type = strdup(info->type);
sess->info.domain = strdup(info->domain);
str = pw_properties_get(props, "sess.name");
sess->name = str ? strdup(str) : strdup("RTP Session");
@ -669,6 +660,21 @@ error:
return NULL;
}
static struct session *find_session_by_info(struct impl *impl,
const struct service_info *info)
{
struct session *s;
spa_list_for_each(s, &impl->sessions, link) {
if (s->info.ifindex == info->ifindex &&
s->info.protocol == info->protocol &&
spa_streq(s->info.name, info->name) &&
spa_streq(s->info.type, info->type) &&
spa_streq(s->info.domain, info->domain))
return s;
}
return NULL;
}
static struct session *find_session_by_addr_name(struct impl *impl,
const struct sockaddr_storage *sa, const char *name)
{
@ -1220,8 +1226,8 @@ static void impl_destroy(struct impl *impl)
if (impl->data_source)
pw_loop_destroy_source(impl->data_loop, impl->data_source);
if (impl->client)
avahi_client_free(impl->client);
if (impl->zeroconf)
pw_zeroconf_destroy(impl->zeroconf);
if (impl->data_loop)
pw_context_release_loop(impl->context, impl->data_loop);
@ -1263,20 +1269,6 @@ static const struct pw_core_events core_events = {
.error = on_core_error,
};
static void free_service(struct service *s)
{
spa_list_remove(&s->link);
if (s->sess)
free_session(s->sess);
free((char *) s->info.name);
free((char *) s->info.type);
free((char *) s->info.domain);
free((char *) s->info.host_name);
free(s);
}
static const char *get_service_name(struct impl *impl)
{
const char *str;
@ -1288,21 +1280,36 @@ static const char *get_service_name(struct impl *impl)
return NULL;
}
static struct service *make_service(struct impl *impl, const struct service_info *info,
AvahiStringList *txt)
static void on_zeroconf_added(void *data, void *user, const struct spa_dict *info)
{
struct service *s = NULL;
char at[AVAHI_ADDRESS_STR_MAX], if_suffix[16] = "";
struct impl *impl = data;
const char *str, *service_name, *address, *hostname;
struct service_info sinfo;
struct session *sess;
int res, ipv;
int ifindex = -1, protocol = 0, res, port = 0;
struct pw_properties *props = NULL;
const char *service_name, *str;
AvahiStringList *l;
bool compatible = true;
if ((str = spa_dict_lookup(info, "zeroconf.ifindex")))
ifindex = atoi(str);
if ((str = spa_dict_lookup(info, "zeroconf.protocol")))
protocol = atoi(str);
if ((str = spa_dict_lookup(info, "zeroconf.port")))
port = atoi(str);
sinfo = SERVICE_INFO(.ifindex = ifindex,
.protocol = protocol,
.name = spa_dict_lookup(info, "zeroconf.session"),
.type = spa_dict_lookup(info, "zeroconf.service"),
.domain = spa_dict_lookup(info, "zeroconf.domain"));
sess = find_session_by_info(impl, &sinfo);
if (sess != NULL)
return;
/* check for compatible session */
service_name = get_service_name(impl);
compatible = spa_streq(service_name, info->type);
compatible = spa_streq(service_name, sinfo.type);
props = pw_properties_copy(impl->stream_props);
if (props == NULL) {
@ -1312,53 +1319,51 @@ static struct service *make_service(struct impl *impl, const struct service_info
if (spa_streq(service_name, "_pipewire-audio._udp")) {
uint32_t mask = 0;
for (l = txt; l && compatible; l = l->next) {
const struct spa_dict_item *it;
spa_dict_for_each(it, info) {
const char *k = NULL;
char *key, *value;
if (avahi_string_list_get_pair(l, &key, &value, NULL) != 0)
if (!compatible)
break;
if (spa_streq(key, "subtype")) {
if (spa_streq(it->key, "subtype")) {
k = "sess.media";
mask |= 1<<0;
} else if (spa_streq(key, "format")) {
} else if (spa_streq(it->key, "format")) {
k = PW_KEY_AUDIO_FORMAT;
mask |= 1<<1;
} else if (spa_streq(key, "rate")) {
} else if (spa_streq(it->key, "rate")) {
k = PW_KEY_AUDIO_RATE;
mask |= 1<<2;
} else if (spa_streq(key, "channels")) {
} else if (spa_streq(it->key, "channels")) {
k = PW_KEY_AUDIO_CHANNELS;
mask |= 1<<3;
} else if (spa_streq(key, "position")) {
} else if (spa_streq(it->key, "position")) {
pw_properties_set(props,
SPA_KEY_AUDIO_POSITION, value);
} else if (spa_streq(key, "layout")) {
SPA_KEY_AUDIO_POSITION, it->value);
} else if (spa_streq(it->key, "layout")) {
pw_properties_set(props,
SPA_KEY_AUDIO_LAYOUT, value);
} else if (spa_streq(key, "channelnames")) {
SPA_KEY_AUDIO_LAYOUT, it->value);
} else if (spa_streq(it->key, "channelnames")) {
pw_properties_set(props,
PW_KEY_NODE_CHANNELNAMES, value);
} else if (spa_streq(key, "ts-refclk")) {
PW_KEY_NODE_CHANNELNAMES, it->value);
} else if (spa_streq(it->key, "ts-refclk")) {
pw_properties_set(props,
"sess.ts-refclk", value);
if (spa_streq(value, impl->ts_refclk))
"sess.ts-refclk", it->value);
if (spa_streq(it->value, impl->ts_refclk))
pw_properties_set(props,
"sess.ts-direct", "true");
} else if (spa_streq(key, "ts-offset")) {
} else if (spa_streq(it->key, "ts-offset")) {
uint32_t v;
if (spa_atou32(value, &v, 0))
if (spa_atou32(it->value, &v, 0))
pw_properties_setf(props,
"rtp.receiver-ts-offset", "%u", v);
}
if (k != NULL) {
str = pw_properties_get(props, k);
if (str == NULL || !spa_streq(str, value))
if (str == NULL || !spa_streq(str, it->value))
compatible = false;
}
avahi_free(key);
avahi_free(value);
}
str = pw_properties_get(props, "sess.media");
if (spa_streq(str, "opus") && mask != 0xd)
@ -1368,281 +1373,147 @@ static struct service *make_service(struct impl *impl, const struct service_info
}
if (!compatible) {
pw_log_info("found incompatible session IP%d:%s",
info->protocol == AVAHI_PROTO_INET ? 4 : 6,
info->name);
sinfo.protocol, sinfo.name);
res = 0;
goto error;
}
s = calloc(1, sizeof(*s));
if (s == NULL) {
res = -errno;
goto error;
}
address = spa_dict_lookup(info, "zeroconf.address");
hostname = spa_dict_lookup(info, "zeroconf.hostname");
s->impl = impl;
spa_list_append(&impl->service_list, &s->link);
pw_log_info("create session: %s %s:%u %s", sinfo.name, address, port, sinfo.type);
s->info.interface = info->interface;
s->info.protocol = info->protocol;
s->info.name = strdup(info->name);
s->info.type = strdup(info->type);
s->info.domain = strdup(info->domain);
s->info.host_name = strdup(info->host_name);
s->info.address = info->address;
s->info.port = info->port;
avahi_address_snprint(at, sizeof(at), &s->info.address);
pw_log_info("create session: %s %s:%u %s", s->info.name, at, s->info.port, s->info.type);
if (s->info.protocol == AVAHI_PROTO_INET6 &&
s->info.address.data.ipv6.address[0] == 0xfe &&
(s->info.address.data.ipv6.address[1] & 0xc0) == 0x80)
snprintf(if_suffix, sizeof(if_suffix), "%%%d", s->info.interface);
ipv = s->info.protocol == AVAHI_PROTO_INET ? 4 : 6;
pw_properties_set(props, "sess.name", s->info.name);
pw_properties_setf(props, "destination.ip", "%s%s", at, if_suffix);
pw_properties_setf(props, "destination.ifindex", "%u", s->info.interface);
pw_properties_setf(props, "destination.port", "%u", s->info.port);
pw_properties_set(props, "sess.name", sinfo.name);
pw_properties_set(props, "destination.ip", address);
pw_properties_setf(props, "destination.ifindex", "%u", sinfo.ifindex);
pw_properties_setf(props, "destination.port", "%u", port);
if (pw_properties_get(props, PW_KEY_NODE_NAME) == NULL)
pw_properties_setf(props, PW_KEY_NODE_NAME, "rtp_session.%s.%s.ipv%d",
s->info.name, s->info.host_name, ipv);
sinfo.name, hostname, sinfo.protocol);
if (pw_properties_get(props, PW_KEY_NODE_DESCRIPTION) == NULL)
pw_properties_setf(props, PW_KEY_NODE_DESCRIPTION, "%s (IPv%d)",
s->info.name, ipv);
sinfo.name, sinfo.protocol);
if (pw_properties_get(props, PW_KEY_MEDIA_NAME) == NULL)
pw_properties_setf(props, PW_KEY_MEDIA_NAME, "RTP Session with %s (IPv%d)",
s->info.name, ipv);
sinfo.name, sinfo.protocol);
sess = make_session(impl, props);
props = NULL;
sess = make_session(impl, &sinfo, spa_steal_ptr(props));
if (sess == NULL) {
res = -errno;
pw_log_error("can't create session: %m");
goto error;
}
s->sess = sess;
if ((res = pw_net_parse_address(at, s->info.port, &sess->ctrl_addr, &sess->ctrl_len)) < 0) {
pw_log_error("invalid address %s: %s", at, spa_strerror(res));
if ((res = pw_net_parse_address(address, port, &sess->ctrl_addr, &sess->ctrl_len)) < 0) {
pw_log_error("invalid address %s: %s", address, spa_strerror(res));
}
if ((res = pw_net_parse_address(at, s->info.port+1, &sess->data_addr, &sess->data_len)) < 0) {
pw_log_error("invalid address %s: %s", at, spa_strerror(res));
if ((res = pw_net_parse_address(address, port+1, &sess->data_addr, &sess->data_len)) < 0) {
pw_log_error("invalid address %s: %s", address, spa_strerror(res));
}
return s;
return;
error:
pw_properties_free(props);
if (s != NULL)
free_service(s);
errno = -res;
return NULL;
return;
}
static struct service *find_service(struct impl *impl, const struct service_info *info)
static void on_zeroconf_removed(void *data, void *user, const struct spa_dict *info)
{
struct service *s;
spa_list_for_each(s, &impl->service_list, link) {
if (s->info.interface == info->interface &&
s->info.protocol == info->protocol &&
spa_streq(s->info.name, info->name) &&
spa_streq(s->info.type, info->type) &&
spa_streq(s->info.domain, info->domain))
return s;
}
return NULL;
}
static void resolver_cb(AvahiServiceResolver *r, AvahiIfIndex interface, AvahiProtocol protocol,
AvahiResolverEvent event, const char *name, const char *type, const char *domain,
const char *host_name, const AvahiAddress *a, uint16_t port, AvahiStringList *txt,
AvahiLookupResultFlags flags, void *userdata)
{
struct impl *impl = userdata;
struct impl *impl = data;
struct service_info sinfo;
struct session *sess;
const char *str;
int ifindex = -1, protocol = 0;
if (event != AVAHI_RESOLVER_FOUND) {
pw_log_error("Resolving of '%s' failed: %s", name,
avahi_strerror(avahi_client_errno(impl->client)));
goto done;
}
if ((str = spa_dict_lookup(info, "zeroconf.ifindex")))
ifindex = atoi(str);
if ((str = spa_dict_lookup(info, "zeroconf.protocol")))
protocol = atoi(str);
sinfo = SERVICE_INFO(.interface = interface,
sinfo = SERVICE_INFO(.ifindex = ifindex,
.protocol = protocol,
.name = name,
.type = type,
.domain = domain,
.host_name = host_name,
.address = *a,
.port = port);
.name = spa_dict_lookup(info, "zeroconf.session"),
.type = spa_dict_lookup(info, "zeroconf.service"),
.domain = spa_dict_lookup(info, "zeroconf.domain"));
make_service(impl, &sinfo, txt);
done:
avahi_service_resolver_free(r);
}
static void browser_cb(AvahiServiceBrowser *b, AvahiIfIndex interface, AvahiProtocol protocol,
AvahiBrowserEvent event, const char *name, const char *type, const char *domain,
AvahiLookupResultFlags flags, void *userdata)
{
struct impl *impl = userdata;
struct service_info info;
struct service *s;
if ((flags & AVAHI_LOOKUP_RESULT_LOCAL) && !impl->discover_local)
sess = find_session_by_info(impl, &sinfo);
if (sess == NULL)
return;
info = SERVICE_INFO(.interface = interface,
.protocol = protocol,
.name = name,
.type = type,
.domain = domain);
s = find_service(impl, &info);
switch (event) {
case AVAHI_BROWSER_NEW:
if (s != NULL)
return;
if (!(avahi_service_resolver_new(impl->client,
interface, protocol,
name, type, domain,
AVAHI_PROTO_UNSPEC, 0,
resolver_cb, impl)))
pw_log_error("can't make service resolver: %s",
avahi_strerror(avahi_client_errno(impl->client)));
break;
case AVAHI_BROWSER_REMOVE:
if (s == NULL)
return;
free_service(s);
break;
default:
break;
}
free_session(sess);
}
static int make_browser(struct impl *impl)
{
const char *service_name;
int res;
service_name = get_service_name(impl);
if (service_name == NULL)
return -EINVAL;
if (impl->browser == NULL) {
impl->browser = avahi_service_browser_new(impl->client,
AVAHI_IF_UNSPEC, AVAHI_PROTO_UNSPEC,
service_name, NULL, 0,
browser_cb, impl);
}
if (impl->browser == NULL) {
pw_log_error("can't make browser: %s",
avahi_strerror(avahi_client_errno(impl->client)));
return -EIO;
if ((res = pw_zeroconf_set_browse(impl->zeroconf, impl,
&SPA_DICT_ITEMS(
SPA_DICT_ITEM("zeroconf.service", service_name)))) < 0) {
pw_log_error("can't make browser for %s: %s",
service_name, spa_strerror(res));
return res;
}
return 0;
}
static void entry_group_callback(AvahiEntryGroup *g, AvahiEntryGroupState state, void *userdata)
{
switch (state) {
case AVAHI_ENTRY_GROUP_ESTABLISHED:
pw_log_info("Service successfully established");
break;
case AVAHI_ENTRY_GROUP_COLLISION:
pw_log_error("Service name collision");
break;
case AVAHI_ENTRY_GROUP_FAILURE:
pw_log_error("Entry group failure: %s",
avahi_strerror(avahi_client_errno(avahi_entry_group_get_client(g))));
break;
case AVAHI_ENTRY_GROUP_UNCOMMITED:
case AVAHI_ENTRY_GROUP_REGISTERING:;
break;
}
}
static int make_announce(struct impl *impl)
{
int res;
const char *service_name, *str;
AvahiStringList *txt = NULL;
struct pw_properties *props;
props = pw_properties_new(NULL, NULL);
if ((service_name = get_service_name(impl)) == NULL)
return -ENOTSUP;
if (impl->group == NULL) {
impl->group = avahi_entry_group_new(impl->client,
entry_group_callback, impl);
}
if (impl->group == NULL) {
pw_log_error("can't make group: %s",
avahi_strerror(avahi_client_errno(impl->client)));
return -EIO;
}
avahi_entry_group_reset(impl->group);
if (spa_streq(service_name, "_pipewire-audio._udp")) {
str = pw_properties_get(impl->props, "sess.media");
txt = avahi_string_list_add_pair(txt, "subtype", str);
pw_properties_set(props, "subtype", str);
if ((str = pw_properties_get(impl->stream_props, PW_KEY_AUDIO_FORMAT)) != NULL)
txt = avahi_string_list_add_pair(txt, "format", str);
pw_properties_set(props, "format", str);
if ((str = pw_properties_get(impl->stream_props, PW_KEY_AUDIO_RATE)) != NULL)
txt = avahi_string_list_add_pair(txt, "rate", str);
pw_properties_set(props, "rate", str);
if ((str = pw_properties_get(impl->stream_props, PW_KEY_AUDIO_CHANNELS)) != NULL)
txt = avahi_string_list_add_pair(txt, "channels", str);
pw_properties_set(props, "channels", str);
if ((str = pw_properties_get(impl->stream_props, SPA_KEY_AUDIO_POSITION)) != NULL)
txt = avahi_string_list_add_pair(txt, "position", str);
pw_properties_set(props, "position", str);
if ((str = pw_properties_get(impl->stream_props, SPA_KEY_AUDIO_LAYOUT)) != NULL)
txt = avahi_string_list_add_pair(txt, "layout", str);
pw_properties_set(props, "layout", str);
if ((str = pw_properties_get(impl->stream_props, PW_KEY_NODE_CHANNELNAMES)) != NULL)
txt = avahi_string_list_add_pair(txt, "channelnames", str);
pw_properties_set(props, "channelnames", str);
if (impl->ts_refclk != NULL) {
txt = avahi_string_list_add_pair(txt, "ts-refclk", impl->ts_refclk);
txt = avahi_string_list_add_printf(txt, "ts-offset=%u", impl->ts_offset);
pw_properties_set(props, "ts-refclk", impl->ts_refclk);
pw_properties_setf(props, "ts-offset", "%u", impl->ts_offset);
}
}
res = avahi_entry_group_add_service_strlst(impl->group,
AVAHI_IF_UNSPEC, AVAHI_PROTO_UNSPEC,
(AvahiPublishFlags)0, impl->session_name,
service_name, NULL, NULL,
impl->ctrl_port, txt);
avahi_string_list_free(txt);
pw_properties_set(props, "zeroconf.session", impl->session_name);
pw_properties_set(props, "zeroconf.service", service_name);
pw_properties_setf(props, "zeroconf.port", "%u", impl->ctrl_port);
res = pw_zeroconf_set_announce(impl->zeroconf, impl, &props->dict);
pw_properties_free(props);
if (res < 0) {
pw_log_error("can't add service: %s",
avahi_strerror(avahi_client_errno(impl->client)));
return -EIO;
}
if ((res = avahi_entry_group_commit(impl->group)) < 0) {
pw_log_error("can't commit group: %s",
avahi_strerror(avahi_client_errno(impl->client)));
return -EIO;
pw_log_error("can't add service: %s", spa_strerror(res));
return res;
}
return 0;
}
static void client_callback(AvahiClient *c, AvahiClientState state, void *userdata)
{
struct impl *impl = userdata;
impl->client = c;
switch (state) {
case AVAHI_CLIENT_S_REGISTERING:
case AVAHI_CLIENT_S_RUNNING:
case AVAHI_CLIENT_S_COLLISION:
make_browser(impl);
make_announce(impl);
break;
case AVAHI_CLIENT_FAILURE:
case AVAHI_CLIENT_CONNECTING:
break;
default:
break;
}
}
static const struct pw_zeroconf_events zeroconf_events = {
PW_VERSION_ZEROCONF_EVENTS,
.added = on_zeroconf_added,
.removed = on_zeroconf_removed,
};
static void copy_props(struct impl *impl, struct pw_properties *props, const char *key)
{
@ -1673,7 +1544,6 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args)
args = "";
spa_list_init(&impl->sessions);
spa_list_init(&impl->service_list);
props = pw_properties_new_string(args);
if (props == NULL) {
@ -1685,6 +1555,8 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args)
impl->discover_local = pw_properties_get_bool(impl->props,
"sess.discover-local", false);
pw_properties_set(impl->props, "zeroconf.discover-local",
impl->discover_local ? "true" : "false");
stream_props = pw_properties_new(NULL, NULL);
if (stream_props == NULL) {
@ -1804,14 +1676,17 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args)
if ((res = setup_apple_session(impl)) < 0)
goto out;
impl->avahi_poll = pw_avahi_poll_new(impl->context);
if ((impl->client = avahi_client_new(impl->avahi_poll,
AVAHI_CLIENT_NO_FAIL,
client_callback, impl,
&res)) == NULL) {
pw_log_error("can't create avahi client: %s", avahi_strerror(res));
impl->zeroconf = pw_zeroconf_new(impl->context, &impl->props->dict);
if (impl->zeroconf == NULL) {
res = -errno;
pw_log_error("can't create zeroconf: %m");
goto out;
}
pw_zeroconf_add_listener(impl->zeroconf, &impl->zeroconf_listener,
&zeroconf_events, impl);
make_browser(impl);
make_announce(impl);
pw_impl_module_add_listener(module, &impl->module_listener, &module_events, impl);