module-vban: add stream.rules support

Make it possible to match the new VBAN streams on ip/port/name and media
properties and create a stream with custom properties.

See #4402
This commit is contained in:
Wim Taymans 2024-11-14 09:59:12 +01:00
parent 1a5514e5cf
commit 388f330700

View file

@ -50,11 +50,25 @@
* Options specific to the behavior of this module * Options specific to the behavior of this module
* *
* - `local.ifname = <str>`: interface name to use * - `local.ifname = <str>`: interface name to use
* - `source.ip = <str>`: the source ip address, default 127.0.0.1 * - `source.ip = <str>`: the source ip address to listen on, default 127.0.0.1
* - `source.port = <int>`: the source port, default 6980 * - `source.port = <int>`: the source port to listen on, default 6980
* - `node.always-process = <bool>`: true to receive even when not running * - `node.always-process = <bool>`: true to receive even when not running
* - `sess.latency.msec = <str>`: target network latency in milliseconds, default 100 * - `sess.latency.msec = <str>`: target network latency in milliseconds, default 100
* - `stream.props = {}`: properties to be passed to the stream * - `stream.props = {}`: properties to be passed to all the stream
* - `stream.rules` = <rules>: match rules, use create-stream actions.
*
* ### stream.rules matches
*
* - `vban.ip`: the IP address of the VBAN sender
* - `vban.port`: the port of the VBAN sender
* - `sess.name`: the name of the VBAN stream
*
* ### stream.rules create-stream
*
* In addition to all the properties that can be passed to a stream,
* you can also set:
*
* - `sess.latency.msec = <str>`: target network latency in milliseconds, default 100
* *
* ## General options * ## General options
* *
@ -85,8 +99,32 @@
* #audio.position = [ FL FR ] * #audio.position = [ FL FR ]
* stream.props = { * stream.props = {
* #media.class = "Audio/Source" * #media.class = "Audio/Source"
* node.name = "vban-receiver" * #node.name = "vban-receiver"
* } * }
* stream.rules = [
* { matches = [
* { sess.name = "~.*"
* #sess.media = "audio" | "midi"
* #vban.ip = ""
* #vban.port = 1000
* #audio.channels = 2
* #audio.format = "U8" | "S16LE" | "S24LE" | "S32LE" | "F32LE" | "F64LE"
* #audio.rate = 44100
* }
* ]
* actions = {
* create-stream = {
* stream.props = {
* #sess.latency.msec = 100
* #target.object = ""
* #audio.position = [ FL FR ]
* #media.class = "Audio/Source"
* #node.name = "vban-receiver"
* }
* }
* }
* }
* ]
* } * }
* } * }
* ] * ]
@ -104,12 +142,16 @@ PW_LOG_TOPIC_STATIC(mod_topic, "mod." NAME);
#define DEFAULT_SOURCE_IP "127.0.0.1" #define DEFAULT_SOURCE_IP "127.0.0.1"
#define DEFAULT_SOURCE_PORT 6980 #define DEFAULT_SOURCE_PORT 6980
#define DEFAULT_CREATE_RULES \
"[ { matches = [ { sess.name = \"~.*\" } ] actions = { create-stream = { } } } ] "
#define USAGE "( local.ifname=<local interface name to use> ) " \ #define USAGE "( local.ifname=<local interface name to use> ) " \
"( source.ip=<source IP address, default:"DEFAULT_SOURCE_IP"> ) " \ "( source.ip=<source IP address, default:"DEFAULT_SOURCE_IP"> ) " \
"( source.port=<int, source port, default:"SPA_STRINGIFY(DEFAULT_SOURCE_PORT)"> " \ "( source.port=<int, source port, default:"SPA_STRINGIFY(DEFAULT_SOURCE_PORT)"> " \
"( sess.latency.msec=<target network latency, default "SPA_STRINGIFY(DEFAULT_SESS_LATENCY)"> ) "\ "( sess.latency.msec=<target network latency, default "SPA_STRINGIFY(DEFAULT_SESS_LATENCY)"> ) "\
"( audio.position=<channel map, default:"DEFAULT_POSITION"> ) " \ "( audio.position=<channel map, default:"DEFAULT_POSITION"> ) " \
"( stream.props= { key=value ... } ) " "( stream.props= { key=value ... } ) " \
"( stream.rules=<rules>, use create-stream actions )"
static const struct spa_dict_item module_info[] = { static const struct spa_dict_item module_info[] = {
{ PW_KEY_MODULE_AUTHOR, "Wim Taymans <wim.taymans@gmail.com>" }, { PW_KEY_MODULE_AUTHOR, "Wim Taymans <wim.taymans@gmail.com>" },
@ -153,6 +195,9 @@ struct stream {
struct impl *impl; struct impl *impl;
struct vban_header header; struct vban_header header;
struct sockaddr_storage sa;
socklen_t salen;
struct vban_stream *stream; struct vban_stream *stream;
bool active; bool active;
@ -264,6 +309,55 @@ static const struct vban_stream_events stream_events = {
.state_changed = stream_state_changed, .state_changed = stream_state_changed,
}; };
static int create_stream(struct stream *s, struct pw_properties *props)
{
struct impl *impl = s->impl;
const char *sess_name, *ip, *port;
ip = pw_properties_get(props, "vban.ip");
port = pw_properties_get(props, "vban.port");
sess_name = pw_properties_get(props, "sess.name");
if (pw_properties_get(props, PW_KEY_NODE_NAME) == NULL)
pw_properties_setf(props, PW_KEY_NODE_NAME, "vban_session.%s.%s.%s", sess_name, ip, port);
if (pw_properties_get(props, PW_KEY_NODE_DESCRIPTION) == NULL)
pw_properties_setf(props, PW_KEY_NODE_DESCRIPTION, "%s from %s", sess_name, ip);
if (pw_properties_get(props, PW_KEY_MEDIA_NAME) == NULL)
pw_properties_setf(props, PW_KEY_MEDIA_NAME, "VBAN %s from %s",
sess_name, ip);
s->stream = vban_stream_new(impl->core,
PW_DIRECTION_OUTPUT, spa_steal_ptr(props),
&stream_events, s);
if (s->stream == NULL) {
pw_log_error("can't create stream: %m");
return -errno;
}
return 0;
}
struct match_info {
struct stream *stream;
const struct pw_properties *props;
bool matched;
};
static int rule_matched(void *data, const char *location, const char *action,
const char *str, size_t len)
{
struct match_info *i = data;
int res = 0;
i->matched = true;
if (spa_streq(action, "create-stream")) {
struct pw_properties *p = pw_properties_copy(i->props);
pw_properties_update_string(p, str, len);
create_stream(i->stream, p);
}
return res;
}
static int static int
do_setup_stream(struct spa_loop *loop, do_setup_stream(struct spa_loop *loop,
bool async, uint32_t seq, const void *data, size_t size, void *user_data) bool async, uint32_t seq, const void *data, size_t size, void *user_data)
@ -272,20 +366,17 @@ do_setup_stream(struct spa_loop *loop,
struct impl *impl = s->impl; struct impl *impl = s->impl;
struct pw_properties *props; struct pw_properties *props;
int res; int res;
const char *sess_name; const char *str;
char addr[128];
uint16_t port = 0;
props = pw_properties_copy(impl->stream_props); props = pw_properties_copy(impl->stream_props);
pw_properties_setf(props, "sess.name", "%s", s->header.stream_name); pw_net_get_ip(&s->sa, addr, sizeof(addr), NULL, &port);
sess_name = pw_properties_get(props, "sess.name");
if (pw_properties_get(props, PW_KEY_NODE_NAME) == NULL) pw_properties_setf(props, "sess.name", "%s", s->header.stream_name);
pw_properties_setf(props, PW_KEY_NODE_NAME, "vban_session.%s", sess_name); pw_properties_setf(props, "vban.ip", "%s", addr);
if (pw_properties_get(props, PW_KEY_NODE_DESCRIPTION) == NULL) pw_properties_setf(props, "vban.port", "%u", port);
pw_properties_setf(props, PW_KEY_NODE_DESCRIPTION, "%s", sess_name);
if (pw_properties_get(props, PW_KEY_MEDIA_NAME) == NULL)
pw_properties_setf(props, PW_KEY_MEDIA_NAME, "VBAN Session %s",
sess_name);
if ((s->header.format_SR & 0xE0) == VBAN_PROTOCOL_AUDIO && if ((s->header.format_SR & 0xE0) == VBAN_PROTOCOL_AUDIO &&
(s->header.format_bit & 0xF0) == VBAN_CODEC_PCM) { (s->header.format_bit & 0xF0) == VBAN_CODEC_PCM) {
@ -332,20 +423,27 @@ do_setup_stream(struct spa_loop *loop,
goto error; goto error;
} }
s->stream = vban_stream_new(impl->core, if ((str = pw_properties_get(impl->props, "stream.rules")) == NULL)
PW_DIRECTION_OUTPUT, spa_steal_ptr(props), str = DEFAULT_CREATE_RULES;
&stream_events, s); if (str != NULL) {
if (s->stream == NULL) { struct match_info minfo = {
pw_log_error("can't create stream: %m"); .stream = s,
return -errno; .props = props,
};
pw_conf_match_rules(str, strlen(str), NAME, &props->dict,
rule_matched, &minfo);
if (!minfo.matched)
pw_log_info("unmatched stream found %s", str);
} }
return 0; res = 0;
error: error:
pw_properties_free(props); pw_properties_free(props);
return res; return res;
} }
static struct stream *make_stream(struct impl *impl, const struct vban_header *hdr) static struct stream *make_stream(struct impl *impl, const struct vban_header *hdr,
struct sockaddr_storage *sa, socklen_t salen)
{ {
struct stream *stream; struct stream *stream;
@ -355,6 +453,8 @@ static struct stream *make_stream(struct impl *impl, const struct vban_header *h
stream->impl = impl; stream->impl = impl;
stream->header = *hdr; stream->header = *hdr;
stream->sa = *sa;
stream->salen = salen;
spa_list_append(&impl->streams, &stream->link); spa_list_append(&impl->streams, &stream->link);
pw_loop_invoke(impl->loop, do_setup_stream, 1, NULL, 0, false, stream); pw_loop_invoke(impl->loop, do_setup_stream, 1, NULL, 0, false, stream);
@ -382,8 +482,11 @@ on_vban_io(void *data, int fd, uint32_t mask)
if (mask & SPA_IO_IN) { if (mask & SPA_IO_IN) {
struct vban_header *hdr; struct vban_header *hdr;
struct stream *s; struct stream *s;
struct sockaddr_storage sa;
socklen_t salen = sizeof(sa);
if ((len = recv(fd, buffer, sizeof(buffer), 0)) < 0) if ((len = recvfrom(fd, buffer, sizeof(buffer), 0,
(struct sockaddr*)&sa, &salen)) < 0)
goto receive_error; goto receive_error;
if (len < VBAN_HEADER_SIZE) if (len < VBAN_HEADER_SIZE)
@ -395,7 +498,7 @@ on_vban_io(void *data, int fd, uint32_t mask)
s = find_stream(impl, hdr->stream_name); s = find_stream(impl, hdr->stream_name);
if (SPA_UNLIKELY(s == NULL)) if (SPA_UNLIKELY(s == NULL))
s = make_stream(impl, hdr); s = make_stream(impl, hdr, &sa, salen);
if (SPA_LIKELY(s != NULL && s->active)) { if (SPA_LIKELY(s != NULL && s->active)) {
s->receiving = true; s->receiving = true;
vban_stream_receive_packet(s->stream, buffer, len); vban_stream_receive_packet(s->stream, buffer, len);