diff --git a/src/modules/meson.build b/src/modules/meson.build index fccb02947..9b030a07b 100644 --- a/src/modules/meson.build +++ b/src/modules/meson.build @@ -24,6 +24,8 @@ module_sources = [ 'module-rtkit.c', 'module-session-manager.c', 'module-zeroconf-discover.c', + 'module-roc-source.c', + 'module-roc-sink.c', ] pipewire_module_access = shared_library('pipewire-module-access', [ 'module-access.c' ], @@ -363,3 +365,23 @@ pipewire_module_zeroconf_discover = shared_library('pipewire-module-zeroconf-dis dependencies : [mathlib, dl_lib, rt_lib, pipewire_dep, avahi_dep], ) endif + +if roc_lib.found() +pipewire_module_roc_sink = shared_library('pipewire-module-roc-sink', + [ 'module-roc-sink.c' ], + include_directories : [configinc, spa_inc], + install : true, + install_dir : modules_install_dir, + install_rpath: modules_install_dir, + dependencies : [mathlib, dl_lib, rt_lib, pipewire_dep, roc_lib], +) + +pipewire_module_roc_source = shared_library('pipewire-module-roc-source', + [ 'module-roc-source.c' ], + include_directories : [configinc, spa_inc], + install : true, + install_dir : modules_install_dir, + install_rpath: modules_install_dir, + dependencies : [mathlib, dl_lib, rt_lib, pipewire_dep, roc_lib], +) +endif diff --git a/src/modules/module-protocol-pulse/modules/module-roc-sink.c b/src/modules/module-protocol-pulse/modules/module-roc-sink.c index bedadeefa..00f22d2e7 100644 --- a/src/modules/module-protocol-pulse/modules/module-roc-sink.c +++ b/src/modules/module-protocol-pulse/modules/module-roc-sink.c @@ -23,259 +23,102 @@ * DEALINGS IN THE SOFTWARE. */ -#include -#include -#include - +#include #include #include -#include -#include #include "../defs.h" #include "../module.h" #include "registry.h" -#include -#include -#include -#include -#include - #define ROC_DEFAULT_IP "0.0.0.0" -#define ROC_DEFAULT_SOURCE_PORT 10001 -#define ROC_DEFAULT_REPAIR_PORT 10002 +#define ROC_DEFAULT_SOURCE_PORT "10001" +#define ROC_DEFAULT_REPAIR_PORT "10002" -struct module_rocsink_data { +struct module_roc_sink_data { struct module *module; - struct pw_core *core; - struct pw_stream *capture; - struct spa_hook core_listener; - struct spa_hook capture_listener; - struct spa_audio_info_raw info; - struct pw_properties *capture_props; - roc_address local_addr; - roc_address remote_source_addr; - roc_address remote_repair_addr; - roc_context *context; - roc_sender *sender; + struct spa_hook mod_listener; + struct pw_impl_module *mod; - char *local_ip; - char *remote_ip; - int remote_source_port; - int remote_repair_port; + struct pw_properties *sink_props; + struct pw_properties *roc_props; }; -static void capture_process(void *data) +static void module_destroy(void *data) { - struct module_rocsink_data *impl = data; - struct pw_buffer *in; - struct spa_data *d; - roc_frame frame; - uint32_t i, size, offset; - - if ((in = pw_stream_dequeue_buffer(impl->capture)) == NULL) { - pw_log_warn("Out of capture buffers: %m"); - return; - } - - for (i = 0; i < in->buffer->n_datas; i++) { - d = &in->buffer->datas[i]; - size = d->chunk->size; - offset = d->chunk->offset; - - while (size > 0) { - memset(&frame, 0, sizeof(frame)); - - frame.samples = SPA_MEMBER(d->data, offset, void); - frame.samples_size = size; - - if (roc_sender_write(impl->sender, &frame) != 0) { - pw_log_warn("Failed to write to roc sink"); - break; - } - - offset += size; - size -= size; - } - } - pw_stream_queue_buffer(impl->capture, in); + struct module_roc_sink_data *d = data; + spa_hook_remove(&d->mod_listener); + d->mod = NULL; + module_schedule_unload(d->module); } -static void on_core_error(void *data, uint32_t id, int seq, int res, const char *message) -{ - struct module_rocsink_data *d = data; - - pw_log_error("error id:%u seq:%d res:%d (%s): %s", - id, seq, res, spa_strerror(res), message); - - if (id == PW_ID_CORE && res == -EPIPE) - module_schedule_unload(d->module); -} - -static const struct pw_core_events core_events = { - PW_VERSION_CORE_EVENTS, - .error = on_core_error, +static const struct pw_impl_module_events module_events = { + PW_VERSION_IMPL_MODULE_EVENTS, + .destroy = module_destroy }; -static void on_stream_state_changed(void *data, enum pw_stream_state old, - enum pw_stream_state state, const char *error) +static int module_roc_sink_load(struct client *client, struct module *module) { - struct module_rocsink_data *d = data; + struct module_roc_sink_data *data = module->user_data; + FILE *f; + const char *str; + char *args; + size_t size; - switch (state) { - case PW_STREAM_STATE_UNCONNECTED: - pw_log_info("stream disconnected, unloading"); - module_schedule_unload(d->module); - break; - case PW_STREAM_STATE_ERROR: - pw_log_error("stream error: %s", error); - break; - default: - break; - } -} + f = open_memstream(&args, &size); + fprintf(f, "{"); + /* Can't just serialise this dict because the "null" method gets + * interpreted as a JSON null */ + if ((str = pw_properties_get(data->roc_props, "local.ip"))) + fprintf(f, " local.ip = \"%s\"", str); + if ((str = pw_properties_get(data->roc_props, "remote.ip"))) + fprintf(f, " remote.ip = \"%s\"", str); + if ((str = pw_properties_get(data->roc_props, "remote.source.port"))) + fprintf(f, " remote.source.port = \"%s\"", str); + if ((str = pw_properties_get(data->roc_props, "remote.repair.port"))) + fprintf(f, " remote.repair.port = \"%s\"", str); + fprintf(f, " } sink.props = {"); + pw_properties_serialize_dict(f, &data->sink_props->dict, 0); + fprintf(f, " } }"); + fclose(f); -static const struct pw_stream_events in_stream_events = { - PW_VERSION_STREAM_EVENTS, - .state_changed = on_stream_state_changed, - .process = capture_process -}; - -static int module_rocsink_load(struct client *client, struct module *module) -{ - struct module_rocsink_data *data = module->user_data; - int res; - uint32_t n_params; - const struct spa_pod *params[1]; - uint8_t buffer[1024]; - struct spa_pod_builder b = SPA_POD_BUILDER_INIT(buffer, sizeof(buffer)); - roc_context_config context_config; - roc_sender_config sender_config; - - if (roc_address_init(&data->local_addr, ROC_AF_AUTO, data->local_ip, 0)) { - pw_log_error("Invalid local IP address"); - return -EINVAL; - } - - if (roc_address_init(&data->remote_source_addr, ROC_AF_AUTO, data->remote_ip, - data->remote_source_port)) { - pw_log_error("Invalid remote source address"); - return -EINVAL; - } - - if (roc_address_init(&data->remote_repair_addr, ROC_AF_AUTO, data->remote_ip, - data->remote_repair_port)) { - pw_log_error("Invalid remote repair address"); - return -EINVAL; - } - - memset(&context_config, 0, sizeof(context_config)); - - data->context = roc_context_open(&context_config); - if (!data->context) { - pw_log_error("Failed to create roc context"); - return -EINVAL; - } - - memset(&sender_config, 0, sizeof(sender_config)); - - sender_config.frame_sample_rate = 44100; - sender_config.frame_channels = ROC_CHANNEL_SET_STEREO; - sender_config.frame_encoding = ROC_FRAME_ENCODING_PCM_FLOAT; - - /* Fixed to be the same as ROC sender config above */ - data->info.rate = 44100; - data->info.channels = 2; - data->info.format = SPA_AUDIO_FORMAT_F32_LE; - - data->sender = roc_sender_open(data->context, &sender_config); - if (!data->sender) { - pw_log_error("Failed to create roc sender"); - return -EINVAL; - } - - if (roc_sender_bind(data->sender, &data->local_addr) != 0) { - pw_log_error("Failed to bind sender to local address"); - return -EINVAL; - } - - if (roc_sender_connect(data->sender, ROC_PORT_AUDIO_SOURCE, ROC_PROTO_RTP_RS8M_SOURCE, - &data->remote_source_addr) != 0) { - pw_log_error("can't connect roc sender to remote source address"); - return -EINVAL; - } - - if (roc_sender_connect(data->sender, ROC_PORT_AUDIO_REPAIR, ROC_PROTO_RS8M_REPAIR, - &data->remote_repair_addr) != 0) { - pw_log_error("can't connect roc sender to remote repair address"); - return -EINVAL; - } - - data->core = pw_context_connect(module->impl->context, - pw_properties_copy(client->props), - 0); - if (data->core == NULL) + data->mod = pw_context_load_module(module->impl->context, + "libpipewire-module-roc-sink", + args, NULL); + if (data->mod == NULL) return -errno; - pw_core_add_listener(data->core, - &data->core_listener, - &core_events, data); - - data->capture = pw_stream_new(data->core, - "roc-sink capture", data->capture_props); - data->capture_props = NULL; - if (data->capture == NULL) - return -errno; - - pw_stream_add_listener(data->capture, - &data->capture_listener, - &in_stream_events, data); - - n_params = 0; - params[n_params++] = spa_format_audio_raw_build(&b, SPA_PARAM_EnumFormat, - &data->info); - - if ((res = pw_stream_connect(data->capture, - PW_DIRECTION_INPUT, - PW_ID_ANY, - PW_STREAM_FLAG_AUTOCONNECT | - PW_STREAM_FLAG_MAP_BUFFERS | - PW_STREAM_FLAG_RT_PROCESS, - params, n_params)) < 0) - return res; + pw_impl_module_add_listener(data->mod, + &data->mod_listener, + &module_events, data); return 0; } -static int module_rocsink_unload(struct client *client, struct module *module) +static int module_roc_sink_unload(struct client *client, struct module *module) { - struct module_rocsink_data *d = module->user_data; + struct module_roc_sink_data *d = module->user_data; - pw_properties_free(d->capture_props); - if (d->capture != NULL) - pw_stream_destroy(d->capture); - if (d->core != NULL) - pw_core_disconnect(d->core); - if (d->sender) - roc_sender_close(d->sender); - if (d->context) - roc_context_close(d->context); + if (d->mod) { + spa_hook_remove(&d->mod_listener); + pw_impl_module_destroy(d->mod); + d->mod = NULL; + } - free(d->local_ip); - free(d->remote_ip); + pw_properties_free(d->roc_props); + pw_properties_free(d->sink_props); return 0; } -static const struct module_methods module_rocsink_methods = { +static const struct module_methods module_roc_sink_methods = { VERSION_MODULE_METHODS, - .load = module_rocsink_load, - .unload = module_rocsink_unload, + .load = module_roc_sink_load, + .unload = module_roc_sink_unload, }; -static const struct spa_dict_item module_rocsink_info[] = { +static const struct spa_dict_item module_roc_sink_info[] = { { PW_KEY_MODULE_AUTHOR, "Sanchayan Maity " }, { PW_KEY_MODULE_DESCRIPTION, "roc sink" }, { PW_KEY_MODULE_USAGE, "sink_name= " @@ -289,37 +132,33 @@ static const struct spa_dict_item module_rocsink_info[] = { struct module *create_module_roc_sink(struct impl *impl, const char *argument) { struct module *module; - struct module_rocsink_data *d; - struct pw_properties *props = NULL, *capture_props = NULL; - struct spa_audio_info_raw info = { 0 }; + struct module_roc_sink_data *d; + struct pw_properties *props = NULL, *sink_props = NULL, *roc_props = NULL; const char *str; - char *local_ip = NULL, *remote_ip = NULL; - int res = 0, remote_repair_port, remote_source_port; + int res; - props = pw_properties_new_dict(&SPA_DICT_INIT_ARRAY(module_rocsink_info)); - capture_props = pw_properties_new(NULL, NULL); - if (!props || !capture_props) { - res = -EINVAL; + props = pw_properties_new_dict(&SPA_DICT_INIT_ARRAY(module_roc_sink_info)); + sink_props = pw_properties_new(NULL, NULL); + roc_props = pw_properties_new(NULL, NULL); + if (!props || !sink_props || !roc_props) { + res = -errno; goto out; } - if (argument) + if (argument != NULL) module_args_add_props(props, argument); - if (module_args_to_audioinfo(impl, props, &info) < 0) { - res = -EINVAL; - goto out; - } - if ((str = pw_properties_get(props, "sink_name")) != NULL) { - pw_properties_set(capture_props, PW_KEY_NODE_NAME, str); + pw_properties_set(sink_props, PW_KEY_NODE_NAME, str); pw_properties_set(props, "sink_name", NULL); } - if ((str = pw_properties_get(props, PW_KEY_MEDIA_CLASS)) == NULL) + if ((str = pw_properties_get(props, PW_KEY_MEDIA_CLASS)) == NULL) { pw_properties_set(props, PW_KEY_MEDIA_CLASS, "Audio/Sink"); + pw_properties_set(sink_props, PW_KEY_MEDIA_CLASS, "Audio/Sink"); + } if ((str = pw_properties_get(props, "remote_ip")) != NULL) { - remote_ip = strdup(str); + pw_properties_set(roc_props, "remote.ip", str); pw_properties_set(props, "remote_ip", NULL); } else { pw_log_error("Remote IP not specified"); @@ -328,27 +167,27 @@ struct module *create_module_roc_sink(struct impl *impl, const char *argument) } if ((str = pw_properties_get(props, "local_ip")) != NULL) { - local_ip = strdup(str); + pw_properties_set(roc_props, "local.ip", str); pw_properties_set(props, "local_ip", NULL); } else { - local_ip = strdup(ROC_DEFAULT_IP); + pw_properties_set(roc_props, "local.ip", ROC_DEFAULT_IP); } if ((str = pw_properties_get(props, "remote_source_port")) != NULL) { - remote_source_port = pw_properties_parse_int(str); + pw_properties_set(roc_props, "remote.source.port", str); pw_properties_set(props, "remote_source_port", NULL); } else { - remote_source_port = ROC_DEFAULT_SOURCE_PORT; + pw_properties_set(roc_props, "remote.source.port", ROC_DEFAULT_SOURCE_PORT); } if ((str = pw_properties_get(props, "remote_repair_port")) != NULL) { - remote_repair_port = pw_properties_parse_int(str); + pw_properties_set(roc_props, "remote.repair.port", str); pw_properties_set(props, "remote_repair_port", NULL); } else { - remote_repair_port = ROC_DEFAULT_REPAIR_PORT; + pw_properties_set(roc_props, "remote.repair.port", ROC_DEFAULT_REPAIR_PORT); } - module = module_new(impl, &module_rocsink_methods, sizeof(*d)); + module = module_new(impl, &module_roc_sink_methods, sizeof(*d)); if (module == NULL) { res = -errno; goto out; @@ -357,22 +196,12 @@ struct module *create_module_roc_sink(struct impl *impl, const char *argument) module->props = props; d = module->user_data; d->module = module; - d->capture_props = capture_props; - d->info = info; - d->local_ip = local_ip; - d->remote_ip = remote_ip; - d->remote_source_port = remote_source_port; - d->remote_repair_port = remote_repair_port; - - pw_log_info("Successfully loaded module-roc-sink"); + d->sink_props = sink_props; + d->roc_props = roc_props; return module; out: pw_properties_free(props); - pw_properties_free(capture_props); - free(local_ip); - free(remote_ip); errno = -res; - return NULL; } diff --git a/src/modules/module-protocol-pulse/modules/module-roc-source.c b/src/modules/module-protocol-pulse/modules/module-roc-source.c index aa06f4105..386b95f2f 100644 --- a/src/modules/module-protocol-pulse/modules/module-roc-source.c +++ b/src/modules/module-protocol-pulse/modules/module-roc-source.c @@ -23,269 +23,77 @@ * DEALINGS IN THE SOFTWARE. */ -#include -#include -#include - +#include #include #include -#include -#include #include "../defs.h" #include "../module.h" #include "registry.h" -#include -#include -#include -#include -#include - -#define ROC_DEFAULT_IP "0.0.0.0" -#define ROC_DEFAULT_SOURCE_PORT 10001 -#define ROC_DEFAULT_REPAIR_PORT 10002 -#define ROC_DEFAULT_SESS_LATENCY 200 +#define ROC_DEFAULT_IP "0.0.0.0" +#define ROC_DEFAULT_SOURCE_PORT "10001" +#define ROC_DEFAULT_REPAIR_PORT "10002" struct module_roc_source_data { struct module *module; - struct pw_core *core; - struct pw_stream *playback; - struct spa_hook core_listener; - struct spa_hook playback_listener; - struct spa_audio_info_raw info; - struct pw_properties *playback_props; - roc_address local_addr; - roc_address local_source_addr; - roc_address local_repair_addr; - roc_context *context; - roc_receiver *receiver; + struct spa_hook mod_listener; + struct pw_impl_module *mod; - char *resampler_profile; - char *local_ip; - int local_source_port; - int local_repair_port; - int sess_latency_msec; + struct pw_properties *source_props; + struct pw_properties *roc_props; }; -static int roc_parse_resampler_profile(roc_resampler_profile *out, const char *str) -{ - if (!str || !*str) { - *out = ROC_RESAMPLER_DEFAULT; - return 0; - } else if (spa_streq(str, "disable") == 0) { - *out = ROC_RESAMPLER_DISABLE; - return 0; - } else if (spa_streq(str, "high") == 0) { - *out = ROC_RESAMPLER_HIGH; - return 0; - } else if (spa_streq(str, "medium") == 0) { - *out = ROC_RESAMPLER_MEDIUM; - return 0; - } else if (spa_streq(str, "low") == 0) { - *out = ROC_RESAMPLER_LOW; - return 0; - } else { - pw_log_error("Invalid resampler profile: %s", str); - return -EINVAL; - } -} - -static void playback_process(void *data) -{ - struct module_roc_source_data *impl = data; - struct pw_buffer *b; - struct spa_buffer *buf; - roc_frame frame; - uint8_t *dst; - - if ((b = pw_stream_dequeue_buffer(impl->playback)) == NULL) { - pw_log_warn("Out of playback buffers: %m"); - return; - } - - buf = b->buffer; - if ((dst = buf->datas[0].data) == NULL) - return; - - buf->datas[0].chunk->offset = 0; - buf->datas[0].chunk->stride = 8; /* channels = 2, format = F32LE */ - buf->datas[0].chunk->size = 0; - - memset(&frame, 0, sizeof(frame)); - - frame.samples = dst; - frame.samples_size = buf->datas[0].maxsize; - - if (roc_receiver_read(impl->receiver, &frame) != 0) { - /* Handle EOF and error */ - pw_log_error("Failed to read from roc source"); - module_schedule_unload(impl->module); - return; - } - - buf->datas[0].chunk->size = frame.samples_size; - - pw_stream_queue_buffer(impl->playback, b); -} - -static void on_core_error(void *data, uint32_t id, int seq, int res, const char *message) +static void module_destroy(void *data) { struct module_roc_source_data *d = data; - - pw_log_error("error id:%u seq:%d res:%d (%s): %s", - id, seq, res, spa_strerror(res), message); - - if (id == PW_ID_CORE && res == -EPIPE) - module_schedule_unload(d->module); + spa_hook_remove(&d->mod_listener); + d->mod = NULL; + module_schedule_unload(d->module); } -static const struct pw_core_events core_events = { - PW_VERSION_CORE_EVENTS, - .error = on_core_error, -}; - -static void on_stream_state_changed(void *data, enum pw_stream_state old, - enum pw_stream_state state, const char *error) -{ - struct module_roc_source_data *d = data; - - switch (state) { - case PW_STREAM_STATE_UNCONNECTED: - pw_log_info("stream disconnected, unloading"); - module_schedule_unload(d->module); - break; - case PW_STREAM_STATE_ERROR: - pw_log_error("stream error: %s", error); - break; - default: - break; - } -} - -static const struct pw_stream_events out_stream_events = { - PW_VERSION_STREAM_EVENTS, - .state_changed = on_stream_state_changed, - .process = playback_process +static const struct pw_impl_module_events module_events = { + PW_VERSION_IMPL_MODULE_EVENTS, + .destroy = module_destroy }; static int module_roc_source_load(struct client *client, struct module *module) { struct module_roc_source_data *data = module->user_data; - int res; - uint32_t n_params; - const struct spa_pod *params[1]; - uint8_t buffer[1024]; - struct spa_pod_builder b = SPA_POD_BUILDER_INIT(buffer, sizeof(buffer)); - roc_context_config context_config; - roc_receiver_config receiver_config; + FILE *f; + const char *str; + char *args; + size_t size; - if (roc_address_init(&data->local_addr, ROC_AF_AUTO, data->local_ip, 0)) { - pw_log_error("Invalid local IP address"); - return -EINVAL; - } + f = open_memstream(&args, &size); + fprintf(f, "{"); + /* Can't just serialise this dict because the "null" method gets + * interpreted as a JSON null */ + if ((str = pw_properties_get(data->roc_props, "local.ip"))) + fprintf(f, " local.ip = \"%s\"", str); + if ((str = pw_properties_get(data->roc_props, "local.source.port"))) + fprintf(f, " local.source.port = \"%s\"", str); + if ((str = pw_properties_get(data->roc_props, "local.repair.port"))) + fprintf(f, " local.repair.port = \"%s\"", str); + if ((str = pw_properties_get(data->roc_props, "sess.latency.msec"))) + fprintf(f, " sess.latency.msec = \"%s\"", str); + if ((str = pw_properties_get(data->roc_props, "resampler.profile"))) + fprintf(f, " resampler.profile = \"%s\"", str); + fprintf(f, " } source.props = {"); + pw_properties_serialize_dict(f, &data->source_props->dict, 0); + fprintf(f, " } }"); + fclose(f); - if (roc_address_init(&data->local_source_addr, ROC_AF_AUTO, data->local_ip, - data->local_source_port)) { - pw_log_error("Invalid local source address"); - return -EINVAL; - } - - if (roc_address_init(&data->local_repair_addr, ROC_AF_AUTO, data->local_ip, - data->local_repair_port)) { - pw_log_error("Invalid local repair address"); - return -EINVAL; - } - - memset(&context_config, 0, sizeof(context_config)); - - data->context = roc_context_open(&context_config); - if (!data->context) { - pw_log_error("Failed to create roc context"); - return -EINVAL; - } - - memset(&receiver_config, 0, sizeof(receiver_config)); - - receiver_config.frame_sample_rate = 44100; - receiver_config.frame_channels = ROC_CHANNEL_SET_STEREO; - receiver_config.frame_encoding = ROC_FRAME_ENCODING_PCM_FLOAT; - - /* Fixed to be the same as ROC receiver config above */ - data->info.rate = 44100; - data->info.channels = 2; - data->info.format = SPA_AUDIO_FORMAT_F32_LE; - - if (roc_parse_resampler_profile(&receiver_config.resampler_profile, - data->resampler_profile)) { - pw_log_error("Invalid resampler profile"); - return -EINVAL; - } - - /* - * Note that target latency is in nano seconds. - * - * The session will not start playing until it accumulates the - * requested latency. Then if resampler is enabled, the session will - * adjust it's clock to keep actual latency as close as possible to - * the target latency. If zero, default value will be used. - * - * See API reference: - * https://roc-streaming.org/toolkit/docs/api/reference.html - */ - receiver_config.target_latency = data->sess_latency_msec * 1000000; - - data->receiver = roc_receiver_open(data->context, &receiver_config); - if (!data->receiver) { - pw_log_error("Failed to create roc receiver"); - return -EINVAL; - } - - if (roc_receiver_bind(data->receiver, ROC_PORT_AUDIO_SOURCE, ROC_PROTO_RTP_RS8M_SOURCE, - &data->local_source_addr) != 0) { - pw_log_error("can't connect roc receiver to local source address"); - return -EINVAL; - } - - if (roc_receiver_bind(data->receiver, ROC_PORT_AUDIO_REPAIR, ROC_PROTO_RS8M_REPAIR, - &data->local_repair_addr) != 0) { - pw_log_error("can't connect roc receiver to local repair address"); - return -EINVAL; - } - - data->core = pw_context_connect(module->impl->context, - pw_properties_copy(client->props), - 0); - if (data->core == NULL) + data->mod = pw_context_load_module(module->impl->context, + "libpipewire-module-roc-source", + args, NULL); + if (data->mod == NULL) return -errno; - pw_core_add_listener(data->core, - &data->core_listener, - &core_events, data); - - data->playback = pw_stream_new(data->core, - "roc-source playback", data->playback_props); - data->playback_props = NULL; - if (data->playback == NULL) - return -errno; - - pw_stream_add_listener(data->playback, - &data->playback_listener, - &out_stream_events, data); - - n_params = 0; - params[n_params++] = spa_format_audio_raw_build(&b, SPA_PARAM_EnumFormat, - &data->info); - - if ((res = pw_stream_connect(data->playback, - PW_DIRECTION_OUTPUT, - PW_ID_ANY, - PW_STREAM_FLAG_AUTOCONNECT | - PW_STREAM_FLAG_MAP_BUFFERS | - PW_STREAM_FLAG_RT_PROCESS, - params, n_params)) < 0) - return res; + pw_impl_module_add_listener(data->mod, + &data->mod_listener, + &module_events, data); return 0; } @@ -294,18 +102,14 @@ static int module_roc_source_unload(struct client *client, struct module *module { struct module_roc_source_data *d = module->user_data; - pw_properties_free(d->playback_props); - if (d->playback != NULL) - pw_stream_destroy(d->playback); - if (d->core != NULL) - pw_core_disconnect(d->core); - if (d->receiver) - roc_receiver_close(d->receiver); - if (d->context) - roc_context_close(d->context); + if (d->mod) { + spa_hook_remove(&d->mod_listener); + pw_impl_module_destroy(d->mod); + d->mod = NULL; + } - free(d->local_ip); - free(d->resampler_profile); + pw_properties_free(d->roc_props); + pw_properties_free(d->source_props); return 0; } @@ -332,65 +136,64 @@ struct module *create_module_roc_source(struct impl *impl, const char *argument) { struct module *module; struct module_roc_source_data *d; - struct pw_properties *props = NULL, *playback_props = NULL; - struct spa_audio_info_raw info = { 0 }; + struct pw_properties *props = NULL, *source_props = NULL, *roc_props = NULL; const char *str; - char *local_ip = NULL, *resampler_profile = NULL; - int res = 0, local_repair_port, local_source_port, sess_latency_msec; + int res; props = pw_properties_new_dict(&SPA_DICT_INIT_ARRAY(module_roc_source_info)); - playback_props = pw_properties_new(NULL, NULL); - if (!props || !playback_props) { - res = -EINVAL; + source_props = pw_properties_new(NULL, NULL); + roc_props = pw_properties_new(NULL, NULL); + if (!props || !source_props || !roc_props) { + res = -errno; goto out; } - if (argument) + + if (argument != NULL) module_args_add_props(props, argument); - if (module_args_to_audioinfo(impl, props, &info) < 0) { - res = -EINVAL; - goto out; - } - if ((str = pw_properties_get(props, "source_name")) != NULL) { - pw_properties_set(playback_props, PW_KEY_NODE_NAME, str); + pw_properties_set(source_props, PW_KEY_NODE_NAME, str); pw_properties_set(props, "source_name", NULL); } - if ((str = pw_properties_get(props, PW_KEY_MEDIA_CLASS)) == NULL) - pw_properties_set(props, PW_KEY_MEDIA_CLASS, "Audio/Source"); + if ((str = pw_properties_get(props, PW_KEY_MEDIA_CLASS)) == NULL) { + pw_properties_set(props, PW_KEY_MEDIA_CLASS, "Audio/Sink"); + pw_properties_set(source_props, PW_KEY_MEDIA_CLASS, "Audio/Sink"); + } if ((str = pw_properties_get(props, "local_ip")) != NULL) { - local_ip = strdup(str); + pw_properties_set(roc_props, "local.ip", str); pw_properties_set(props, "local_ip", NULL); } else { - local_ip = strdup(ROC_DEFAULT_IP); + pw_properties_set(roc_props, "local.ip", ROC_DEFAULT_IP); } if ((str = pw_properties_get(props, "local_source_port")) != NULL) { - local_source_port = pw_properties_parse_int(str); + pw_properties_set(roc_props, "local.source.port", str); pw_properties_set(props, "local_source_port", NULL); } else { - local_source_port = ROC_DEFAULT_SOURCE_PORT; + pw_properties_set(roc_props, "local.source.port", ROC_DEFAULT_SOURCE_PORT); } if ((str = pw_properties_get(props, "local_repair_port")) != NULL) { - local_repair_port = pw_properties_parse_int(str); + pw_properties_set(roc_props, "local.repair.port", str); pw_properties_set(props, "local_repair_port", NULL); } else { - local_repair_port = ROC_DEFAULT_REPAIR_PORT; + pw_properties_set(roc_props, "local.repair.port", ROC_DEFAULT_REPAIR_PORT); } if ((str = pw_properties_get(props, "sess_latency_msec")) != NULL) { - sess_latency_msec = pw_properties_parse_int(str); + pw_properties_set(roc_props, "sess.latency.msec", str); pw_properties_set(props, "sess_latency_msec", NULL); } else { - sess_latency_msec = ROC_DEFAULT_SESS_LATENCY; + pw_properties_set(roc_props, "sess.latency.msec", ROC_DEFAULT_REPAIR_PORT); } if ((str = pw_properties_get(props, "resampler_profile")) != NULL) { - resampler_profile = strdup(str); + pw_properties_set(roc_props, "resampler.profile", str); pw_properties_set(props, "resampler_profile", NULL); + } else { + pw_properties_set(roc_props, "resampler.profile", ROC_DEFAULT_REPAIR_PORT); } module = module_new(impl, &module_roc_source_methods, sizeof(*d)); @@ -402,22 +205,12 @@ struct module *create_module_roc_source(struct impl *impl, const char *argument) module->props = props; d = module->user_data; d->module = module; - d->playback_props = playback_props; - d->info = info; - d->local_ip = local_ip; - d->local_source_port = local_source_port; - d->local_repair_port = local_repair_port; - d->sess_latency_msec = sess_latency_msec; - d->resampler_profile = resampler_profile; - - pw_log_info("Successfully loaded module-roc-source"); + d->source_props = source_props; + d->roc_props = roc_props; return module; out: pw_properties_free(props); - pw_properties_free(playback_props); - free(local_ip); errno = -res; - return NULL; } diff --git a/src/modules/module-roc-sink.c b/src/modules/module-roc-sink.c new file mode 100644 index 000000000..6912072d0 --- /dev/null +++ b/src/modules/module-roc-sink.c @@ -0,0 +1,494 @@ +/* PipeWire + * + * Copyright © 2021 Wim Taymans + * Copyright © 2021 Sanchayan Maity + * + * Permission is hereby granted, free of charge, to any person obtaining a + * copy of this software and associated documentation files (the "Software"), + * to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, + * and/or sell copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice (including the next + * paragraph) shall be included in all copies or substantial portions of the + * Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL + * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER + * DEALINGS IN THE SOFTWARE. + */ + +#include +#include +#include + +#include "config.h" + +#include +#include +#include +#include + +#include +#include +#include +#include +#include + +/** \page page_module_roc_sink PipeWire Module: ROC sink + * + * The `roc-sink` module creates a PipeWire sink that sends samples to + * a preconfigured receiver address. One can then connect an audio stream + * of any running application to that sink or make it the default sink. + * + * ## Module Options + * + * Options specific to the behavior of this module + * + * - `sink.props = {}`: properties to be passed to the sink stream + * - `local.ip = `: local sender ip + * - `remote.ip = `: remote receiver ip + * - `remote.source.port = `: remote receiver port for source packets + * - `remote.repair.port = `: remote receiver port for receiver packets + * + * ## General options + * + * Options with well-known behavior: + * + * - \ref PW_KEY_NODE_NAME + * + * ## Example configuration + *\code{.unparsed} + * context.modules = [ + * { name = libpipewire-module-roc-sink + * args = { + * local.ip = 0.0.0.0 + * remote.ip = 192.168.0.244 + * remote.source.port = 10001 + * remote.repair.port = 10002 + * sink.props = { + * node.name = "roc-sink" + * } + * } + * } + *] + *\endcode + * + */ + +#define ROC_DEFAULT_IP "0.0.0.0" +#define ROC_DEFAULT_SOURCE_PORT 10001 +#define ROC_DEFAULT_REPAIR_PORT 10002 + +struct module_roc_sink_data { + struct pw_impl_module *module; + struct pw_work_queue *work; + struct spa_hook module_listener; + struct pw_properties *props; + struct pw_context *module_context; + + struct pw_core *core; + struct spa_hook core_listener; + struct spa_hook core_proxy_listener; + + struct pw_stream *capture; + struct spa_hook capture_listener; + struct pw_properties *capture_props; + + unsigned int do_disconnect:1; + unsigned int unloading:1; + + roc_address local_addr; + roc_address remote_source_addr; + roc_address remote_repair_addr; + roc_context *context; + roc_sender *sender; + + char *local_ip; + char *remote_ip; + int remote_source_port; + int remote_repair_port; +}; + +static void do_unload_module(void *obj, void *d, int res, uint32_t id) +{ + struct module_roc_sink_data *data = d; + pw_impl_module_destroy(data->module); +} + +static void unload_module(struct module_roc_sink_data *data) +{ + if (!data->unloading) { + data->unloading = true; + pw_work_queue_add(data->work, data, 0, do_unload_module, data); + } +} + +static void stream_destroy(void *d) +{ + struct module_roc_sink_data *data = d; + spa_hook_remove(&data->capture_listener); + data->capture = NULL; +} + +static void capture_process(void *data) +{ + struct module_roc_sink_data *impl = data; + struct pw_buffer *in; + struct spa_data *d; + roc_frame frame; + uint32_t i, size, offset; + + if ((in = pw_stream_dequeue_buffer(impl->capture)) == NULL) { + pw_log_warn("Out of capture buffers: %m"); + return; + } + + for (i = 0; i < in->buffer->n_datas; i++) { + d = &in->buffer->datas[i]; + size = d->chunk->size; + offset = d->chunk->offset; + + while (size > 0) { + memset(&frame, 0, sizeof(frame)); + + frame.samples = SPA_MEMBER(d->data, offset, void); + frame.samples_size = size; + + if (roc_sender_write(impl->sender, &frame) != 0) { + pw_log_warn("Failed to write to roc sink"); + break; + } + + offset += size; + size -= size; + } + } + pw_stream_queue_buffer(impl->capture, in); +} + +static void on_core_error(void *d, uint32_t id, int seq, int res, const char *message) +{ + struct module_roc_sink_data *data = d; + + pw_log_error("error id:%u seq:%d res:%d (%s): %s", + id, seq, res, spa_strerror(res), message); + + if (id == PW_ID_CORE && res == -EPIPE) + unload_module(data); +} + +static const struct pw_core_events core_events = { + PW_VERSION_CORE_EVENTS, + .error = on_core_error, +}; + +static void on_stream_state_changed(void *d, enum pw_stream_state old, + enum pw_stream_state state, const char *error) +{ + struct module_roc_sink_data *data = d; + + switch (state) { + case PW_STREAM_STATE_UNCONNECTED: + pw_log_info("stream disconnected, unloading"); + unload_module(data); + break; + case PW_STREAM_STATE_ERROR: + pw_log_error("stream error: %s", error); + break; + default: + break; + } +} + +static const struct pw_stream_events in_stream_events = { + PW_VERSION_STREAM_EVENTS, + .destroy = stream_destroy, + .state_changed = on_stream_state_changed, + .process = capture_process +}; + +static void core_destroy(void *d) +{ + struct module_roc_sink_data *data = d; + spa_hook_remove(&data->core_listener); + data->core = NULL; + unload_module(data); +} + +static const struct pw_proxy_events core_proxy_events = { + .destroy = core_destroy, +}; + +static void impl_destroy(struct module_roc_sink_data *data) +{ + if (data->capture) + pw_stream_destroy(data->capture); + if (data->core && data->do_disconnect) + pw_core_disconnect(data->core); + + pw_properties_free(data->capture_props); + pw_properties_free(data->props); + + if (data->work) + pw_work_queue_cancel(data->work, data, SPA_ID_INVALID); + if (data->sender) + roc_sender_close(data->sender); + if (data->context) + roc_context_close(data->context); + + free(data->local_ip); + free(data->remote_ip); + free(data); +} + +static void module_destroy(void *d) +{ + struct module_roc_sink_data *data = d; + data->unloading = true; + spa_hook_remove(&data->module_listener); + impl_destroy(data); +} + +static const struct pw_impl_module_events module_events = { + PW_VERSION_IMPL_MODULE_EVENTS, + .destroy = module_destroy, +}; + +static int roc_sink_setup(struct module_roc_sink_data *data) +{ + roc_context_config context_config; + roc_sender_config sender_config; + struct spa_audio_info_raw info = { 0 }; + const struct spa_pod *params[1]; + struct spa_pod_builder b; + uint32_t n_params; + uint8_t buffer[1024]; + int res; + + if (roc_address_init(&data->local_addr, ROC_AF_AUTO, data->local_ip, 0)) { + pw_log_error("Invalid local IP address"); + return -EINVAL; + } + + if (roc_address_init(&data->remote_source_addr, ROC_AF_AUTO, data->remote_ip, + data->remote_source_port)) { + pw_log_error("Invalid remote source address"); + return -EINVAL; + } + + if (roc_address_init(&data->remote_repair_addr, ROC_AF_AUTO, data->remote_ip, + data->remote_repair_port)) { + pw_log_error("Invalid remote repair address"); + return -EINVAL; + } + + memset(&context_config, 0, sizeof(context_config)); + + data->context = roc_context_open(&context_config); + if (!data->context) { + pw_log_error("Failed to create roc context"); + return -EINVAL; + } + + memset(&sender_config, 0, sizeof(sender_config)); + + sender_config.frame_sample_rate = 44100; + sender_config.frame_channels = ROC_CHANNEL_SET_STEREO; + sender_config.frame_encoding = ROC_FRAME_ENCODING_PCM_FLOAT; + + /* Fixed to be the same as ROC sender config above */ + info.rate = 44100; + info.channels = 2; + info.format = SPA_AUDIO_FORMAT_F32_LE; + + data->sender = roc_sender_open(data->context, &sender_config); + if (!data->sender) { + pw_log_error("Failed to create roc sender"); + return -EINVAL; + } + + if (roc_sender_bind(data->sender, &data->local_addr) != 0) { + pw_log_error("Failed to bind sender to local address"); + return -EINVAL; + } + + if (roc_sender_connect(data->sender, ROC_PORT_AUDIO_SOURCE, ROC_PROTO_RTP_RS8M_SOURCE, + &data->remote_source_addr) != 0) { + pw_log_error("can't connect roc sender to remote source address"); + return -EINVAL; + } + + if (roc_sender_connect(data->sender, ROC_PORT_AUDIO_REPAIR, ROC_PROTO_RS8M_REPAIR, + &data->remote_repair_addr) != 0) { + pw_log_error("can't connect roc sender to remote repair address"); + return -EINVAL; + } + + data->capture = pw_stream_new(data->core, + "roc-sink capture", data->capture_props); + data->capture_props = NULL; + if (data->capture == NULL) + return -errno; + + pw_stream_add_listener(data->capture, + &data->capture_listener, + &in_stream_events, data); + + n_params = 0; + spa_pod_builder_init(&b, buffer, sizeof(buffer)); + params[n_params++] = spa_format_audio_raw_build(&b, SPA_PARAM_EnumFormat, + &info); + + if ((res = pw_stream_connect(data->capture, + PW_DIRECTION_INPUT, + PW_ID_ANY, + PW_STREAM_FLAG_AUTOCONNECT | + PW_STREAM_FLAG_MAP_BUFFERS | + PW_STREAM_FLAG_RT_PROCESS, + params, n_params)) < 0) + return res; + + return 0; +} + +static const struct spa_dict_item module_roc_sink_info[] = { + { PW_KEY_MODULE_AUTHOR, "Sanchayan Maity " }, + { PW_KEY_MODULE_DESCRIPTION, "roc sink" }, + { PW_KEY_MODULE_USAGE, "sink.name= " + "local.ip= " + "remote.ip= " + "remote.source.port= " + "remote.repair.port= " }, + { PW_KEY_MODULE_VERSION, PACKAGE_VERSION }, +}; + +SPA_EXPORT +int pipewire__module_init(struct pw_impl_module *module, const char *args) +{ + struct pw_context *context = pw_impl_module_get_context(module); + struct module_roc_sink_data *data; + struct pw_properties *props = NULL, *capture_props = NULL; + const char *str; + char *local_ip = NULL, *remote_ip = NULL; + int res = 0, remote_repair_port, remote_source_port; + + data = calloc(1, sizeof(struct module_roc_sink_data)); + if (data == NULL) + return -errno; + + if (args == NULL) + args = ""; + + props = pw_properties_new_string(args); + if (props == NULL) { + res = -errno; + pw_log_error( "can't create properties: %m"); + goto out; + } + data->props = props; + + capture_props = pw_properties_new(NULL, NULL); + if (capture_props == NULL) { + res = -errno; + pw_log_error( "can't create properties: %m"); + goto out; + } + data->capture_props = capture_props; + + data->module = module; + data->module_context = context; + data->work = pw_context_get_work_queue(context); + if (data->work == NULL) { + res = -errno; + pw_log_error( "can't get work queue: %m"); + goto out; + } + + if ((str = pw_properties_get(props, "sink.name")) != NULL) { + pw_properties_set(capture_props, PW_KEY_NODE_NAME, str); + pw_properties_set(props, "sink.name", NULL); + } + + if ((str = pw_properties_get(props, PW_KEY_MEDIA_CLASS)) == NULL) + pw_properties_set(props, PW_KEY_MEDIA_CLASS, "Audio/Sink"); + + if ((str = pw_properties_get(props, "remote.ip")) != NULL) { + remote_ip = strdup(str); + pw_properties_set(props, "remote.ip", NULL); + } else { + pw_log_error("Remote IP not specified"); + res = -EINVAL; + goto out; + } + + if ((str = pw_properties_get(props, "local.ip")) != NULL) { + local_ip = strdup(str); + pw_properties_set(props, "local.ip", NULL); + } else { + local_ip = strdup(ROC_DEFAULT_IP); + } + + if ((str = pw_properties_get(props, "remote.source.port")) != NULL) { + remote_source_port = pw_properties_parse_int(str); + pw_properties_set(props, "remote.source.port", NULL); + } else { + remote_source_port = ROC_DEFAULT_SOURCE_PORT; + } + + if ((str = pw_properties_get(props, "remote.repair.port")) != NULL) { + remote_repair_port = pw_properties_parse_int(str); + pw_properties_set(props, "remote.repair.port", NULL); + } else { + remote_repair_port = ROC_DEFAULT_REPAIR_PORT; + } + + data->core = pw_context_get_object(data->module_context, PW_TYPE_INTERFACE_Core); + if (data->core == NULL) { + str = pw_properties_get(props, PW_KEY_REMOTE_NAME); + data->core = pw_context_connect(data->module_context, + pw_properties_new( + PW_KEY_REMOTE_NAME, str, + NULL), + 0); + data->do_disconnect = true; + } + if (data->core == NULL) { + res = -errno; + pw_log_error("can't connect: %m"); + goto out; + } + + pw_proxy_add_listener((struct pw_proxy*)data->core, + &data->core_proxy_listener, + &core_proxy_events, data); + pw_core_add_listener(data->core, + &data->core_listener, + &core_events, data); + + data->capture_props = capture_props; + data->local_ip = local_ip; + data->remote_ip = remote_ip; + data->remote_source_port = remote_source_port; + data->remote_repair_port = remote_repair_port; + + if ((res = roc_sink_setup(data)) < 0) + goto out; + + pw_impl_module_add_listener(module, &data->module_listener, &module_events, data); + + pw_impl_module_update_properties(module, &SPA_DICT_INIT_ARRAY(module_roc_sink_info)); + + pw_log_info("Successfully loaded module-roc-sink"); + + return 0; + +out: + impl_destroy(data); + return res; +} diff --git a/src/modules/module-roc-source.c b/src/modules/module-roc-source.c new file mode 100644 index 000000000..04603f0c1 --- /dev/null +++ b/src/modules/module-roc-source.c @@ -0,0 +1,541 @@ +/* PipeWire + * + * Copyright © 2021 Wim Taymans + * Copyright © 2021 Sanchayan Maity + * + * Permission is hereby granted, free of charge, to any person obtaining a + * copy of this software and associated documentation files (the "Software"), + * to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, + * and/or sell copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice (including the next + * paragraph) shall be included in all copies or substantial portions of the + * Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL + * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER + * DEALINGS IN THE SOFTWARE. + */ + +#include +#include +#include + +#include "config.h" + +#include +#include +#include +#include + +#include +#include +#include +#include +#include + +/** \page page_module_roc_source PipeWire Module: ROC source + * + * The `roc-source` module creates a PipeWire source that receives samples + * from ROC sender and passes them to the sink it is connected to. One can + * then connect it to any audio device. + * + * ## Module Options + * + * Options specific to the behavior of this module + * + * - `source.props = {}`: properties to be passed to the source stream + * - `local.ip = `: local sender ip + * - `local.source.port = `: local receiver port for source packets + * - `local.repair.port = `: local receiver port for receiver packets + * - `sess.latency.msec = `: target network latency in milliseconds + * - `resampler.profile = `: Possible values: `disable`, `high`, + * `medium`, `low`. + * + * ## General options + * + * Options with well-known behavior: + * + * - \ref PW_KEY_NODE_NAME + * + * ## Example configuration + *\code{.unparsed} + * context.modules = [ + * { name = libpipewire-module-roc-source + * args = { + * local.ip = 0.0.0.0 + * resampler.profile = medium + * sess.latency.msec = 5000 + * local.source.port = 10001 + * local.repair.port = 10002 + * source.props = { + * node.name = "roc-source" + * } + * } + * } + *] + *\endcode + * + */ + +#define ROC_DEFAULT_IP "0.0.0.0" +#define ROC_DEFAULT_SOURCE_PORT 10001 +#define ROC_DEFAULT_REPAIR_PORT 10002 +#define ROC_DEFAULT_SESS_LATENCY 200 + +struct module_roc_source_data { + struct pw_impl_module *module; + struct pw_work_queue *work; + struct spa_hook module_listener; + struct pw_properties *props; + struct pw_context *module_context; + + struct pw_core *core; + struct spa_hook core_listener; + struct spa_hook core_proxy_listener; + + struct pw_stream *playback; + struct spa_hook playback_listener; + struct pw_properties *playback_props; + + unsigned int do_disconnect:1; + unsigned int unloading:1; + + roc_address local_addr; + roc_address local_source_addr; + roc_address local_repair_addr; + roc_context *context; + roc_receiver *receiver; + + char *resampler_profile; + char *local_ip; + int local_source_port; + int local_repair_port; + int sess_latency_msec; +}; + +static void do_unload_module(void *obj, void *d, int res, uint32_t id) +{ + struct module_roc_source_data *data = d; + pw_impl_module_destroy(data->module); +} + +static void unload_module(struct module_roc_source_data *data) +{ + if (!data->unloading) { + data->unloading = true; + pw_work_queue_add(data->work, data, 0, do_unload_module, data); + } +} + +static void stream_destroy(void *d) +{ + struct module_roc_source_data *data = d; + spa_hook_remove(&data->playback_listener); + data->playback = NULL; +} + +static int roc_parse_resampler_profile(roc_resampler_profile *out, const char *str) +{ + if (!str || !*str) { + *out = ROC_RESAMPLER_DEFAULT; + return 0; + } else if (spa_streq(str, "disable") == 0) { + *out = ROC_RESAMPLER_DISABLE; + return 0; + } else if (spa_streq(str, "high") == 0) { + *out = ROC_RESAMPLER_HIGH; + return 0; + } else if (spa_streq(str, "medium") == 0) { + *out = ROC_RESAMPLER_MEDIUM; + return 0; + } else if (spa_streq(str, "low") == 0) { + *out = ROC_RESAMPLER_LOW; + return 0; + } else { + pw_log_error("Invalid resampler profile: %s", str); + return -EINVAL; + } +} + +static void playback_process(void *data) +{ + struct module_roc_source_data *impl = data; + struct pw_buffer *b; + struct spa_buffer *buf; + roc_frame frame; + uint8_t *dst; + + if ((b = pw_stream_dequeue_buffer(impl->playback)) == NULL) { + pw_log_warn("Out of playback buffers: %m"); + return; + } + + buf = b->buffer; + if ((dst = buf->datas[0].data) == NULL) + return; + + buf->datas[0].chunk->offset = 0; + buf->datas[0].chunk->stride = 8; /* channels = 2, format = F32LE */ + buf->datas[0].chunk->size = 0; + + memset(&frame, 0, sizeof(frame)); + + frame.samples = dst; + frame.samples_size = buf->datas[0].maxsize; + + if (roc_receiver_read(impl->receiver, &frame) != 0) { + /* Handle EOF and error */ + pw_log_error("Failed to read from roc source"); + unload_module(data); + return; + } + + buf->datas[0].chunk->size = frame.samples_size; + + pw_stream_queue_buffer(impl->playback, b); +} + +static void on_core_error(void *d, uint32_t id, int seq, int res, const char *message) +{ + struct module_roc_source_data *data = d; + + pw_log_error("error id:%u seq:%d res:%d (%s): %s", + id, seq, res, spa_strerror(res), message); + + if (id == PW_ID_CORE && res == -EPIPE) + unload_module(data); +} + +static const struct pw_core_events core_events = { + PW_VERSION_CORE_EVENTS, + .error = on_core_error, +}; + +static void on_stream_state_changed(void *d, enum pw_stream_state old, + enum pw_stream_state state, const char *error) +{ + struct module_roc_source_data *data = d; + + switch (state) { + case PW_STREAM_STATE_UNCONNECTED: + pw_log_info("stream disconnected, unloading"); + unload_module(data); + break; + case PW_STREAM_STATE_ERROR: + pw_log_error("stream error: %s", error); + break; + default: + break; + } +} + +static const struct pw_stream_events out_stream_events = { + PW_VERSION_STREAM_EVENTS, + .destroy = stream_destroy, + .state_changed = on_stream_state_changed, + .process = playback_process +}; + +static void core_destroy(void *d) +{ + struct module_roc_source_data *data = d; + spa_hook_remove(&data->core_listener); + data->core = NULL; + unload_module(data); +} + +static const struct pw_proxy_events core_proxy_events = { + .destroy = core_destroy, +}; + +static void impl_destroy(struct module_roc_source_data *data) +{ + if (data->playback) + pw_stream_destroy(data->playback); + if (data->core && data->do_disconnect) + pw_core_disconnect(data->core); + + pw_properties_free(data->playback_props); + pw_properties_free(data->props); + + if (data->work) + pw_work_queue_cancel(data->work, data, SPA_ID_INVALID); + if (data->receiver) + roc_receiver_close(data->receiver); + if (data->context) + roc_context_close(data->context); + + free(data->local_ip); + free(data->resampler_profile); + free(data); +} + +static void module_destroy(void *d) +{ + struct module_roc_source_data *data = d; + data->unloading = true; + spa_hook_remove(&data->module_listener); + impl_destroy(data); +} + +static const struct pw_impl_module_events module_events = { + PW_VERSION_IMPL_MODULE_EVENTS, + .destroy = module_destroy, +}; + +static int roc_source_setup(struct module_roc_source_data *data) +{ + roc_context_config context_config; + roc_receiver_config receiver_config; + struct spa_audio_info_raw info = { 0 }; + const struct spa_pod *params[1]; + struct spa_pod_builder b; + uint32_t n_params; + uint8_t buffer[1024]; + int res; + + if (roc_address_init(&data->local_addr, ROC_AF_AUTO, data->local_ip, 0)) { + pw_log_error("Invalid local IP address"); + return -EINVAL; + } + + if (roc_address_init(&data->local_source_addr, ROC_AF_AUTO, data->local_ip, + data->local_source_port)) { + pw_log_error("Invalid local source address"); + return -EINVAL; + } + + if (roc_address_init(&data->local_repair_addr, ROC_AF_AUTO, data->local_ip, + data->local_repair_port)) { + pw_log_error("Invalid local repair address"); + return -EINVAL; + } + + memset(&context_config, 0, sizeof(context_config)); + + data->context = roc_context_open(&context_config); + if (!data->context) { + pw_log_error("Failed to create roc context"); + return -EINVAL; + } + + memset(&receiver_config, 0, sizeof(receiver_config)); + + receiver_config.frame_sample_rate = 44100; + receiver_config.frame_channels = ROC_CHANNEL_SET_STEREO; + receiver_config.frame_encoding = ROC_FRAME_ENCODING_PCM_FLOAT; + + /* Fixed to be the same as ROC receiver config above */ + info.rate = 44100; + info.channels = 2; + info.format = SPA_AUDIO_FORMAT_F32_LE; + + if (roc_parse_resampler_profile(&receiver_config.resampler_profile, + data->resampler_profile)) { + pw_log_error("Invalid resampler profile"); + return -EINVAL; + } + + /* + * Note that target latency is in nano seconds. + * + * The session will not start playing until it accumulates the + * requested latency. Then if resampler is enabled, the session will + * adjust it's clock to keep actual latency as close as possible to + * the target latency. If zero, default value will be used. + * + * See API reference: + * https://roc-streaming.org/toolkit/docs/api/reference.html + */ + receiver_config.target_latency = data->sess_latency_msec * 1000000; + + data->receiver = roc_receiver_open(data->context, &receiver_config); + if (!data->receiver) { + pw_log_error("Failed to create roc receiver"); + return -EINVAL; + } + + if (roc_receiver_bind(data->receiver, ROC_PORT_AUDIO_SOURCE, ROC_PROTO_RTP_RS8M_SOURCE, + &data->local_source_addr) != 0) { + pw_log_error("can't connect roc receiver to local source address"); + return -EINVAL; + } + + if (roc_receiver_bind(data->receiver, ROC_PORT_AUDIO_REPAIR, ROC_PROTO_RS8M_REPAIR, + &data->local_repair_addr) != 0) { + pw_log_error("can't connect roc receiver to local repair address"); + return -EINVAL; + } + + data->playback = pw_stream_new(data->core, + "roc-source playback", data->playback_props); + data->playback_props = NULL; + if (data->playback == NULL) + return -errno; + + pw_stream_add_listener(data->playback, + &data->playback_listener, + &out_stream_events, data); + + n_params = 0; + spa_pod_builder_init(&b, buffer, sizeof(buffer)); + params[n_params++] = spa_format_audio_raw_build(&b, SPA_PARAM_EnumFormat, + &info); + + if ((res = pw_stream_connect(data->playback, + PW_DIRECTION_OUTPUT, + PW_ID_ANY, + PW_STREAM_FLAG_AUTOCONNECT | + PW_STREAM_FLAG_MAP_BUFFERS | + PW_STREAM_FLAG_RT_PROCESS, + params, n_params)) < 0) + return res; + + return 0; +} + +static const struct spa_dict_item module_roc_source_info[] = { + { PW_KEY_MODULE_AUTHOR, "Sanchayan Maity " }, + { PW_KEY_MODULE_DESCRIPTION, "roc source" }, + { PW_KEY_MODULE_USAGE, "source.name= " + "resampler.profile=|disable|high|medium|low " + "sess.latency.msec= " + "local.ip= " + "local.source.port= " + "local.repair.port= " }, + { PW_KEY_MODULE_VERSION, PACKAGE_VERSION }, +}; + +SPA_EXPORT +int pipewire__module_init(struct pw_impl_module *module, const char *args) +{ + struct pw_context *context = pw_impl_module_get_context(module); + struct module_roc_source_data *data; + struct pw_properties *props = NULL, *playback_props = NULL; + const char *str; + char *local_ip = NULL, *resampler_profile = NULL; + int res = 0, local_repair_port, local_source_port, sess_latency_msec; + + data = calloc(1, sizeof(struct module_roc_source_data)); + if (data == NULL) + return -errno; + + if (args == NULL) + args = ""; + + props = pw_properties_new_string(args); + if (props == NULL) { + res = -errno; + pw_log_error( "can't create properties: %m"); + goto out; + } + data->props = props; + + playback_props = pw_properties_new(NULL, NULL); + if (playback_props == NULL) { + res = -errno; + pw_log_error( "can't create properties: %m"); + goto out; + } + data->playback_props = playback_props; + + data->module = module; + data->module_context = context; + data->work = pw_context_get_work_queue(context); + if (data->work == NULL) { + res = -errno; + pw_log_error( "can't get work queue: %m"); + goto out; + } + + if ((str = pw_properties_get(props, "source.name")) != NULL) { + pw_properties_set(playback_props, PW_KEY_NODE_NAME, str); + pw_properties_set(props, "source.name", NULL); + } + + if ((str = pw_properties_get(props, PW_KEY_MEDIA_CLASS)) == NULL) + pw_properties_set(props, PW_KEY_MEDIA_CLASS, "Audio/Source"); + + if ((str = pw_properties_get(props, "local.ip")) != NULL) { + local_ip = strdup(str); + pw_properties_set(props, "local.ip", NULL); + } else { + local_ip = strdup(ROC_DEFAULT_IP); + } + + if ((str = pw_properties_get(props, "local.source.port")) != NULL) { + local_source_port = pw_properties_parse_int(str); + pw_properties_set(props, "local.source.port", NULL); + } else { + local_source_port = ROC_DEFAULT_SOURCE_PORT; + } + + if ((str = pw_properties_get(props, "local.repair.port")) != NULL) { + local_repair_port = pw_properties_parse_int(str); + pw_properties_set(props, "local.repair.port", NULL); + } else { + local_repair_port = ROC_DEFAULT_REPAIR_PORT; + } + + if ((str = pw_properties_get(props, "sess.latency.msec")) != NULL) { + sess_latency_msec = pw_properties_parse_int(str); + pw_properties_set(props, "sess.latency.msec", NULL); + } else { + sess_latency_msec = ROC_DEFAULT_SESS_LATENCY; + } + + if ((str = pw_properties_get(props, "resampler.profile")) != NULL) { + resampler_profile = strdup(str); + pw_properties_set(props, "resampler.profile", NULL); + } + + data->core = pw_context_get_object(data->module_context, PW_TYPE_INTERFACE_Core); + if (data->core == NULL) { + str = pw_properties_get(props, PW_KEY_REMOTE_NAME); + data->core = pw_context_connect(data->module_context, + pw_properties_new( + PW_KEY_REMOTE_NAME, str, + NULL), + 0); + data->do_disconnect = true; + } + if (data->core == NULL) { + res = -errno; + pw_log_error("can't connect: %m"); + goto out; + } + + pw_proxy_add_listener((struct pw_proxy*)data->core, + &data->core_proxy_listener, + &core_proxy_events, data); + pw_core_add_listener(data->core, + &data->core_listener, + &core_events, data); + + data->local_ip = local_ip; + data->local_source_port = local_source_port; + data->local_repair_port = local_repair_port; + data->sess_latency_msec = sess_latency_msec; + data->resampler_profile = resampler_profile; + + if ((res = roc_source_setup(data)) < 0) + goto out; + + pw_impl_module_add_listener(module, &data->module_listener, &module_events, data); + + pw_impl_module_update_properties(module, &SPA_DICT_INIT_ARRAY(module_roc_source_info)); + + pw_log_info("Successfully loaded module-roc-source"); + + return 0; +out: + impl_destroy(data); + return res; +}