diff --git a/src/modules/meson.build b/src/modules/meson.build index a133d0338..7340022a8 100644 --- a/src/modules/meson.build +++ b/src/modules/meson.build @@ -536,15 +536,19 @@ pipewire_module_rtp_sink = shared_library('pipewire-module-rtp-sink', dependencies : [mathlib, dl_lib, rt_lib, pipewire_dep], ) -pipewire_module_rtp_sink = shared_library('pipewire-module-rtp-session', - [ 'module-rtp/stream.c', - 'module-rtp-session.c' ], - include_directories : [configinc], - install : true, - install_dir : modules_install_dir, - install_rpath: modules_install_dir, - dependencies : [mathlib, dl_lib, rt_lib, pipewire_dep], -) +build_module_rtp_session = avahi_dep.found() +if build_module_rtp_session + pipewire_module_rtp_session = shared_library('pipewire-module-rtp-session', + [ 'module-rtp/stream.c', + 'module-zeroconf-discover/avahi-poll.c', + 'module-rtp-session.c' ], + include_directories : [configinc], + install : true, + install_dir : modules_install_dir, + install_rpath: modules_install_dir, + dependencies : [mathlib, dl_lib, rt_lib, pipewire_dep, avahi_dep], + ) +endif build_module_roc = roc_dep.found() if build_module_roc diff --git a/src/modules/module-rtp-session.c b/src/modules/module-rtp-session.c index a52316646..acb938c99 100644 --- a/src/modules/module-rtp-session.c +++ b/src/modules/module-rtp-session.c @@ -26,6 +26,13 @@ #include #include +#include +#include +#include +#include + +#include "module-zeroconf-discover/avahi-poll.h" + #include #include #include @@ -39,10 +46,9 @@ * * Options specific to the behavior of this module * - * - `source.ip =`: source IP address, default "0.0.0.0" - * - `destination.ip =`: destination IP address, default "224.0.0.56" - * - `destination.port =`: destination port, default random beteen 46000 and 47024 * - `local.ifname = `: interface name to use + * - `control.ip =`: control IP address, default "0.0.0.0" + * - `control.port =`: control port, default "0" * - `net.mtu = `: MTU to use, default 1280 * - `net.ttl = `: TTL to use, default 1 * - `net.loop = `: loopback multicast, default false @@ -77,9 +83,8 @@ * { name = libpipewire-module-rtp-sink * args = { * #local.ifname = "eth0" - * #source.ip = "0.0.0.0" - * #destination.ip = "224.0.0.56" - * #destination.port = 46000 + * #control.ip = "0.0.0.0" + * #control.port = 0 * #net.mtu = 1280 * #net.ttl = 1 * #net.loop = false @@ -87,12 +92,12 @@ * #sess.max-ptime = 20 * #sess.name = "PipeWire RTP stream" * #rtp.media = "audio" - * #audio.format = "S16BE" - * #audio.rate = 48000 - * #audio.channels = 2 - * #audio.position = [ FL FR ] * stream.props = { * node.name = "rtp-sink" + * #audio.format = "S16BE" + * #audio.rate = 48000 + * #audio.channels = 2 + * #audio.position = [ FL FR ] * } * } *} @@ -115,19 +120,16 @@ PW_LOG_TOPIC_STATIC(mod_topic, "mod." NAME); #define DEFAULT_CHANNELS 2 #define DEFAULT_POSITION "[ FL FR ]" -#define DEFAULT_SOURCE_IP "0.0.0.0" -#define DEFAULT_SOURCE_PORT 0 -#define DEFAULT_DESTINATION_IP "224.0.0.56" -#define DEFAULT_DESTINATION_PORT 46000 +#define DEFAULT_CONTROL_IP "0.0.0.0" +#define DEFAULT_CONTROL_PORT 0 #define DEFAULT_TTL 1 #define DEFAULT_MTU 1280 #define DEFAULT_LOOP false #define DEFAULT_TS_OFFSET -1 -#define USAGE "source.ip= " \ - "destination.ip= " \ - "destination.port= " \ +#define USAGE "control.ip= " \ + "control.port= " \ "local.ifname= " \ "net.mtu= " \ "net.ttl= " \ @@ -149,6 +151,28 @@ static const struct spa_dict_item module_info[] = { { PW_KEY_MODULE_VERSION, PACKAGE_VERSION }, }; +struct service_info { + AvahiIfIndex interface; + AvahiProtocol protocol; + const char *name; + const char *type; + const char *domain; + const char *host_name; + AvahiAddress address; + uint16_t port; +}; + +#define SERVICE_INFO(...) ((struct service_info){ __VA_ARGS__ }) + +struct service { + struct service_info info; + + struct spa_list link; + struct impl *impl; + + struct session *sess; +}; + struct session { struct impl *impl; struct spa_list link; @@ -182,6 +206,12 @@ struct impl { struct spa_hook module_listener; struct pw_properties *props; + AvahiPoll *avahi_poll; + AvahiClient *client; + AvahiServiceBrowser *browser; + AvahiEntryGroup *group; + struct spa_list service_list; + struct pw_properties *stream_props; struct pw_loop *loop; @@ -206,10 +236,11 @@ struct impl { char *ts_refclk; int payload; + uint16_t ctrl_port; struct sockaddr_storage ctrl_addr; socklen_t ctrl_len; - struct sockaddr_storage src_addr; - socklen_t src_len; + struct sockaddr_storage data_addr; + socklen_t data_len; struct spa_list sessions; uint32_t n_sessions; @@ -831,7 +862,7 @@ static int setup_apple_session(struct impl *impl) if (impl->ctrl_source == NULL) return -errno; - if ((fd = make_socket(&impl->src_addr, impl->src_len, + if ((fd = make_socket(&impl->data_addr, impl->data_len, impl->mcast_loop, impl->ttl, impl->ifname)) < 0) return fd; @@ -929,6 +960,239 @@ static int parse_address(const char *address, uint16_t port, return 0; } +static struct service *make_service(struct impl *impl, const struct service_info *info, + AvahiStringList *txt) +{ + struct service *s; + char at[AVAHI_ADDRESS_STR_MAX]; + struct session *sess; + int res; + + s = calloc(1, sizeof(*s)); + if (s == NULL) + return NULL; + + s->info.interface = info->interface; + s->info.protocol = info->protocol; + s->info.name = strdup(info->name); + s->info.type = strdup(info->type); + s->info.domain = strdup(info->domain); + s->info.host_name = strdup(info->host_name); + s->info.address = info->address; + s->info.port = info->port; + + s->impl = impl; + spa_list_append(&impl->service_list, &s->link); + + avahi_address_snprint(at, sizeof(at), &s->info.address); + pw_log_info("create session: %s %s:%u", s->info.name, at, s->info.port); + + sess = make_session(impl, s->info.name); + if (sess == NULL) { + res = -errno; + pw_log_error("can't create session: %m"); + return s; + } + s->sess = sess; + + if ((res = parse_address(at, s->info.port, &sess->ctrl_addr, &sess->ctrl_len)) < 0) { + pw_log_error("invalid address %s: %s", at, spa_strerror(res)); + } + if ((res = parse_address(at, s->info.port+1, &sess->data_addr, &sess->data_len)) < 0) { + pw_log_error("invalid address %s: %s", at, spa_strerror(res)); + } + return s; +} + +static struct service *find_service(struct impl *impl, const struct service_info *info) +{ + struct service *s; + spa_list_for_each(s, &impl->service_list, link) { + if (s->info.interface == info->interface && + s->info.protocol == info->protocol && + spa_streq(s->info.name, info->name) && + spa_streq(s->info.type, info->type) && + spa_streq(s->info.domain, info->domain)) + return s; + } + return NULL; +} + +static void free_service(struct service *s) +{ + spa_list_remove(&s->link); + + free_session(s->sess); + + free((char *) s->info.name); + free((char *) s->info.type); + free((char *) s->info.domain); + free((char *) s->info.host_name); + free(s); +} + +static void resolver_cb(AvahiServiceResolver *r, AvahiIfIndex interface, AvahiProtocol protocol, + AvahiResolverEvent event, const char *name, const char *type, const char *domain, + const char *host_name, const AvahiAddress *a, uint16_t port, AvahiStringList *txt, + AvahiLookupResultFlags flags, void *userdata) +{ + struct impl *impl = userdata; + struct service *s; + struct service_info sinfo; + + if (event != AVAHI_RESOLVER_FOUND) { + pw_log_error("Resolving of '%s' failed: %s", name, + avahi_strerror(avahi_client_errno(impl->client))); + goto done; + } + + sinfo = SERVICE_INFO(.interface = interface, + .protocol = protocol, + .name = name, + .type = type, + .domain = domain, + .host_name = host_name, + .address = *a, + .port = port); + + s = make_service(impl, &sinfo, txt); + if (s == NULL) { + pw_log_error("Can't make service: %m"); + goto done; + } + +done: + avahi_service_resolver_free(r); +} + +static void browser_cb(AvahiServiceBrowser *b, AvahiIfIndex interface, AvahiProtocol protocol, + AvahiBrowserEvent event, const char *name, const char *type, const char *domain, + AvahiLookupResultFlags flags, void *userdata) +{ + struct impl *impl = userdata; + struct service_info info; + struct service *s; + + if (flags & AVAHI_LOOKUP_RESULT_LOCAL) + return; + + info = SERVICE_INFO(.interface = interface, + .protocol = protocol, + .name = name, + .type = type, + .domain = domain); + + s = find_service(impl, &info); + + switch (event) { + case AVAHI_BROWSER_NEW: + if (s != NULL) + return; + if (!(avahi_service_resolver_new(impl->client, + interface, protocol, + name, type, domain, + AVAHI_PROTO_UNSPEC, 0, + resolver_cb, impl))) + pw_log_error("can't make service resolver: %s", + avahi_strerror(avahi_client_errno(impl->client))); + break; + case AVAHI_BROWSER_REMOVE: + if (s == NULL) + return; + free_service(s); + break; + default: + break; + } +} + +static int make_browser(struct impl *impl) +{ + if (impl->browser == NULL) { + impl->browser = avahi_service_browser_new(impl->client, + AVAHI_IF_UNSPEC, AVAHI_PROTO_UNSPEC, + "_apple-midi._udp", NULL, 0, + browser_cb, impl); + } + if (impl->browser == NULL) { + pw_log_error("can't make browser: %s", + avahi_strerror(avahi_client_errno(impl->client))); + return -EIO; + } + return 0; +} + +static void entry_group_callback(AvahiEntryGroup *g, AvahiEntryGroupState state, void *userdata) +{ + switch (state) { + case AVAHI_ENTRY_GROUP_ESTABLISHED: + pw_log_info("Service successfully established"); + break; + case AVAHI_ENTRY_GROUP_COLLISION: + pw_log_error("Service name collision"); + break; + case AVAHI_ENTRY_GROUP_FAILURE: + pw_log_error("Entry group failure: %s", + avahi_strerror(avahi_client_errno(avahi_entry_group_get_client(g)))); + break; + case AVAHI_ENTRY_GROUP_UNCOMMITED: + case AVAHI_ENTRY_GROUP_REGISTERING:; + break; + } +} + +static int make_announce(struct impl *impl) +{ + int res; + + if (impl->group == NULL) { + impl->group = avahi_entry_group_new(impl->client, + entry_group_callback, impl); + } + if (impl->group == NULL) { + pw_log_error("can't make group: %s", + avahi_strerror(avahi_client_errno(impl->client))); + return -EIO; + } + avahi_entry_group_reset(impl->group); + + res = avahi_entry_group_add_service(impl->group, + AVAHI_IF_UNSPEC, AVAHI_PROTO_UNSPEC, + (AvahiPublishFlags)0, impl->session_name, + "_apple-midi._udp", NULL, NULL, + impl->ctrl_port, NULL); + if (res < 0) { + pw_log_error("can't add service: %s", + avahi_strerror(avahi_client_errno(impl->client))); + return -EIO; + } + if ((res = avahi_entry_group_commit(impl->group)) < 0) { + pw_log_error("can't commit group: %s", + avahi_strerror(avahi_client_errno(impl->client))); + return -EIO; + } + return 0; +} +static void client_callback(AvahiClient *c, AvahiClientState state, void *userdata) +{ + struct impl *impl = userdata; + impl->client = c; + + switch (state) { + case AVAHI_CLIENT_S_REGISTERING: + case AVAHI_CLIENT_S_RUNNING: + case AVAHI_CLIENT_S_COLLISION: + make_browser(impl); + make_announce(impl); + break; + case AVAHI_CLIENT_FAILURE: + case AVAHI_CLIENT_CONNECTING: + break; + default: + break; + } +} + static void copy_props(struct impl *impl, struct pw_properties *props, const char *key) { const char *str; @@ -961,6 +1225,7 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args) args = ""; spa_list_init(&impl->sessions); + spa_list_init(&impl->service_list); props = pw_properties_new_string(args); if (props == NULL) { @@ -1015,16 +1280,17 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args) str = pw_properties_get(props, "local.ifname"); impl->ifname = str ? strdup(str) : NULL; - port = DEFAULT_SOURCE_PORT; - port = pw_properties_get_uint32(props, "control.port", port); + port = pw_properties_get_uint32(props, "control.port", DEFAULT_CONTROL_PORT); if ((str = pw_properties_get(props, "control.ip")) == NULL) - str = DEFAULT_SOURCE_IP; + str = DEFAULT_CONTROL_IP; + + impl->ctrl_port = port; if ((res = parse_address(str, port, &impl->ctrl_addr, &impl->ctrl_len)) < 0) { pw_log_error("invalid control.ip %s: %s", str, spa_strerror(res)); goto out; } - if ((res = parse_address(str, port ? port+1 : 0, &impl->src_addr, &impl->src_len)) < 0) { + if ((res = parse_address(str, port ? port+1 : 0, &impl->data_addr, &impl->data_len)) < 0) { pw_log_error("invalid data.ip %s: %s", str, spa_strerror(res)); goto out; } @@ -1046,9 +1312,9 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args) impl->session_name = str ? strdup(str) : NULL; pw_properties_set(stream_props, "rtp.session", impl->session_name); - get_ip(&impl->src_addr, addr, sizeof(addr), &port); - pw_properties_set(stream_props, "rtp.source.ip", addr); - pw_properties_setf(stream_props, "rtp.source.port", "%u", port); + get_ip(&impl->ctrl_addr, addr, sizeof(addr), &impl->ctrl_port); + pw_properties_set(stream_props, "rtp.control.ip", addr); + pw_properties_setf(stream_props, "rtp.control.port", "%u", impl->ctrl_port); pw_properties_setf(stream_props, "rtp.mtu", "%u", impl->mtu); pw_properties_setf(stream_props, "rtp.ttl", "%u", impl->ttl); @@ -1084,33 +1350,17 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args) &impl->core_listener, &core_events, impl); - str = pw_properties_get(props, "destination.ip"); - if (str != NULL) { - struct session *sess; - - port = pw_properties_get_uint32(props, "destination.port", 0); - if (port == 0) { - pw_log_error("invalid destination.port"); - goto out; - } - sess = make_session(impl, impl->session_name); - if (sess == NULL) { - res = -errno; - pw_log_error("can't create session: %m"); - goto out; - } - if ((res = parse_address(str, port, &sess->ctrl_addr, &sess->ctrl_len)) < 0) { - pw_log_error("invalid destination.ip %s: %s", str, spa_strerror(res)); - goto out; - } - if ((res = parse_address(str, port+1, &sess->data_addr, &sess->data_len)) < 0) { - pw_log_error("invalid destination.ip %s: %s", str, spa_strerror(res)); - goto out; - } - } if ((res = setup_apple_session(impl)) < 0) goto out; + impl->avahi_poll = pw_avahi_poll_new(impl->loop); + if ((impl->client = avahi_client_new(impl->avahi_poll, + AVAHI_CLIENT_NO_FAIL, + client_callback, impl, + &res)) == NULL) { + pw_log_error("can't create avahi client: %s", avahi_strerror(res)); + goto out; + } pw_impl_module_add_listener(module, &impl->module_listener, &module_events, impl);