netjack2: implement driver and manager roles correctly

The manager is actually not supposed to decide much about the number of
audio and midi ports. It should just suggest a default when connecting
driver doesn't know.

Add a audio.ports parameters to manager and driver to suggest/ask for
the amount of audio ports. Let the audio.position/audio.channels be a
specification of the channel mask in case it matches the requested
channels, otherwise use AUX channels for the ports.

This means that we must derive the mode (sink/source/audio/midi) from
the ports that are negotiated in the manager and the driver, so delay
this until after negotiation.

Make sure all the possible modes work. For midi only streams, we can't
wait for the session manager to perform a PortConfig so do that
ourselves. Make sure we only use a source trigger when we have a sink.

Fixes #4666
This commit is contained in:
Wim Taymans 2025-04-29 17:17:14 +02:00
parent 18a5f884be
commit 3856d29646
2 changed files with 180 additions and 130 deletions

View file

@ -53,7 +53,6 @@
* *
* ## Module Options * ## Module Options
* *
* - `driver.mode`: the driver mode, sink|source|duplex, default duplex
* - `local.ifname = <str>`: interface name to use * - `local.ifname = <str>`: interface name to use
* - `net.ip =<str>`: multicast IP address, default "225.3.19.154" * - `net.ip =<str>`: multicast IP address, default "225.3.19.154"
* - `net.port =<int>`: control port, default 19000 * - `net.port =<int>`: control port, default 19000
@ -64,11 +63,10 @@
* - `source.port =<int>`: port to bind to, default 0 (allocate) * - `source.port =<int>`: port to bind to, default 0 (allocate)
* - `netjack2.client-name`: the name of the NETJACK2 client. * - `netjack2.client-name`: the name of the NETJACK2 client.
* - `netjack2.latency`: the latency in cycles, default 2 * - `netjack2.latency`: the latency in cycles, default 2
* - `audio.channels`: the number of audio ports. Can also be added to the stream props. * - `audio.ports`: the number of audio ports. Can also be added to the stream props.
* Will always be configured to the channel count of the manager. The audio.position * A value of -1 will configure to the number of audio ports on the manager.
* can however be used to assign an audio position.
* - `midi.ports`: the number of midi ports. Can also be added to the stream props. * - `midi.ports`: the number of midi ports. Can also be added to the stream props.
* Will always be configured to the number of midi ports on the manager. * A value of -1 will configure to the number of midi ports on the manager.
* - `source.props`: Extra properties for the source filter. * - `source.props`: Extra properties for the source filter.
* - `sink.props`: Extra properties for the sink filter. * - `sink.props`: Extra properties for the sink filter.
* *
@ -94,10 +92,10 @@
* context.modules = [ * context.modules = [
* { name = libpipewire-module-netjack2-driver * { name = libpipewire-module-netjack2-driver
* args = { * args = {
* #driver.mode = duplex
* #netjack2.client-name = PipeWire * #netjack2.client-name = PipeWire
* #netjack2.latency = 2 * #netjack2.latency = 2
* #midi.ports = 0 * #midi.ports = 0
* #audio.ports = -1
* #audio.channels = 2 * #audio.channels = 2
* #audio.position = [ FL FR ] * #audio.position = [ FL FR ]
* source.props = { * source.props = {
@ -133,15 +131,13 @@ PW_LOG_TOPIC_STATIC(mod_topic, "mod." NAME);
#define NETWORK_MAX_LATENCY 30 #define NETWORK_MAX_LATENCY 30
#define DEFAULT_CLIENT_NAME "PipeWire" #define DEFAULT_CLIENT_NAME "PipeWire"
#define DEFAULT_CHANNELS 2 #define DEFAULT_MIDI_PORTS -1
#define DEFAULT_POSITION "[ FL FR ]" #define DEFAULT_AUDIO_PORTS -1
#define DEFAULT_MIDI_PORTS 1
#define FOLLOWER_INIT_TIMEOUT 1 #define FOLLOWER_INIT_TIMEOUT 10000
#define FOLLOWER_INIT_RETRY -1 #define FOLLOWER_INIT_RETRY -1
#define MODULE_USAGE "( remote.name=<remote> ) " \ #define MODULE_USAGE "( remote.name=<remote> ) " \
"( driver.mode=<sink|source|duplex> ) " \
"( local.ifname=<interface name> ) " \ "( local.ifname=<interface name> ) " \
"( net.ip=<ip address to use, default 225.3.19.154> ) " \ "( net.ip=<ip address to use, default 225.3.19.154> ) " \
"( net.port=<port to use, default 19000> ) " \ "( net.port=<port to use, default 19000> ) " \
@ -152,9 +148,10 @@ PW_LOG_TOPIC_STATIC(mod_topic, "mod." NAME);
"( source.port=<port to bind, default 0> ) " \ "( source.port=<port to bind, default 0> ) " \
"( netjack2.client-name=<name of the NETJACK2 client> ) " \ "( netjack2.client-name=<name of the NETJACK2 client> ) " \
"( netjack2.latency=<latency in cycles, default 2> ) " \ "( netjack2.latency=<latency in cycles, default 2> ) " \
"( midi.ports=<number of midi ports> ) " \ "( audio.ports=<number of midi ports, default -1> ) " \
"( audio.channels=<number of channels> ) " \ "( midi.ports=<number of midi ports, default -1> ) " \
"( audio.position=<channel map> ) " \ "( audio.channels=<number of channels, default 0> ) " \
"( audio.position=<channel map, default null> ) " \
"( source.props=<properties> ) " \ "( source.props=<properties> ) " \
"( sink.props=<properties> ) " "( sink.props=<properties> ) "
@ -181,11 +178,13 @@ struct stream {
struct pw_filter *filter; struct pw_filter *filter;
struct spa_hook listener; struct spa_hook listener;
int32_t wanted_n_midi;
int32_t wanted_n_audio;
struct spa_io_position *position; struct spa_io_position *position;
struct spa_audio_info_raw info; struct spa_audio_info_raw info;
uint32_t n_midi;
uint32_t n_ports; uint32_t n_ports;
struct port *ports[MAX_PORTS]; struct port *ports[MAX_PORTS];
@ -456,6 +455,7 @@ static void make_stream_ports(struct stream *s)
s->ports[i] = port; s->ports[i] = port;
} }
pw_filter_set_active(s->filter, true);
} }
static struct spa_pod *make_props_param(struct spa_pod_builder *b, static struct spa_pod *make_props_param(struct spa_pod_builder *b,
@ -521,7 +521,6 @@ static void stream_param_changed(void *data, void *port_data, uint32_t id,
case SPA_PARAM_PortConfig: case SPA_PARAM_PortConfig:
pw_log_debug("PortConfig"); pw_log_debug("PortConfig");
make_stream_ports(s); make_stream_ports(s);
pw_filter_set_active(s->filter, true);
break; break;
case SPA_PARAM_Props: case SPA_PARAM_Props:
pw_log_debug("Props"); pw_log_debug("Props");
@ -556,6 +555,7 @@ static int make_stream(struct stream *s, const char *name)
const struct spa_pod *params[4]; const struct spa_pod *params[4];
uint8_t buffer[1024]; uint8_t buffer[1024];
struct spa_pod_builder b; struct spa_pod_builder b;
int res;
n_params = 0; n_params = 0;
spa_pod_builder_init(&b, buffer, sizeof(buffer)); spa_pod_builder_init(&b, buffer, sizeof(buffer));
@ -581,12 +581,18 @@ static int make_stream(struct stream *s, const char *name)
SPA_PARAM_Format, &s->info); SPA_PARAM_Format, &s->info);
params[n_params++] = make_props_param(&b, &s->volume); params[n_params++] = make_props_param(&b, &s->volume);
return pw_filter_connect(s->filter, if ((res = pw_filter_connect(s->filter,
PW_FILTER_FLAG_INACTIVE | PW_FILTER_FLAG_INACTIVE |
PW_FILTER_FLAG_DRIVER | PW_FILTER_FLAG_DRIVER |
PW_FILTER_FLAG_RT_PROCESS | PW_FILTER_FLAG_RT_PROCESS |
PW_FILTER_FLAG_CUSTOM_LATENCY, PW_FILTER_FLAG_CUSTOM_LATENCY,
params, n_params); params, n_params)) < 0)
return res;
if (s->info.channels == 0)
make_stream_ports(s);
return res;
} }
static int create_filters(struct impl *impl) static int create_filters(struct impl *impl)
@ -801,13 +807,12 @@ static int handle_follower_setup(struct impl *impl, struct nj2_session_params *p
int res; int res;
struct netjack2_peer *peer = &impl->peer; struct netjack2_peer *peer = &impl->peer;
uint32_t i; uint32_t i;
const char *media;
pw_log_info("got follower setup"); pw_log_info("got follower setup");
nj2_dump_session_params(params); nj2_dump_session_params(params);
nj2_session_params_ntoh(&peer->params, params); nj2_session_params_ntoh(&peer->params, params);
SPA_SWAP(peer->params.send_audio_channels, peer->params.recv_audio_channels);
SPA_SWAP(peer->params.send_midi_channels, peer->params.recv_midi_channels);
if (peer->params.send_audio_channels < 0 || if (peer->params.send_audio_channels < 0 ||
peer->params.recv_audio_channels < 0 || peer->params.recv_audio_channels < 0 ||
@ -822,20 +827,28 @@ static int handle_follower_setup(struct impl *impl, struct nj2_session_params *p
pw_loop_update_io(impl->main_loop, impl->setup_socket, 0); pw_loop_update_io(impl->main_loop, impl->setup_socket, 0);
impl->source.n_ports = peer->params.send_audio_channels + peer->params.send_midi_channels; impl->sink.n_ports = peer->params.send_audio_channels + peer->params.send_midi_channels;
impl->source.info.rate = peer->params.sample_rate; if (impl->sink.n_ports > MAX_PORTS) {
if ((uint32_t)peer->params.send_audio_channels != impl->source.info.channels) { pw_log_warn("Too many follower sink ports %d > %d", impl->sink.n_ports, MAX_PORTS);
impl->source.info.channels = peer->params.send_audio_channels; return -EINVAL;
for (i = 0; i < SPA_MIN(impl->source.info.channels, SPA_AUDIO_MAX_CHANNELS); i++)
impl->source.info.position[i] = SPA_AUDIO_CHANNEL_AUX0 + i;
} }
impl->sink.n_ports = peer->params.recv_audio_channels + peer->params.recv_midi_channels;
impl->sink.info.rate = peer->params.sample_rate; impl->sink.info.rate = peer->params.sample_rate;
if ((uint32_t)peer->params.recv_audio_channels != impl->sink.info.channels) { if ((uint32_t)peer->params.send_audio_channels != impl->sink.info.channels) {
impl->sink.info.channels = peer->params.recv_audio_channels; impl->sink.info.channels = SPA_MIN(peer->params.send_audio_channels, (int)SPA_AUDIO_MAX_CHANNELS);
for (i = 0; i < SPA_MIN(impl->sink.info.channels, SPA_AUDIO_MAX_CHANNELS); i++) for (i = 0; i < impl->sink.info.channels; i++)
impl->sink.info.position[i] = SPA_AUDIO_CHANNEL_AUX0 + i; impl->sink.info.position[i] = SPA_AUDIO_CHANNEL_AUX0 + i;
} }
impl->source.n_ports = peer->params.recv_audio_channels + peer->params.recv_midi_channels;
if (impl->source.n_ports > MAX_PORTS) {
pw_log_warn("Too many follower source ports %d > %d", impl->source.n_ports, MAX_PORTS);
return -EINVAL;
}
impl->source.info.rate = peer->params.sample_rate;
if ((uint32_t)peer->params.recv_audio_channels != impl->source.info.channels) {
impl->source.info.channels = SPA_MIN(peer->params.recv_audio_channels, (int)SPA_AUDIO_MAX_CHANNELS);
for (i = 0; i < impl->source.info.channels; i++)
impl->source.info.position[i] = SPA_AUDIO_CHANNEL_AUX0 + i;
}
impl->samplerate = peer->params.sample_rate; impl->samplerate = peer->params.sample_rate;
impl->period_size = peer->params.period_size; impl->period_size = peer->params.period_size;
@ -855,6 +868,20 @@ static int handle_follower_setup(struct impl *impl, struct nj2_session_params *p
pw_properties_setf(impl->source.props, PW_KEY_NODE_FORCE_QUANTUM, pw_properties_setf(impl->source.props, PW_KEY_NODE_FORCE_QUANTUM,
"%u", impl->period_size); "%u", impl->period_size);
media = impl->sink.info.channels > 0 ? "Audio" : "Midi";
if (pw_properties_get(impl->sink.props, PW_KEY_MEDIA_CLASS) == NULL)
pw_properties_setf(impl->sink.props, PW_KEY_MEDIA_CLASS, "%s/Sink", media);
media = impl->source.info.channels > 0 ? "Audio" : "Midi";
if (pw_properties_get(impl->source.props, PW_KEY_MEDIA_CLASS) == NULL)
pw_properties_setf(impl->source.props, PW_KEY_MEDIA_CLASS, "%s/Source", media);
impl->mode = 0;
if (impl->source.n_ports > 0)
impl->mode |= MODE_SOURCE;
if (impl->sink.n_ports > 0)
impl->mode |= MODE_SINK;
if ((res = create_filters(impl)) < 0) if ((res = create_filters(impl)) < 0)
return res; return res;
@ -954,10 +981,10 @@ static int send_follower_available(struct impl *impl)
snprintf(params.follower_name, sizeof(params.follower_name), "%s", pw_get_host_name()); snprintf(params.follower_name, sizeof(params.follower_name), "%s", pw_get_host_name());
params.mtu = htonl(impl->mtu); params.mtu = htonl(impl->mtu);
params.transport_sync = htonl(0); params.transport_sync = htonl(0);
params.send_audio_channels = htonl(-1); params.send_audio_channels = htonl(impl->sink.wanted_n_audio);
params.recv_audio_channels = htonl(-1); params.recv_audio_channels = htonl(impl->source.wanted_n_audio);
params.send_midi_channels = htonl(-1); params.send_midi_channels = htonl(impl->sink.wanted_n_midi);
params.recv_midi_channels = htonl(-1); params.recv_midi_channels = htonl(impl->source.wanted_n_midi);
params.sample_encoder = htonl(NJ2_ENCODER_FLOAT); params.sample_encoder = htonl(NJ2_ENCODER_FLOAT);
params.follower_sync_mode = htonl(1); params.follower_sync_mode = htonl(1);
params.network_latency = htonl(impl->latency); params.network_latency = htonl(impl->latency);
@ -1165,8 +1192,7 @@ static void parse_audio_info(const struct pw_properties *props, struct spa_audio
{ {
spa_audio_info_raw_init_dict_keys(info, spa_audio_info_raw_init_dict_keys(info,
&SPA_DICT_ITEMS( &SPA_DICT_ITEMS(
SPA_DICT_ITEM(SPA_KEY_AUDIO_FORMAT, "F32P"), SPA_DICT_ITEM(SPA_KEY_AUDIO_FORMAT, "F32P")),
SPA_DICT_ITEM(SPA_KEY_AUDIO_POSITION, DEFAULT_POSITION)),
&props->dict, &props->dict,
SPA_KEY_AUDIO_CHANNELS, SPA_KEY_AUDIO_CHANNELS,
SPA_KEY_AUDIO_POSITION, NULL); SPA_KEY_AUDIO_POSITION, NULL);
@ -1234,20 +1260,6 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args)
impl->sink.impl = impl; impl->sink.impl = impl;
impl->sink.direction = PW_DIRECTION_INPUT; impl->sink.direction = PW_DIRECTION_INPUT;
impl->mode = MODE_DUPLEX;
if ((str = pw_properties_get(props, "driver.mode")) != NULL) {
if (spa_streq(str, "source")) {
impl->mode = MODE_SOURCE;
} else if (spa_streq(str, "sink")) {
impl->mode = MODE_SINK;
} else if (spa_streq(str, "duplex")) {
impl->mode = MODE_DUPLEX;
} else {
pw_log_error("invalid driver.mode '%s'", str);
res = -EINVAL;
goto error;
}
}
impl->latency = pw_properties_get_uint32(impl->props, "netjack2.latency", impl->latency = pw_properties_get_uint32(impl->props, "netjack2.latency",
DEFAULT_NETWORK_LATENCY); DEFAULT_NETWORK_LATENCY);
@ -1259,11 +1271,9 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args)
if (pw_properties_get(props, PW_KEY_NODE_ALWAYS_PROCESS) == NULL) if (pw_properties_get(props, PW_KEY_NODE_ALWAYS_PROCESS) == NULL)
pw_properties_set(props, PW_KEY_NODE_ALWAYS_PROCESS, "true"); pw_properties_set(props, PW_KEY_NODE_ALWAYS_PROCESS, "true");
pw_properties_set(impl->sink.props, PW_KEY_MEDIA_CLASS, "Audio/Sink");
pw_properties_set(impl->sink.props, PW_KEY_PRIORITY_DRIVER, "40000"); pw_properties_set(impl->sink.props, PW_KEY_PRIORITY_DRIVER, "40000");
pw_properties_set(impl->sink.props, PW_KEY_NODE_NAME, "netjack2_driver_send"); pw_properties_set(impl->sink.props, PW_KEY_NODE_NAME, "netjack2_driver_send");
pw_properties_set(impl->source.props, PW_KEY_MEDIA_CLASS, "Audio/Source");
pw_properties_set(impl->source.props, PW_KEY_PRIORITY_DRIVER, "40001"); pw_properties_set(impl->source.props, PW_KEY_PRIORITY_DRIVER, "40001");
pw_properties_set(impl->source.props, PW_KEY_NODE_NAME, "netjack2_driver_receive"); pw_properties_set(impl->source.props, PW_KEY_NODE_NAME, "netjack2_driver_receive");
@ -1278,22 +1288,20 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args)
copy_props(impl, props, PW_KEY_NODE_ALWAYS_PROCESS); copy_props(impl, props, PW_KEY_NODE_ALWAYS_PROCESS);
copy_props(impl, props, PW_KEY_NODE_GROUP); copy_props(impl, props, PW_KEY_NODE_GROUP);
copy_props(impl, props, PW_KEY_NODE_VIRTUAL); copy_props(impl, props, PW_KEY_NODE_VIRTUAL);
copy_props(impl, props, "midi.ports");
copy_props(impl, props, "audio.ports");
parse_audio_info(impl->source.props, &impl->source.info); parse_audio_info(impl->source.props, &impl->source.info);
parse_audio_info(impl->sink.props, &impl->sink.info); parse_audio_info(impl->sink.props, &impl->sink.info);
impl->source.n_midi = pw_properties_get_uint32(impl->source.props, impl->source.wanted_n_midi = pw_properties_get_int32(impl->source.props,
"midi.ports", DEFAULT_MIDI_PORTS); "midi.ports", DEFAULT_MIDI_PORTS);
impl->sink.n_midi = pw_properties_get_uint32(impl->sink.props, impl->sink.wanted_n_midi = pw_properties_get_int32(impl->sink.props,
"midi.ports", DEFAULT_MIDI_PORTS); "midi.ports", DEFAULT_MIDI_PORTS);
impl->source.wanted_n_audio = pw_properties_get_int32(impl->source.props,
impl->source.n_ports = impl->source.n_midi + impl->source.info.channels; "audio.ports", DEFAULT_AUDIO_PORTS);
impl->sink.n_ports = impl->sink.n_midi + impl->sink.info.channels; impl->sink.wanted_n_audio = pw_properties_get_int32(impl->sink.props,
if (impl->source.n_ports > MAX_PORTS || impl->sink.n_ports > MAX_PORTS) { "audio.ports", DEFAULT_AUDIO_PORTS);
pw_log_error("too many ports");
res = -EINVAL;
goto error;
}
impl->core = pw_context_get_object(impl->context, PW_TYPE_INTERFACE_Core); impl->core = pw_context_get_object(impl->context, PW_TYPE_INTERFACE_Core);
if (impl->core == NULL) { if (impl->core == NULL) {

View file

@ -67,8 +67,11 @@
* - `netjack2.period-size`: the buffer size to use, default 1024 * - `netjack2.period-size`: the buffer size to use, default 1024
* - `netjack2.encoding`: the encoding, float|opus|int, default float * - `netjack2.encoding`: the encoding, float|opus|int, default float
* - `netjack2.kbps`: the number of kilobits per second when encoding, default 64 * - `netjack2.kbps`: the number of kilobits per second when encoding, default 64
* - `audio.channels`: the number of audio ports. Can also be added to the stream props. * - `audio.ports`: the number of audio ports. Can also be added to the stream props. This
* - `midi.ports`: the number of midi ports. Can also be added to the stream props. * is the default suggestion for drivers that don't specify any number of audio channels.
* - `midi.ports`: the number of midi ports. Can also be added to the stream props. This
* is the default suggestion for drivers that don't specify any number of midi channels.
* - `audio.position`: default channel position for the number of audio.ports.
* - `source.props`: Extra properties for the source filter. * - `source.props`: Extra properties for the source filter.
* - `sink.props`: Extra properties for the sink filter. * - `sink.props`: Extra properties for the sink filter.
* *
@ -99,6 +102,7 @@
* #netjack2.period-size = 1024 * #netjack2.period-size = 1024
* #netjack2.encoding = float # float|opus * #netjack2.encoding = float # float|opus
* #netjack2.kbps = 64 * #netjack2.kbps = 64
* #audio.ports = 0
* #midi.ports = 0 * #midi.ports = 0
* #audio.channels = 2 * #audio.channels = 2
* #audio.position = [ FL FR ] * #audio.position = [ FL FR ]
@ -137,8 +141,7 @@ PW_LOG_TOPIC_STATIC(mod_topic, "mod." NAME);
#define DEFAULT_PERIOD_SIZE 1024 #define DEFAULT_PERIOD_SIZE 1024
#define DEFAULT_ENCODING "float" #define DEFAULT_ENCODING "float"
#define DEFAULT_KBPS 64 #define DEFAULT_KBPS 64
#define DEFAULT_CHANNELS 2 #define DEFAULT_AUDIO_PORTS 2
#define DEFAULT_POSITION "[ FL FR ]"
#define DEFAULT_MIDI_PORTS 1 #define DEFAULT_MIDI_PORTS 1
#define MODULE_USAGE "( remote.name=<remote> ) " \ #define MODULE_USAGE "( remote.name=<remote> ) " \
@ -151,8 +154,8 @@ PW_LOG_TOPIC_STATIC(mod_topic, "mod." NAME);
"( netjack2.connect=<autoconnect ports, default false> ) " \ "( netjack2.connect=<autoconnect ports, default false> ) " \
"( netjack2.sample-rate=<sampl erate, default 48000> ) "\ "( netjack2.sample-rate=<sampl erate, default 48000> ) "\
"( netjack2.period-size=<period size, default 1024> ) " \ "( netjack2.period-size=<period size, default 1024> ) " \
"( midi.ports=<number of midi ports> ) " \ "( midi.ports=<number of midi ports, default 1> ) " \
"( audio.channels=<number of channels> ) " \ "( audio.channels=<number of channels, default 2> ) " \
"( audio.position=<channel map> ) " \ "( audio.position=<channel map> ) " \
"( source.props=<properties> ) " \ "( source.props=<properties> ) " \
"( sink.props=<properties> ) " "( sink.props=<properties> ) "
@ -186,8 +189,9 @@ struct stream {
struct spa_io_position *position; struct spa_io_position *position;
struct spa_audio_info_raw info; struct spa_audio_info_raw info;
uint32_t n_audio;
uint32_t n_midi; uint32_t n_midi;
uint32_t n_ports; uint32_t n_ports;
struct port *ports[MAX_PORTS]; struct port *ports[MAX_PORTS];
@ -204,6 +208,11 @@ struct follower {
struct spa_list link; struct spa_list link;
struct impl *impl; struct impl *impl;
#define MODE_SINK (1<<0)
#define MODE_SOURCE (1<<1)
#define MODE_DUPLEX (MODE_SINK|MODE_SOURCE)
uint32_t mode;
struct stream source; struct stream source;
struct stream sink; struct stream sink;
@ -235,10 +244,6 @@ struct impl {
struct pw_loop *data_loop; struct pw_loop *data_loop;
struct spa_system *system; struct spa_system *system;
#define MODE_SINK (1<<0)
#define MODE_SOURCE (1<<1)
#define MODE_DUPLEX (MODE_SINK|MODE_SOURCE)
uint32_t mode;
struct pw_properties *props; struct pw_properties *props;
struct pw_properties *sink_props; struct pw_properties *sink_props;
struct pw_properties *source_props; struct pw_properties *source_props;
@ -371,6 +376,11 @@ static inline void handle_source_process(struct stream *s, struct spa_io_positio
static void source_process(void *d, struct spa_io_position *position) static void source_process(void *d, struct spa_io_position *position)
{ {
struct stream *s = d; struct stream *s = d;
struct follower *follower = s->follower;
if (!(follower->mode & MODE_SINK))
sink_process(&follower->sink, position);
handle_source_process(s, position); handle_source_process(s, position);
} }
@ -478,9 +488,15 @@ on_data_io(void *data, int fd, uint32_t mask)
if (mask & SPA_IO_IN) { if (mask & SPA_IO_IN) {
pw_loop_update_io(impl->data_loop, follower->socket, 0); pw_loop_update_io(impl->data_loop, follower->socket, 0);
if (pw_filter_trigger_process(follower->source.filter) < 0) { if (follower->mode & MODE_SOURCE) {
pw_log_warn("source not ready"); if (pw_filter_trigger_process(follower->source.filter) < 0) {
handle_source_process(&follower->source, follower->source.position); pw_log_warn("source not ready");
handle_source_process(&follower->source, follower->source.position);
}
} else {
/* There is no source, handle the source receive side (without ports)
* with the sink position io */
handle_source_process(&follower->source, follower->sink.position);
} }
} }
} }
@ -528,6 +544,9 @@ static void make_stream_ports(struct stream *s)
struct spa_latency_info latency; struct spa_latency_info latency;
const struct spa_pod *params[1]; const struct spa_pod *params[1];
if (s->ready)
return;
for (i = 0; i < s->n_ports; i++) { for (i = 0; i < s->n_ports; i++) {
struct port *port = s->ports[i]; struct port *port = s->ports[i];
if (port != NULL) { if (port != NULL) {
@ -578,6 +597,9 @@ static void make_stream_ports(struct stream *s)
s->ports[i] = port; s->ports[i] = port;
} }
s->ready = true;
if (s->follower->started)
pw_filter_set_active(s->filter, true);
} }
static struct spa_pod *make_props_param(struct spa_pod_builder *b, static struct spa_pod *make_props_param(struct spa_pod_builder *b,
@ -643,9 +665,6 @@ static void stream_param_changed(void *data, void *port_data, uint32_t id,
case SPA_PARAM_PortConfig: case SPA_PARAM_PortConfig:
pw_log_debug("PortConfig"); pw_log_debug("PortConfig");
make_stream_ports(s); make_stream_ports(s);
s->ready = true;
if (s->follower->started)
pw_filter_set_active(s->filter, true);
break; break;
case SPA_PARAM_Props: case SPA_PARAM_Props:
pw_log_debug("Props"); pw_log_debug("Props");
@ -681,6 +700,7 @@ static int make_stream(struct stream *s, const char *name)
uint8_t buffer[1024]; uint8_t buffer[1024];
struct spa_pod_builder b; struct spa_pod_builder b;
uint32_t flags; uint32_t flags;
int res;
n_params = 0; n_params = 0;
spa_pod_builder_init(&b, buffer, sizeof(buffer)); spa_pod_builder_init(&b, buffer, sizeof(buffer));
@ -700,7 +720,8 @@ static int make_stream(struct stream *s, const char *name)
} else { } else {
pw_filter_add_listener(s->filter, &s->listener, pw_filter_add_listener(s->filter, &s->listener,
&source_events, s); &source_events, s);
flags |= PW_FILTER_FLAG_TRIGGER; if (s->follower->mode & MODE_SINK)
flags |= PW_FILTER_FLAG_TRIGGER;
} }
reset_volume(&s->volume, s->info.channels); reset_volume(&s->volume, s->info.channels);
@ -712,18 +733,23 @@ static int make_stream(struct stream *s, const char *name)
SPA_PARAM_Format, &s->info); SPA_PARAM_Format, &s->info);
params[n_params++] = make_props_param(&b, &s->volume); params[n_params++] = make_props_param(&b, &s->volume);
return pw_filter_connect(s->filter, flags, params, n_params); if ((res = pw_filter_connect(s->filter, flags, params, n_params)) < 0)
return res;
if (s->info.channels == 0)
make_stream_ports(s);
return res;
} }
static int create_filters(struct follower *follower) static int create_filters(struct follower *follower)
{ {
struct impl *impl = follower->impl;
int res = 0; int res = 0;
if (impl->mode & MODE_SINK) if (follower->mode & MODE_SINK)
res = make_stream(&follower->sink, "NETJACK2 Send"); res = make_stream(&follower->sink, "NETJACK2 Send");
if (impl->mode & MODE_SOURCE) if (follower->mode & MODE_SOURCE)
res = make_stream(&follower->source, "NETJACK2 Receive"); res = make_stream(&follower->source, "NETJACK2 Receive");
return res; return res;
@ -867,6 +893,8 @@ static int handle_follower_available(struct impl *impl, struct nj2_session_param
struct follower *follower; struct follower *follower;
char buffer[256]; char buffer[256];
struct netjack2_peer *peer; struct netjack2_peer *peer;
uint32_t i;
const char *media;
pw_log_info("got follower available"); pw_log_info("got follower available");
nj2_dump_session_params(params); nj2_dump_session_params(params);
@ -898,6 +926,12 @@ static int handle_follower_available(struct impl *impl, struct nj2_session_param
parse_audio_info(follower->source.props, &follower->source.info); parse_audio_info(follower->source.props, &follower->source.info);
parse_audio_info(follower->sink.props, &follower->sink.info); parse_audio_info(follower->sink.props, &follower->sink.info);
follower->source.n_audio = pw_properties_get_uint32(follower->source.props,
"audio.ports", follower->source.info.channels ?
follower->source.info.channels : DEFAULT_AUDIO_PORTS);
follower->sink.n_audio = pw_properties_get_uint32(follower->sink.props,
"audio.ports", follower->sink.info.channels ?
follower->sink.info.channels : DEFAULT_AUDIO_PORTS);
follower->source.n_midi = pw_properties_get_uint32(follower->source.props, follower->source.n_midi = pw_properties_get_uint32(follower->source.props,
"midi.ports", DEFAULT_MIDI_PORTS); "midi.ports", DEFAULT_MIDI_PORTS);
follower->sink.n_midi = pw_properties_get_uint32(follower->sink.props, follower->sink.n_midi = pw_properties_get_uint32(follower->sink.props,
@ -932,29 +966,65 @@ static int handle_follower_available(struct impl *impl, struct nj2_session_param
peer->params.sample_encoder = impl->encoding; peer->params.sample_encoder = impl->encoding;
peer->params.kbps = impl->kbps; peer->params.kbps = impl->kbps;
/* params send and recv are from the client point of view and reversed for the
* manager, so when the client sends, we receive in a source etc. We swap the params
* later after we replied to the client. */
if (peer->params.send_audio_channels < 0) if (peer->params.send_audio_channels < 0)
peer->params.send_audio_channels = follower->sink.info.channels; peer->params.send_audio_channels = follower->source.n_audio;
if (peer->params.recv_audio_channels < 0) if (peer->params.recv_audio_channels < 0)
peer->params.recv_audio_channels = follower->source.info.channels; peer->params.recv_audio_channels = follower->sink.n_audio;
if (peer->params.send_midi_channels < 0) if (peer->params.send_midi_channels < 0)
peer->params.send_midi_channels = follower->sink.n_midi; peer->params.send_midi_channels = follower->source.n_midi;
if (peer->params.recv_midi_channels < 0) if (peer->params.recv_midi_channels < 0)
peer->params.recv_midi_channels = follower->source.n_midi; peer->params.recv_midi_channels = follower->sink.n_midi;
follower->source.n_ports = peer->params.send_audio_channels + peer->params.send_midi_channels; follower->source.n_ports = peer->params.send_audio_channels + peer->params.send_midi_channels;
follower->source.info.rate = peer->params.sample_rate; follower->source.info.rate = peer->params.sample_rate;
follower->source.info.channels = peer->params.send_audio_channels; if ((uint32_t)peer->params.send_audio_channels != follower->source.info.channels) {
follower->source.info.channels = SPA_MIN(peer->params.send_audio_channels, (int)SPA_AUDIO_MAX_CHANNELS);
for (i = 0; i < follower->source.info.channels; i++)
follower->source.info.position[i] = SPA_AUDIO_CHANNEL_AUX0 + i;
}
follower->sink.n_ports = peer->params.recv_audio_channels + peer->params.recv_midi_channels; follower->sink.n_ports = peer->params.recv_audio_channels + peer->params.recv_midi_channels;
follower->sink.info.rate = peer->params.sample_rate; follower->sink.info.rate = peer->params.sample_rate;
follower->sink.info.channels = peer->params.recv_audio_channels; if ((uint32_t)peer->params.recv_audio_channels != follower->sink.info.channels) {
follower->sink.info.channels = SPA_MIN(peer->params.recv_audio_channels, (int)SPA_AUDIO_MAX_CHANNELS);
for (i = 0; i < follower->sink.info.channels; i++)
follower->sink.info.position[i] = SPA_AUDIO_CHANNEL_AUX0 + i;
}
follower->source.n_ports = follower->source.n_midi + follower->source.info.channels;
follower->sink.n_ports = follower->sink.n_midi + follower->sink.info.channels;
if (follower->source.n_ports > MAX_PORTS || follower->sink.n_ports > MAX_PORTS) { if (follower->source.n_ports > MAX_PORTS || follower->sink.n_ports > MAX_PORTS) {
pw_log_error("too many ports"); pw_log_error("too many ports source:%d sink:%d max:%d", follower->source.n_ports,
follower->sink.n_ports, MAX_PORTS);
res = -EINVAL; res = -EINVAL;
goto cleanup; goto cleanup;
} }
media = follower->sink.info.channels > 0 ? "Audio" : "Midi";
if (pw_properties_get_bool(follower->sink.props, "netjack2.connect", DEFAULT_CONNECT)) {
if (pw_properties_get(follower->sink.props, PW_KEY_NODE_AUTOCONNECT) == NULL)
pw_properties_set(follower->sink.props, PW_KEY_NODE_AUTOCONNECT, "true");
if (pw_properties_get(follower->sink.props, PW_KEY_MEDIA_CLASS) == NULL)
pw_properties_setf(follower->sink.props, PW_KEY_MEDIA_CLASS, "Stream/Input/%s", media);
} else {
if (pw_properties_get(follower->sink.props, PW_KEY_MEDIA_CLASS) == NULL)
pw_properties_setf(follower->sink.props, PW_KEY_MEDIA_CLASS, "%s/Sink", media);
}
media = follower->source.info.channels > 0 ? "Audio" : "Midi";
if (pw_properties_get_bool(follower->source.props, "netjack2.connect", DEFAULT_CONNECT)) {
if (pw_properties_get(follower->source.props, PW_KEY_NODE_AUTOCONNECT) == NULL)
pw_properties_set(follower->source.props, PW_KEY_NODE_AUTOCONNECT, "true");
if (pw_properties_get(follower->source.props, PW_KEY_MEDIA_CLASS) == NULL)
pw_properties_setf(follower->source.props, PW_KEY_MEDIA_CLASS, "Stream/Output/%s", media);
} else {
if (pw_properties_get(follower->source.props, PW_KEY_MEDIA_CLASS) == NULL)
pw_properties_setf(follower->source.props, PW_KEY_MEDIA_CLASS, "%s/Source", media);
}
follower->mode = 0;
if (follower->sink.n_ports > 0)
follower->mode |= MODE_SINK;
if (follower->source.n_ports > 0)
follower->mode |= MODE_SOURCE;
if ((res = create_filters(follower)) < 0) if ((res = create_filters(follower)) < 0)
goto create_failed; goto create_failed;
@ -1009,6 +1079,10 @@ static int handle_follower_available(struct impl *impl, struct nj2_session_param
nj2_dump_session_params(params); nj2_dump_session_params(params);
send(follower->socket->fd, params, sizeof(*params), 0); send(follower->socket->fd, params, sizeof(*params), 0);
/* now swap send and recv to make it match our point of view */
SPA_SWAP(peer->params.send_audio_channels, peer->params.recv_audio_channels);
SPA_SWAP(peer->params.send_midi_channels, peer->params.recv_midi_channels);
return 0; return 0;
create_failed: create_failed:
@ -1179,8 +1253,7 @@ static void parse_audio_info(const struct pw_properties *props, struct spa_audio
{ {
spa_audio_info_raw_init_dict_keys(info, spa_audio_info_raw_init_dict_keys(info,
&SPA_DICT_ITEMS( &SPA_DICT_ITEMS(
SPA_DICT_ITEM(SPA_KEY_AUDIO_FORMAT, "F32P"), SPA_DICT_ITEM(SPA_KEY_AUDIO_FORMAT, "F32P")),
SPA_DICT_ITEM(SPA_KEY_AUDIO_POSITION, DEFAULT_POSITION)),
&props->dict, &props->dict,
SPA_KEY_AUDIO_CHANNELS, SPA_KEY_AUDIO_CHANNELS,
SPA_KEY_AUDIO_POSITION, NULL); SPA_KEY_AUDIO_POSITION, NULL);
@ -1244,20 +1317,6 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args)
impl->main_loop = pw_context_get_main_loop(context); impl->main_loop = pw_context_get_main_loop(context);
impl->system = impl->main_loop->system; impl->system = impl->main_loop->system;
impl->mode = MODE_DUPLEX;
if ((str = pw_properties_get(props, "tunnel.mode")) != NULL) {
if (spa_streq(str, "source")) {
impl->mode = MODE_SOURCE;
} else if (spa_streq(str, "sink")) {
impl->mode = MODE_SINK;
} else if (spa_streq(str, "duplex")) {
impl->mode = MODE_DUPLEX;
} else {
pw_log_error("invalid tunnel.mode '%s'", str);
res = -EINVAL;
goto error;
}
}
impl->samplerate = pw_properties_get_uint32(impl->props, "netjack2.sample-rate", impl->samplerate = pw_properties_get_uint32(impl->props, "netjack2.sample-rate",
DEFAULT_SAMPLE_RATE); DEFAULT_SAMPLE_RATE);
impl->period_size = pw_properties_get_uint32(impl->props, "netjack2.period-size", impl->period_size = pw_properties_get_uint32(impl->props, "netjack2.period-size",
@ -1316,27 +1375,10 @@ int pipewire__module_init(struct pw_impl_module *module, const char *args)
copy_props(impl, props, PW_KEY_NODE_LOCK_RATE); copy_props(impl, props, PW_KEY_NODE_LOCK_RATE);
copy_props(impl, props, PW_KEY_AUDIO_CHANNELS); copy_props(impl, props, PW_KEY_AUDIO_CHANNELS);
copy_props(impl, props, SPA_KEY_AUDIO_POSITION); copy_props(impl, props, SPA_KEY_AUDIO_POSITION);
copy_props(impl, props, "audio.ports");
copy_props(impl, props, "midi.ports");
copy_props(impl, props, "netjack2.connect"); copy_props(impl, props, "netjack2.connect");
if (pw_properties_get_bool(impl->sink_props, "netjack2.connect", DEFAULT_CONNECT)) {
if (pw_properties_get(impl->sink_props, PW_KEY_NODE_AUTOCONNECT) == NULL)
pw_properties_set(impl->sink_props, PW_KEY_NODE_AUTOCONNECT, "true");
if (pw_properties_get(impl->sink_props, PW_KEY_MEDIA_CLASS) == NULL)
pw_properties_set(impl->sink_props, PW_KEY_MEDIA_CLASS, "Stream/Input/Audio");
} else {
if (pw_properties_get(impl->sink_props, PW_KEY_MEDIA_CLASS) == NULL)
pw_properties_set(impl->sink_props, PW_KEY_MEDIA_CLASS, "Audio/Sink");
}
if (pw_properties_get_bool(impl->source_props, "netjack2.connect", DEFAULT_CONNECT)) {
if (pw_properties_get(impl->source_props, PW_KEY_NODE_AUTOCONNECT) == NULL)
pw_properties_set(impl->source_props, PW_KEY_NODE_AUTOCONNECT, "true");
if (pw_properties_get(impl->source_props, PW_KEY_MEDIA_CLASS) == NULL)
pw_properties_set(impl->source_props, PW_KEY_MEDIA_CLASS, "Stream/Output/Audio");
} else {
if (pw_properties_get(impl->source_props, PW_KEY_MEDIA_CLASS) == NULL)
pw_properties_set(impl->source_props, PW_KEY_MEDIA_CLASS, "Audio/Source");
}
impl->core = pw_context_get_object(impl->context, PW_TYPE_INTERFACE_Core); impl->core = pw_context_get_object(impl->context, PW_TYPE_INTERFACE_Core);
if (impl->core == NULL) { if (impl->core == NULL) {
str = pw_properties_get(props, PW_KEY_REMOTE_NAME); str = pw_properties_get(props, PW_KEY_REMOTE_NAME);