diff --git a/src/modules/meson.build b/src/modules/meson.build index 69ce942f8..32b48d2a8 100644 --- a/src/modules/meson.build +++ b/src/modules/meson.build @@ -109,3 +109,15 @@ pipewire_module_audio_dsp = shared_library('pipewire-module-audio-dsp', install_dir : modules_install_dir, dependencies : [mathlib, dl_lib, rt_lib, pipewire_dep], ) + +pipewire_module_adapter = shared_library('pipewire-module-adapter', + [ 'module-adapter.c', + 'module-adapter/adapter.c', + 'module-adapter/floatmix.c', + 'spa/spa-node.c' ], + c_args : pipewire_module_c_args, + include_directories : [configinc, spa_inc], + install : true, + install_dir : modules_install_dir, + dependencies : [mathlib, dl_lib, rt_lib, pipewire_dep], +) diff --git a/src/modules/module-adapter.c b/src/modules/module-adapter.c new file mode 100644 index 000000000..897dc9356 --- /dev/null +++ b/src/modules/module-adapter.c @@ -0,0 +1,259 @@ +/* PipeWire + * + * Copyright © 2018 Wim Taymans + * + * Permission is hereby granted, free of charge, to any person obtaining a + * copy of this software and associated documentation files (the "Software"), + * to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, + * and/or sell copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice (including the next + * paragraph) shall be included in all copies or substantial portions of the + * Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL + * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER + * DEALINGS IN THE SOFTWARE. + */ + +#include +#include +#include +#include + +#include "config.h" + +#include +#include + +#include +#include "pipewire/private.h" + +#include "modules/spa/spa-node.h" +#include "module-adapter/adapter.h" + +#define FACTORY_USAGE SPA_KEY_FACTORY_NAME"= " \ + "["SPA_KEY_LIBRARY_NAME"=] " \ + ADAPTER_USAGE + +static const struct spa_dict_item module_props[] = { + { PW_KEY_MODULE_AUTHOR, "Wim Taymans " }, + { PW_KEY_MODULE_DESCRIPTION, "Manage adapter nodes" }, + { PW_KEY_MODULE_VERSION, PACKAGE_VERSION }, +}; + +struct factory_data { + struct pw_factory *this; + + struct spa_list node_list; + + struct pw_module *module; + struct spa_hook module_listener; +}; + +struct node_data { + struct factory_data *data; + struct spa_list link; + struct pw_node *adapter; + struct spa_hook adapter_listener; + struct spa_hook resource_listener; +}; + +static void resource_destroy(void *data) +{ + struct node_data *nd = data; + spa_hook_remove(&nd->resource_listener); + if (nd->adapter) + pw_node_destroy(nd->adapter); +} + +static const struct pw_resource_events resource_events = { + PW_VERSION_RESOURCE_EVENTS, + .destroy = resource_destroy +}; + +static void node_destroy(void *data) +{ + struct node_data *nd = data; + spa_list_remove(&nd->link); + nd->adapter = NULL; +} + +static const struct pw_node_events node_events = { + PW_VERSION_NODE_EVENTS, + .destroy = node_destroy +}; + +static void *create_object(void *_data, + struct pw_resource *resource, + uint32_t type, + uint32_t version, + struct pw_properties *properties, + uint32_t new_id) +{ + struct factory_data *d = _data; + struct pw_client *client = pw_resource_get_client(resource); + struct pw_node *adapter, *slave; + const char *factory_name; + int res; + struct node_data *nd; + struct pw_resource *bound_resource; + + if (resource == NULL) + goto error_resource; + + if (properties == NULL) + goto error_properties; + + factory_name = pw_properties_get(properties, SPA_KEY_FACTORY_NAME); + if (factory_name == NULL) + goto error_properties; + + slave = pw_spa_node_load(client->core, + NULL, + pw_factory_get_global(d->this), + factory_name, + "slave-node", + PW_SPA_NODE_FLAG_ACTIVATE | + PW_SPA_NODE_FLAG_NO_REGISTER, + pw_properties_copy(properties), 0); + if (slave == NULL) + goto error_no_mem; + + adapter = pw_adapter_new(pw_module_get_core(d->module), + slave, + properties, + sizeof(struct node_data)); + properties = NULL; + + if (adapter == NULL) { + if (errno == ENOMEM) + goto error_no_mem; + else + goto error_usage; + } + + nd = pw_adapter_get_user_data(adapter); + nd->data = d; + nd->adapter = adapter; + spa_list_append(&d->node_list, &nd->link); + + client = pw_resource_get_client(resource); + + pw_node_register(adapter, client, pw_module_get_global(d->module), NULL); + pw_node_add_listener(adapter, &nd->adapter_listener, &node_events, nd); + + res = pw_global_bind(pw_node_get_global(adapter), client, + PW_PERM_RWX, PW_VERSION_NODE_PROXY, new_id); + if (res < 0) + goto error_bind; + + if ((bound_resource = pw_client_find_resource(client, new_id)) == NULL) + goto error_bind; + + pw_resource_add_listener(bound_resource, &nd->resource_listener, &resource_events, nd); + + pw_node_set_active(adapter, true); + + return adapter; + +error_resource: + res = -EINVAL; + pw_log_error("adapter needs a resource"); + goto error_cleanup; +error_properties: + res = -EINVAL; + pw_log_error("factory %p: usage: " FACTORY_USAGE, d->this); + pw_resource_error(resource, res, "usage: " FACTORY_USAGE); + goto error_cleanup; +error_no_mem: + res = -errno; + pw_log_error("can't create node: %m"); + pw_resource_error(resource, res, "can't create node: %s", spa_strerror(res)); + goto error_cleanup; +error_usage: + res = -EINVAL; + pw_log_error("usage: "ADAPTER_USAGE); + pw_resource_error(resource, res, "usage: "ADAPTER_USAGE); + goto error_cleanup; +error_bind: + pw_resource_error(resource, res, "can't bind adapter node"); + goto error_cleanup; +error_cleanup: + if (properties) + pw_properties_free(properties); + errno = -res; + return NULL; +} + +static const struct pw_factory_implementation impl_factory = { + PW_VERSION_FACTORY_IMPLEMENTATION, + .create_object = create_object, +}; + +static void module_destroy(void *data) +{ + struct factory_data *d = data; + struct node_data *nd, *t; + + spa_hook_remove(&d->module_listener); + + spa_list_for_each_safe(nd, t, &d->node_list, link) + pw_node_destroy(nd->adapter); + + pw_factory_destroy(d->this); +} + +static const struct pw_module_events module_events = { + PW_VERSION_MODULE_EVENTS, + .destroy = module_destroy, +}; + +static int module_init(struct pw_module *module, struct pw_properties *properties) +{ + struct pw_core *core = pw_module_get_core(module); + struct pw_factory *factory; + struct factory_data *data; + + factory = pw_factory_new(core, + "adapter", + PW_TYPE_INTERFACE_Node, + PW_VERSION_NODE_PROXY, + pw_properties_new( + PW_KEY_FACTORY_USAGE, FACTORY_USAGE, + NULL), + sizeof(*data)); + if (factory == NULL) + return -errno; + + data = pw_factory_get_user_data(factory); + data->this = factory; + data->module = module; + spa_list_init(&data->node_list); + + pw_log_debug("module %p: new", module); + + pw_factory_set_implementation(factory, + &impl_factory, + data); + + pw_factory_register(factory, NULL, pw_module_get_global(module), NULL); + + pw_module_add_listener(module, &data->module_listener, &module_events, data); + + pw_module_update_properties(module, &SPA_DICT_INIT_ARRAY(module_props)); + + return 0; +} + +SPA_EXPORT +int pipewire__module_init(struct pw_module *module, const char *args) +{ + return module_init(module, NULL); +} diff --git a/src/modules/module-adapter/adapter.c b/src/modules/module-adapter/adapter.c new file mode 100644 index 000000000..7d969a12d --- /dev/null +++ b/src/modules/module-adapter/adapter.c @@ -0,0 +1,1524 @@ +/* PipeWire + * + * Copyright © 2018 Wim Taymans + * + * Permission is hereby granted, free of charge, to any person obtaining a + * copy of this software and associated documentation files (the "Software"), + * to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, + * and/or sell copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice (including the next + * paragraph) shall be included in all copies or substantial portions of the + * Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL + * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER + * DEALINGS IN THE SOFTWARE. + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include + +#include "pipewire/pipewire.h" +#include "pipewire/interfaces.h" +#include "pipewire/control.h" +#include "pipewire/private.h" + +#include "modules/spa/spa-node.h" +#include "adapter.h" + +#include "pipewire/core.h" + +#define NAME "adapter" + +#undef spa_debug + +#include +#include + +/** \cond */ + +#define PORT_BUFFERS 1 +#define MAX_BUFFER_SIZE 2048 + +extern const struct spa_handle_factory spa_floatmix_factory; + +struct buffer { + struct spa_buffer buf; + struct spa_data datas[1]; + struct spa_chunk chunk[1]; +}; + +struct node; + +struct port { + struct spa_list link; + + struct pw_port *port; + struct node *node; + + struct buffer buffers[PORT_BUFFERS]; + + struct spa_buffer *bufs[PORT_BUFFERS]; + + struct spa_handle *spa_handle; + struct spa_node *spa_node; + + float empty[MAX_BUFFER_SIZE + 15]; +}; + +struct node { + struct spa_node node; + + struct impl *impl; + + struct spa_log *log; + + uint64_t info_all; + struct spa_node_info info; + struct spa_param_info params[5]; + + struct spa_hook_list hooks; + struct spa_callbacks callbacks; +}; + +struct impl { + struct pw_core *core; + + enum spa_direction direction; + + struct node node; + struct pw_node *this; + struct spa_hook node_listener; + + struct pw_node *slave; + struct spa_hook slave_listener; + struct spa_node *slave_node; + struct pw_port *slave_port; + struct pw_port_mix slave_port_mix; + struct spa_io_buffers slave_io; + + struct spa_handle *handle; + struct spa_node *adapter; + struct spa_hook adapter_listener; + struct spa_node *adapter_mix; + uint32_t adapter_mix_flags; + uint32_t adapter_mix_port; + + struct spa_list ports; + + unsigned int use_converter:1; + unsigned int started:1; + unsigned int active:1; + unsigned int driver:1; + + struct spa_io_buffers *io; + + struct spa_buffer **buffers; + uint32_t n_buffers; + struct pw_memblock *mem; + + struct pw_control_link control; + struct pw_control_link notify; +}; + +/** \endcond */ + +static int impl_node_enum_params(void *object, int seq, + uint32_t id, uint32_t start, uint32_t num, + const struct spa_pod *filter) +{ + struct node *this = object; + struct impl *impl; + struct spa_pod *param; + struct spa_pod_builder b = { 0 }; + uint8_t buffer[1024]; + struct spa_result_node_params result; + uint32_t count = 0; + int res; + + spa_return_val_if_fail(this != NULL, -EINVAL); + spa_return_val_if_fail(num != 0, -EINVAL); + + impl = this->impl; + + result.id = id; + result.next = start; +next: + result.index = result.next++; + + spa_pod_builder_init(&b, buffer, sizeof(buffer)); + + switch (id) { + case SPA_PARAM_PropInfo: + case SPA_PARAM_Props: + if (impl->adapter == impl->slave_node) + return 0; + + if ((res = spa_node_enum_params_sync(impl->adapter, + id, &start, filter, ¶m, &b)) != 1) + return res; + break; + + case SPA_PARAM_EnumFormat: + case SPA_PARAM_Format: + if ((res = spa_node_port_enum_params_sync(impl->slave_node, + impl->direction, 0, + id, &start, filter, ¶m, &b)) != 1) + return res; + break; + + default: + return -ENOENT; + } + + if (spa_pod_filter(&b, &result.param, param, filter) < 0) + goto next; + + spa_node_emit_result(&this->hooks, seq, 0, SPA_RESULT_TYPE_NODE_PARAMS, &result); + + if (++count != num) + goto next; + + return 0; +} + +static void try_link_controls(struct impl *impl) +{ + struct pw_control *cin, *cout; + struct pw_port *target, *port; + int res; + + if (!impl->use_converter) + return; + + if (impl->control.valid || impl->notify.valid) + return; + + target = pw_node_find_port(impl->this, impl->direction, 0); + + if (target == NULL) { + pw_log_warn(NAME " %p: can't link controls", &impl->this); + return; + } + + port = impl->slave_port; + + pw_log_debug(NAME " %p: trying controls", impl); + spa_list_for_each(cout, &port->control_list[SPA_DIRECTION_OUTPUT], port_link) { + spa_list_for_each(cin, &target->control_list[SPA_DIRECTION_INPUT], port_link) { + if ((res = pw_control_add_link(cout, 0, cin, 0, &impl->control)) < 0) + pw_log_error("failed to link controls: %s", spa_strerror(res)); + break; + } + } + spa_list_for_each(cin, &port->control_list[SPA_DIRECTION_INPUT], port_link) { + spa_list_for_each(cout, &target->control_list[SPA_DIRECTION_OUTPUT], port_link) { + if ((res = pw_control_add_link(cout, 0, cin, 0, &impl->notify)) < 0) + pw_log_error("failed to link controls: %s", spa_strerror(res)); + break; + } + } +} + +static void emit_node_info(struct node *this, bool full) +{ + if (full) + this->info.change_mask = this->info_all; + if (this->info.change_mask) { + spa_node_emit_info(&this->hooks, &this->info); + this->info.change_mask = 0; + } +} + +static int impl_node_set_param(void *object, uint32_t id, uint32_t flags, + const struct spa_pod *param) +{ + int res = 0; + struct node *this = object; + struct impl *impl; + + impl = this->impl; + + pw_log_debug(NAME" %p: set param %d", this, id); + + switch (id) { + case SPA_PARAM_Profile: + if (impl->started) + return -EIO; + if (impl->adapter && impl->adapter != impl->slave_node) { + if ((res = spa_node_set_param(impl->adapter, id, flags, param)) < 0) + return res; + + try_link_controls(impl); + } + break; + case SPA_PARAM_Props: + if (impl->adapter && impl->adapter != impl->slave_node) { + if ((res = spa_node_set_param(impl->adapter, id, flags, param)) < 0) + return res; + + this->info.change_mask = SPA_NODE_CHANGE_MASK_PARAMS; + this->params[2].flags ^= SPA_PARAM_INFO_SERIAL; + emit_node_info(this, false); + } + break; + default: + res = -ENOTSUP; + break; + } + return res; +} + +static int impl_node_set_io(void *object, uint32_t id, void *data, size_t size) +{ + struct node *this = object; + struct impl *impl; + int res = 0; + + spa_return_val_if_fail(this != NULL, -EINVAL); + + impl = this->impl; + + if (impl->adapter) + res = spa_node_set_io(impl->adapter, id, data, size); + + if (impl->slave_node && impl->adapter != impl->slave_node) { + res = spa_node_set_io(impl->slave_node, id, data, size); + } + return res; +} + +static int impl_node_send_command(void *object, const struct spa_command *command) +{ + struct node *this = object; + struct impl *impl; + int res; + + spa_return_val_if_fail(this != NULL, -EINVAL); + + impl = this->impl; + + switch (SPA_NODE_COMMAND_ID(command)) { + case SPA_NODE_COMMAND_Start: + impl->started = true; + break; + case SPA_NODE_COMMAND_Pause: + impl->started = false; + break; + default: + break; + } + + if ((res = spa_node_send_command(impl->adapter, command)) < 0) + return res; + + if (impl->adapter != impl->slave_node) { + if ((res = spa_node_send_command(impl->slave_node, command)) < 0) + return res; + } + return res; +} + +static void adapter_port_info(void *data, + enum spa_direction direction, uint32_t port_id, + const struct spa_port_info *info) +{ + struct impl *impl = data; + struct node *this = &impl->node; + bool monitor; + + monitor = (info->props && + spa_dict_lookup(info->props, SPA_KEY_PORT_MONITOR) != NULL); + if (monitor) + port_id -= 1; + + if (direction == impl->direction || monitor) { + struct spa_port_info i = *info; + SPA_FLAG_UNSET(i.flags, SPA_PORT_FLAG_DYNAMIC_DATA); + spa_node_emit_port_info(&this->hooks, direction, port_id, &i); + } +} + +static void adapter_result(void *data, int seq, int res, uint32_t type, const void *result) +{ + struct impl *impl = data; + struct node *this = &impl->node; + pw_log_debug("%p: result %d %d", this, seq, res); + spa_node_emit_result(&this->hooks, seq, res, type, result); +} + +static const struct spa_node_events adapter_node_events = { + SPA_VERSION_NODE_EVENTS, + .port_info = adapter_port_info, + .result = adapter_result, +}; + +static int impl_node_add_listener(void *object, + struct spa_hook *listener, + const struct spa_node_events *events, + void *data) +{ + struct node *this = object; + struct impl *impl; + struct spa_hook l; + struct spa_hook_list save; + + spa_return_val_if_fail(this != NULL, -EINVAL); + + impl = this->impl; + + pw_log_debug("%p: add listener %p", this, listener); + spa_hook_list_isolate(&this->hooks, &save, listener, events, data); + + emit_node_info(this, true); + + if (impl->adapter && impl->adapter != impl->slave_node) { + spa_zero(l); + spa_node_add_listener(impl->adapter, &l, &adapter_node_events, impl); + spa_hook_remove(&l); + } + + spa_hook_list_join(&this->hooks, &save); + + return 0; +} + +static int +impl_node_set_callbacks(void *object, + const struct spa_node_callbacks *callbacks, + void *data) +{ + struct node *this = object; + + spa_return_val_if_fail(this != NULL, -EINVAL); + + + this->callbacks = SPA_CALLBACKS_INIT(callbacks, data); + + return 0; +} + +static int +impl_node_sync(void *object, int seq) +{ + struct node *this = object; + struct impl *impl; + + spa_return_val_if_fail(this != NULL, -EINVAL); + + impl = this->impl; + + return spa_node_sync(impl->slave_node, seq); +} + +static int +impl_node_add_port(void *object, enum spa_direction direction, uint32_t port_id, + const struct spa_dict *props) +{ + struct node *this = object; + struct impl *impl; + int res; + + spa_return_val_if_fail(this != NULL, -EINVAL); + + impl = this->impl; + + if (direction != impl->direction) + return -EINVAL; + + if ((res = spa_node_add_port(impl->adapter_mix, direction, port_id, props)) < 0) + return res; + + return res; +} + +static int +impl_node_remove_port(void *object, enum spa_direction direction, uint32_t port_id) +{ + struct node *this = object; + struct impl *impl; + + spa_return_val_if_fail(this != NULL, -EINVAL); + + impl = this->impl; + + if (direction != this->impl->direction) + return -EINVAL; + + return spa_node_remove_port(impl->adapter_mix, direction, port_id); +} + +static int +impl_node_port_enum_params(void *object, int seq, + enum spa_direction direction, uint32_t port_id, + uint32_t id, uint32_t start, uint32_t num, + const struct spa_pod *filter) +{ + struct node *this = object; + struct impl *impl; + + spa_return_val_if_fail(this != NULL, -EINVAL); + spa_return_val_if_fail(num != 0, -EINVAL); + + impl = this->impl; + + if (direction != impl->direction) + port_id++; + + pw_log_debug("%p: %d %u", this, seq, id); + + return spa_node_port_enum_params(impl->adapter, seq, direction, port_id, id, + start, num, filter); +} + +static int debug_params(struct impl *impl, struct spa_node *node, + enum spa_direction direction, uint32_t port_id, uint32_t id, struct spa_pod *filter) +{ + struct node *this = &impl->node; + struct spa_pod_builder b = { 0 }; + uint8_t buffer[4096]; + uint32_t state; + struct spa_pod *param; + int res; + + spa_log_error(this->log, "params %s:", spa_debug_type_find_name(spa_type_param, id)); + + state = 0; + while (true) { + spa_pod_builder_init(&b, buffer, sizeof(buffer)); + res = spa_node_port_enum_params_sync(node, + direction, port_id, + id, &state, + NULL, ¶m, &b); + if (res != 1) { + if (res < 0) + spa_log_error(this->log, " error: %s", spa_strerror(res)); + break; + } + spa_debug_pod(2, NULL, param); + } + + spa_log_error(this->log, "failed filter:"); + if (filter) + spa_debug_pod(2, NULL, filter); + + return 0; +} + + +static int negotiate_format(struct impl *impl) +{ + struct node *this = &impl->node; + uint32_t state; + struct spa_pod *format; + uint8_t buffer[4096]; + struct spa_pod_builder b = { 0 }; + int res; + + spa_pod_builder_init(&b, buffer, sizeof(buffer)); + + spa_log_debug(this->log, NAME "%p: negiotiate", impl); + + state = 0; + if ((res = spa_node_port_enum_params_sync(impl->adapter_mix, + SPA_DIRECTION_REVERSE(impl->direction), + impl->adapter_mix_port, + SPA_PARAM_EnumFormat, &state, + NULL, &format, &b)) != 1) { + debug_params(impl, impl->adapter_mix, + SPA_DIRECTION_REVERSE(impl->direction), + impl->adapter_mix_port, + SPA_PARAM_EnumFormat, NULL); + return -ENOTSUP; + } + + state = 0; + if ((res = spa_node_port_enum_params_sync(impl->slave_node, + impl->direction, 0, + SPA_PARAM_EnumFormat, &state, + format, &format, &b)) != 1) { + debug_params(impl, impl->slave_node, impl->direction, 0, + SPA_PARAM_EnumFormat, format); + return -ENOTSUP; + } + + spa_pod_fixate(format); + if (pw_log_level_enabled(SPA_LOG_LEVEL_DEBUG)) + spa_debug_format(0, NULL, format); + + if ((res = spa_node_port_set_param(impl->adapter_mix, + SPA_DIRECTION_REVERSE(impl->direction), + impl->adapter_mix_port, + SPA_PARAM_Format, 0, + format)) < 0) + return res; + + if ((res = spa_node_port_set_param(impl->slave_node, + impl->direction, 0, + SPA_PARAM_Format, 0, + format)) < 0) + return res; + + return res; +} + +static int negotiate_buffers(struct impl *impl) +{ + struct node *this = &impl->node; + uint8_t buffer[4096]; + struct spa_pod_builder b = SPA_POD_BUILDER_INIT(buffer, sizeof(buffer)); + uint32_t state; + struct spa_pod *param = NULL; + int res, i; + bool in_alloc, out_alloc; + int32_t size, buffers, blocks, align, flags; + uint32_t *aligns; + struct spa_data *datas; + uint32_t in_flags, out_flags; + struct spa_buffer_alloc_info info = { 0, }; + void *skel; + + spa_log_debug(this->log, "%p: %d", impl, impl->n_buffers); + + if (impl->n_buffers > 0) + return 0; + + state = 0; + if ((res = spa_node_port_enum_params_sync(impl->adapter_mix, + SPA_DIRECTION_REVERSE(impl->direction), + impl->adapter_mix_port, + SPA_PARAM_Buffers, &state, + param, ¶m, &b)) != 1) { + debug_params(impl, impl->adapter_mix, + SPA_DIRECTION_REVERSE(impl->direction), + impl->adapter_mix_port, + SPA_PARAM_Buffers, param); + return -ENOTSUP; + } + if (res != 1) + param = NULL; + + state = 0; + if ((res = spa_node_port_enum_params_sync(impl->slave_node, + impl->direction, 0, + SPA_PARAM_Buffers, &state, + param, ¶m, &b)) < 0) { + debug_params(impl, impl->slave_node, impl->direction, 0, + SPA_PARAM_Buffers, param); + return res; + } + + spa_pod_fixate(param); + + in_flags = impl->slave_port->spa_flags; + out_flags = impl->adapter_mix_flags; + + in_alloc = SPA_FLAG_CHECK(in_flags, SPA_PORT_FLAG_CAN_ALLOC_BUFFERS); + out_alloc = SPA_FLAG_CHECK(out_flags, SPA_PORT_FLAG_CAN_ALLOC_BUFFERS); + + flags = 0; + if (out_alloc || in_alloc) { + flags |= SPA_BUFFER_ALLOC_FLAG_NO_DATA; + if (out_alloc) + in_alloc = false; + } + + if ((res = spa_pod_parse_object(param, + SPA_TYPE_OBJECT_ParamBuffers, NULL, + SPA_PARAM_BUFFERS_buffers, SPA_POD_Int(&buffers), + SPA_PARAM_BUFFERS_blocks, SPA_POD_Int(&blocks), + SPA_PARAM_BUFFERS_size, SPA_POD_Int(&size), + SPA_PARAM_BUFFERS_align, SPA_POD_Int(&align))) < 0) + return res; + + spa_log_debug(this->log, "%p: buffers %d, blocks %d, size %d, align %d %d:%d", + impl, buffers, blocks, size, align, in_alloc, out_alloc); + + datas = alloca(sizeof(struct spa_data) * blocks); + memset(datas, 0, sizeof(struct spa_data) * blocks); + aligns = alloca(sizeof(uint32_t) * blocks); + for (i = 0; i < blocks; i++) { + datas[i].type = SPA_DATA_MemPtr; + datas[i].flags = SPA_DATA_FLAG_DYNAMIC; + datas[i].maxsize = size; + aligns[i] = align; + } + + spa_buffer_alloc_fill_info(&info, 0, NULL, blocks, datas, aligns); + + free(impl->buffers); + impl->buffers = calloc(buffers, sizeof(struct spa_buffer *) + info.skel_size); + if (impl->buffers == NULL) + return -errno; + + skel = SPA_MEMBER(impl->buffers, sizeof(struct spa_buffer *) * buffers, void); + + if (impl->mem) { + pw_memblock_free(impl->mem); + impl->mem = NULL; + } + + if ((res = pw_memblock_alloc(PW_MEMBLOCK_FLAG_WITH_FD | + PW_MEMBLOCK_FLAG_MAP_READWRITE | + PW_MEMBLOCK_FLAG_SEAL, buffers * info.mem_size, + &impl->mem)) < 0) + return res; + + impl->n_buffers = buffers; + + spa_buffer_alloc_layout_array(&info, impl->n_buffers, impl->buffers, + skel, impl->mem->ptr); + + if (in_alloc) { + if ((res = spa_node_port_alloc_buffers(impl->adapter_mix, + SPA_DIRECTION_REVERSE(impl->direction), + impl->adapter_mix_port, + NULL, 0, + impl->buffers, &impl->n_buffers)) < 0) + return res; + } + else { + if ((res = spa_node_port_use_buffers(impl->adapter_mix, + SPA_DIRECTION_REVERSE(impl->direction), + impl->adapter_mix_port, + impl->buffers, impl->n_buffers)) < 0) + return res; + } + if (out_alloc) { + if ((res = spa_node_port_alloc_buffers(impl->slave_port->mix, + impl->direction, 0, + NULL, 0, + impl->buffers, &impl->n_buffers)) < 0) { + return res; + } + } + else { + if ((res = spa_node_port_use_buffers(impl->slave_port->mix, + impl->direction, 0, + impl->buffers, impl->n_buffers)) < 0) { + + if (res != -ENOTSUP) + return res; + + if ((res = spa_node_port_use_buffers(impl->slave_port->node->node, + impl->direction, 0, + impl->buffers, impl->n_buffers)) < 0) + return res; + } + } + + return 0; +} + +static int +impl_node_port_set_param(void *object, + enum spa_direction direction, uint32_t port_id, + uint32_t id, uint32_t flags, + const struct spa_pod *param) +{ + struct node *this = object; + struct impl *impl; + int res; + + spa_return_val_if_fail(this != NULL, -EINVAL); + + impl = this->impl; + + if (direction != impl->direction) + port_id++; + + if ((res = spa_node_port_set_param(impl->adapter_mix, direction, port_id, id, + flags, param)) < 0) + return res; + + if (id == SPA_PARAM_Format && impl->use_converter) { + if (param == NULL) { + if ((res = spa_node_port_set_param(impl->adapter_mix, + SPA_DIRECTION_REVERSE(direction), + impl->adapter_mix_port, + id, 0, NULL)) < 0) + return res; + impl->n_buffers = 0; + } + else { + if (port_id == 0) + res = negotiate_format(impl); + } + } + return res; +} + +static int +impl_node_port_set_io(void *object, + enum spa_direction direction, + uint32_t port_id, + uint32_t id, + void *data, size_t size) +{ + struct node *this = object; + struct impl *impl; + int res = 0; + + spa_return_val_if_fail(this != NULL, -EINVAL); + + impl = this->impl; + + spa_log_debug(this->log, "set io %d %d %d %d", port_id, id, direction, impl->direction); + + if (direction != impl->direction) + port_id++; + + if (impl->use_converter) { + res = spa_node_port_set_io(impl->adapter_mix, direction, port_id, id, data, size); + } + else { + if (direction != impl->direction) + return -EINVAL; + if (id == SPA_IO_Buffers && size >= sizeof(struct spa_io_buffers)) + impl->io = data; + } + return res; +} + +static int +impl_node_port_use_buffers(void *object, + enum spa_direction direction, + uint32_t port_id, + struct spa_buffer **buffers, + uint32_t n_buffers) +{ + struct node *this = object; + struct impl *impl; + int res; + + spa_return_val_if_fail(this != NULL, -EINVAL); + + impl = this->impl; + + if (direction != impl->direction) + port_id++; + + if ((res = spa_node_port_use_buffers(impl->adapter_mix, + direction, port_id, buffers, n_buffers)) < 0) + return res; + + + spa_log_debug(this->log, NAME" %p: %d %d:%d", impl, + n_buffers, direction, port_id); + + if (n_buffers > 0 && impl->use_converter) { + if (port_id == 0) + res = negotiate_buffers(impl); + } + return res; +} + +static int +impl_node_port_alloc_buffers(void *object, + enum spa_direction direction, + uint32_t port_id, + struct spa_pod **params, + uint32_t n_params, + struct spa_buffer **buffers, + uint32_t *n_buffers) +{ + struct node *this = object; + struct impl *impl; + + spa_return_val_if_fail(this != NULL, -EINVAL); + + impl = this->impl; + + if (direction != impl->direction) + port_id++; + + return spa_node_port_alloc_buffers(impl->adapter_mix, direction, port_id, + params, n_params, buffers, n_buffers); +} + +static int +impl_node_port_reuse_buffer(void *object, uint32_t port_id, uint32_t buffer_id) +{ + struct node *this = object; + struct impl *impl; + + spa_return_val_if_fail(this != NULL, -EINVAL); + + impl = this->impl; + + return spa_node_port_reuse_buffer(impl->adapter, port_id, buffer_id); +} + +static int impl_node_process(void *object) +{ + struct node *this = object; + struct impl *impl = this->impl; + struct spa_io_position *q = impl->this->driver_node->rt.position; + int status; + + spa_log_trace_fp(this->log, "%p: process %zd active:%u convert:%u", + this, q->size * sizeof(float), + impl->active, impl->use_converter); + + if (!impl->active) + return SPA_STATUS_HAVE_BUFFER; + + if (impl->direction == SPA_DIRECTION_INPUT) { + if (impl->use_converter) { + status = spa_node_process(impl->adapter); + } + else { + struct spa_io_buffers tmp; + + spa_log_trace_fp(this->log, "%p: process %d/%d %d/%d", this, + impl->io->status, impl->io->buffer_id, + impl->slave_port_mix.io->status, + impl->slave_port_mix.io->buffer_id); + + tmp = *impl->io; + *impl->io = *impl->slave_port_mix.io; + *impl->slave_port_mix.io = tmp; + + status = impl->slave_port_mix.io->status | impl->io->status; + } + } else { + status = SPA_STATUS_HAVE_BUFFER; + } + + impl->slave->rt.target.signal(impl->slave->rt.target.data); + + return status; +} + +static const struct spa_node_methods impl_node = { + SPA_VERSION_NODE_METHODS, + .add_listener = impl_node_add_listener, + .set_callbacks = impl_node_set_callbacks, + .sync = impl_node_sync, + .enum_params = impl_node_enum_params, + .set_param = impl_node_set_param, + .set_io = impl_node_set_io, + .send_command = impl_node_send_command, + .add_port = impl_node_add_port, + .remove_port = impl_node_remove_port, + .port_enum_params = impl_node_port_enum_params, + .port_set_param = impl_node_port_set_param, + .port_use_buffers = impl_node_port_use_buffers, + .port_alloc_buffers = impl_node_port_alloc_buffers, + .port_set_io = impl_node_port_set_io, + .port_reuse_buffer = impl_node_port_reuse_buffer, + .process = impl_node_process, +}; + +static int +node_init(struct node *this, + struct spa_dict *info, + const struct spa_support *support, + uint32_t n_support) +{ + uint32_t i; + + for (i = 0; i < n_support; i++) { + if (support[i].type == SPA_TYPE_INTERFACE_Log) + this->log = support[i].data; + } + this->node.iface = SPA_INTERFACE_INIT( + SPA_TYPE_INTERFACE_Node, + SPA_VERSION_NODE, + &impl_node, this); + spa_hook_list_init(&this->hooks); + + this->info_all = SPA_NODE_CHANGE_MASK_PARAMS; + this->info = SPA_NODE_INFO_INIT(); + this->info.max_input_ports = 0; + this->info.max_output_ports = 0; + this->params[0] = SPA_PARAM_INFO(SPA_PARAM_EnumFormat, SPA_PARAM_INFO_READ); + this->params[1] = SPA_PARAM_INFO(SPA_PARAM_PropInfo, SPA_PARAM_INFO_READ); + this->params[2] = SPA_PARAM_INFO(SPA_PARAM_Props, SPA_PARAM_INFO_READWRITE); + this->params[3] = SPA_PARAM_INFO(SPA_PARAM_Format, SPA_PARAM_INFO_READ); + this->params[4] = SPA_PARAM_INFO(SPA_PARAM_Profile, SPA_PARAM_INFO_WRITE); + this->info.params = this->params; + this->info.n_params = 5; + + return 0; +} + +static int do_port_info(void *data, struct pw_port *port) +{ + struct impl *impl = data; + struct node *node = &impl->node; + struct spa_port_info info; + + info = SPA_PORT_INFO_INIT(); + info.change_mask = SPA_PORT_CHANGE_MASK_FLAGS | SPA_PORT_CHANGE_MASK_PROPS; + info.flags = port->spa_flags; + info.props = &port->properties->dict; + + spa_node_emit_port_info(&node->hooks, + impl->direction, port->port_id, &info); + return 0; +} + +static void emit_port_info(struct impl *impl) +{ + pw_node_for_each_port(impl->slave, + impl->direction, + do_port_info, impl); +} + +static void slave_initialized(void *data) +{ + struct impl *impl = data; + uint32_t state; + uint32_t media_type, media_subtype; + struct spa_pod *format; + uint8_t buffer[4096]; + struct spa_pod_builder b; + int res; + const struct pw_properties *props; + const char *str, *dir, *type; + char media_class[64]; + bool exclusive, monitor; + struct spa_dict_item items[1]; + const struct pw_node_info *info; + + pw_log_debug(NAME " %p: initialized", &impl->this); + + info = pw_node_get_info(impl->slave); + if (info == NULL) + return; + + if (info->n_output_ports == 0) { + impl->direction = SPA_DIRECTION_INPUT; + dir = "Playback"; + } + else { + impl->direction = SPA_DIRECTION_OUTPUT; + dir = "Capture"; + } + + pw_log_debug(NAME " %p: in %d/%d out %d/%d -> %s", &impl->this, + info->n_input_ports, info->max_input_ports, + info->n_output_ports, info->max_output_ports, + dir); + + props = pw_node_get_properties(impl->slave); + if (props != NULL && (str = pw_properties_get(props, PW_KEY_NODE_EXCLUSIVE)) != NULL) + exclusive = pw_properties_parse_bool(str); + else + exclusive = false; + + if (props != NULL && (str = pw_properties_get(props, PW_KEY_STREAM_MONITOR)) != NULL) + monitor = pw_properties_parse_bool(str); + else + monitor = false; + + impl->slave->driver_node = impl->this; + + impl->slave_port = pw_node_find_port(impl->slave, impl->direction, 0); + if (impl->slave_port == NULL) { + pw_log_warn(NAME " %p: can't find slave port", &impl->this); + return; + } + impl->slave_port_mix.io = &impl->slave_port->rt.io; + + if ((res = pw_port_init_mix(impl->slave_port, &impl->slave_port_mix)) < 0) { + pw_log_warn(NAME " %p: can't init slave port mix: %s", &impl->this, + spa_strerror(res)); + return; + } + + if ((res = spa_node_port_set_io(impl->slave_port->mix, + impl->direction, 0, + SPA_IO_Buffers, + impl->slave_port_mix.io, + sizeof(impl->slave_port_mix.io))) < 0) { + pw_log_warn(NAME " %p: can't set port io: %s", &impl->this, + spa_strerror(res)); + } + + state = 0; + spa_pod_builder_init(&b, buffer, sizeof(buffer)); + if ((res = spa_node_port_enum_params_sync(impl->slave_node, + impl->direction, 0, + SPA_PARAM_EnumFormat, &state, + NULL, &format, &b)) != 1) { + pw_log_warn(NAME " %p: no format given", &impl->this); + impl->adapter = impl->slave_node; + impl->adapter_mix = impl->slave_port->mix; + impl->adapter_mix_port = 0; + impl->adapter_mix_flags = impl->slave_port->spa_flags; + impl->use_converter = false; + emit_port_info(impl); + return; + } + + if (spa_format_parse(format, &media_type, &media_subtype) < 0) + return; + + pw_log_debug(NAME " %p: %s/%s", &impl->this, + spa_debug_type_find_name(spa_type_media_type, media_type), + spa_debug_type_find_name(spa_type_media_subtype, media_subtype)); + + if (pw_log_level_enabled(SPA_LOG_LEVEL_DEBUG)) + spa_debug_format(2, NULL, format); + + if (!exclusive && + media_type == SPA_MEDIA_TYPE_audio && + media_subtype == SPA_MEDIA_SUBTYPE_raw) { + struct spa_dict_item items[4]; + uint32_t n_items; + const char *mode; + void *iface; + + n_items = 0; + if (impl->direction == SPA_DIRECTION_OUTPUT) { + mode = "split"; + } else { + items[n_items++] = SPA_DICT_ITEM_INIT("merger.monitor", "1"); + mode = "merge"; + } + items[n_items++] = SPA_DICT_ITEM_INIT("factory.mode", mode); + items[n_items++] = SPA_DICT_ITEM_INIT("resample.peaks", monitor ? "1" : "0"); + items[n_items++] = SPA_DICT_ITEM_INIT(SPA_KEY_LIBRARY_NAME, + "audioconvert/libspa-audioconvert"); + + if ((impl->handle = pw_core_load_spa_handle(impl->core, + SPA_NAME_AUDIO_CONVERT, + &SPA_DICT_INIT(items, n_items))) == NULL) + return; + + if ((res = spa_handle_get_interface(impl->handle, + SPA_TYPE_INTERFACE_Node, &iface)) < 0) + return; + + impl->adapter = iface; + impl->adapter_mix = impl->adapter; + impl->adapter_mix_port = 0; + impl->use_converter = true; + spa_node_add_listener(impl->adapter, &impl->adapter_listener, + &adapter_node_events, impl); + } + else { + impl->adapter = impl->slave_node; + impl->adapter_mix = impl->slave_port->mix; + impl->adapter_mix_port = 0; + impl->adapter_mix_flags = impl->slave_port->spa_flags; + impl->use_converter = false; + emit_port_info(impl); + } + + if (impl->use_converter) { + if ((res = spa_node_port_set_io(impl->adapter_mix, + SPA_DIRECTION_REVERSE(impl->direction), + impl->adapter_mix_port, + SPA_IO_Buffers, + impl->slave_port_mix.io, + sizeof(impl->slave_port_mix.io))) < 0) + return; + } + + switch (media_type) { + case SPA_MEDIA_TYPE_audio: + type = "Audio"; + break; + case SPA_MEDIA_TYPE_video: + type = "Video"; + break; + default: + type = "Generic"; + break; + } + + snprintf(media_class, sizeof(media_class), "%s/DSP/%s", type, dir); + + items[0] = SPA_DICT_ITEM_INIT(PW_KEY_MEDIA_CLASS, media_class); + pw_node_update_properties(impl->this, &SPA_DICT_INIT(items, 1)); +} + +static void cleanup(struct impl *impl) +{ + pw_log_debug(NAME " %p: cleanup", &impl->this); + if (impl->use_converter) { + if (impl->handle) + pw_unload_spa_handle(impl->handle); + } + + free(impl->buffers); + if (impl->mem) + pw_memblock_free(impl->mem); + free(impl); +} + +static void slave_destroy(void *data) +{ + struct impl *impl = data; + pw_log_debug(NAME " %p: destroy", &impl->this); + + pw_node_set_driver(impl->slave, NULL); + + spa_hook_remove(&impl->node_listener); + pw_node_destroy(impl->this); + impl->this = NULL; +} + +static void slave_free(void *data) +{ + struct impl *impl = data; + pw_log_debug(NAME " %p: free", &impl->this); + spa_hook_remove(&impl->slave_listener); + cleanup(impl); +} + +static void slave_result(void *data, int seq, int res, uint32_t type, const void *result) +{ + struct impl *impl = data; + struct node *node = &impl->node; + pw_log_debug(NAME " %p: result %d %d", &impl->this, seq, res); + spa_node_emit_result(&node->hooks, seq, res, type, result); +} + +static void slave_active_changed(void *data, bool active) +{ + struct impl *impl = data; + + pw_log_debug(NAME " %p: active %d", &impl->this, active); + impl->active = active; +} + +static void slave_info_changed(void *data, const struct pw_node_info *info) +{ + struct impl *impl = data; + struct pw_node *this = impl->this; + + pw_log_debug(NAME " %p: info changed", this); + + if (this) + pw_node_update_properties(this, info->props); +} + +static const struct pw_node_events slave_events = { + PW_VERSION_NODE_EVENTS, + .destroy = slave_destroy, + .free = slave_free, + .initialized = slave_initialized, + .result = slave_result, + .active_changed = slave_active_changed, + .info_changed = slave_info_changed, +}; + +static void node_destroy(void *data) +{ + struct impl *impl = data; + struct port *p; + + pw_log_debug(NAME " %p: destroy", &impl->this); + + spa_list_consume(p, &impl->ports, link) { + pw_port_set_mix(p->port, NULL, 0); + spa_list_remove(&p->link); + spa_handle_clear(p->spa_handle); + free(p); + } + spa_hook_remove(&impl->slave_listener); +} + +static void node_free(void *data) +{ + struct impl *impl = data; + pw_log_debug(NAME " %p: free", &impl->this); + + pw_node_destroy(impl->slave); + spa_hook_remove(&impl->node_listener); + cleanup(impl); +} + +static void node_initialized(void *data) +{ +// struct impl *impl = data; +// pw_client_node_registered(impl->client_node, impl->this->global); +} + + +static void init_buffer(struct port *port, uint32_t id) +{ + struct buffer *b = &port->buffers[id]; + b->buf.n_metas = 0; + b->buf.metas = NULL; + b->buf.n_datas = 1; + b->buf.datas = b->datas; + b->datas[0].type = SPA_DATA_MemPtr; + b->datas[0].flags = SPA_DATA_FLAG_DYNAMIC; + b->datas[0].fd = -1; + b->datas[0].mapoffset = 0; + b->datas[0].maxsize = SPA_ROUND_DOWN_N(sizeof(port->empty), 16); + b->datas[0].data = SPA_PTR_ALIGN(port->empty, 16, void); + b->datas[0].chunk = b->chunk; + b->datas[0].chunk->offset = 0; + b->datas[0].chunk->size = 0; + b->datas[0].chunk->stride = 0; + port->bufs[id] = &b->buf; + memset(port->empty, 0, sizeof(port->empty)); + pw_log_debug("%p %d", b->datas[0].data, b->datas[0].maxsize); +} + +static void init_port(struct port *p, enum spa_direction direction) +{ + int i; + for (i = 0; i < PORT_BUFFERS; i++) + init_buffer(p, i); +} + +static int port_use_buffers(void *data, + struct spa_buffer **buffers, + uint32_t n_buffers) +{ + struct port *p = data; + struct pw_port *port = p->port; + struct pw_node *node = port->node; + int res, i; + + pw_log_debug(NAME " %p: port %p", node, port); + + if (n_buffers > 0) { + for (i = 0; i < PORT_BUFFERS; i++) + init_buffer(p, i); + + n_buffers = PORT_BUFFERS; + buffers = p->bufs; + } + + res = spa_node_port_use_buffers(port->mix, + pw_direction_reverse(port->direction), + 0, + buffers, + n_buffers); + res = spa_node_port_use_buffers(node->node, + port->direction, + port->port_id, + buffers, + n_buffers); + return res; +} + +static const struct pw_port_implementation port_implementation = { + .use_buffers = port_use_buffers, +}; + +static void node_port_init(void *data, struct pw_port *port) +{ + struct impl *impl = data; + struct node *n = &impl->node; + struct port *p; + const struct pw_properties *old, *nprops; + enum spa_direction direction; + struct pw_properties *new; + const char *str; + void *iface; + const struct spa_support *support; + uint32_t n_support; + char position[8], *prefix; + bool monitor; + + pw_log_debug(NAME" %p: new port %p", impl, port); + + direction = pw_port_get_direction(port) == PW_DIRECTION_INPUT ? + SPA_DIRECTION_INPUT : SPA_DIRECTION_OUTPUT; + + nprops = pw_node_get_properties(impl->this); + old = pw_port_get_properties(port); + + monitor = (str = pw_properties_get(old, PW_KEY_PORT_MONITOR)) != NULL && + pw_properties_parse_bool(str); + + new = pw_properties_new(PW_KEY_FORMAT_DSP, "32 bit float mono audio", NULL); + + if (monitor) + prefix = "monitor"; + else if (direction == SPA_DIRECTION_INPUT) + prefix = "playback"; + else + prefix = "capture"; + + if ((str = pw_properties_get(old, PW_KEY_AUDIO_CHANNEL)) == NULL || + strcmp(str, "UNK") == 0) { + snprintf(position, 7, "%d", port->port_id); + str = position; + } + + pw_properties_setf(new, PW_KEY_PORT_NAME, "%s_%s", prefix, str); + + if (direction == impl->direction) { + pw_properties_setf(new, PW_KEY_PORT_ALIAS1, "%s_pcm:%s:%s%s", + pw_properties_get(nprops, PW_KEY_DEVICE_API), + pw_properties_get(nprops, PW_KEY_NODE_NAME), + direction == SPA_DIRECTION_INPUT ? "in" : "out", + str); + + pw_properties_set(new, PW_KEY_PORT_PHYSICAL, "1"); + pw_properties_set(new, PW_KEY_PORT_TERMINAL, "1"); + } + + pw_port_update_properties(port, &new->dict); + pw_properties_free(new); + + if (direction != SPA_DIRECTION_INPUT) + return; + + p = calloc(1, sizeof(struct port) + + spa_handle_factory_get_size(&spa_floatmix_factory, NULL)); + p->node = n; + p->port = port; + init_port(p, direction); + p->spa_handle = SPA_MEMBER(p, sizeof(struct port), struct spa_handle); + + support = pw_core_get_support(impl->core, &n_support); + spa_handle_factory_init(&spa_floatmix_factory, + p->spa_handle, NULL, + support, n_support); + + spa_handle_get_interface(p->spa_handle, SPA_TYPE_INTERFACE_Node, &iface); + p->spa_node = iface; + + pw_log_debug("mix node %p", p->spa_node); + pw_port_set_mix(port, p->spa_node, PW_PORT_MIX_FLAG_MULTI); + port->impl = SPA_CALLBACKS_INIT(&port_implementation, p); + + spa_list_append(&impl->ports, &p->link); +} + +static const struct pw_node_events node_events = { + PW_VERSION_NODE_EVENTS, + .destroy = node_destroy, + .free = node_free, + .initialized = node_initialized, + .port_init = node_port_init, +}; + +static int node_ready(void *data, int status) +{ + struct impl *impl = data; + pw_log_trace_fp(NAME " %p: ready %d", &impl->this, status); + + if (impl->direction == SPA_DIRECTION_OUTPUT) + status = spa_node_process(impl->adapter); + else + status = SPA_STATUS_NEED_BUFFER | SPA_STATUS_HAVE_BUFFER; + + return spa_node_call_ready(&impl->node.callbacks, status); +} + +static const struct spa_node_callbacks node_callbacks = { + SPA_VERSION_NODE_CALLBACKS, + .ready = node_ready, +}; + +/** Create a new client stream + * \param client an owner \ref pw_client + * \param id an id + * \param name a name + * \param properties extra properties + * \return a newly allocated client stream + * + * Create a new \ref pw_stream. + * + * \memberof pw_client_stream + */ +struct pw_node *pw_adapter_new(struct pw_core *core, + struct pw_node *slave, + struct pw_properties *properties, + size_t user_data_size) +{ + struct impl *impl; + const struct spa_support *support; + uint32_t n_support; + const char *name; + int res; + + impl = calloc(1, sizeof(struct impl)); + if (impl == NULL) { + res = -errno; + goto error_exit_cleanup; + } + + impl->core = core; + + pw_log_debug(NAME " %p: new", impl); + + pw_properties_set(properties, PW_KEY_MEDIA_CLASS, NULL); + + impl->slave = slave; + impl->active = pw_node_is_active(slave); + spa_list_init(&impl->ports); + + impl->slave_node = pw_node_get_implementation(impl->slave); + spa_node_set_callbacks(impl->slave_node, &node_callbacks, impl); + + support = pw_core_get_support(impl->core, &n_support); + node_init(&impl->node, NULL, support, n_support); + impl->node.impl = impl; + + if ((name = pw_properties_get(properties, PW_KEY_NODE_NAME)) == NULL) + name = NAME; + + impl->this = pw_spa_node_new(core, NULL, NULL, + name, + PW_SPA_NODE_FLAG_ASYNC | + PW_SPA_NODE_FLAG_ACTIVATE | + PW_SPA_NODE_FLAG_NO_REGISTER, + (struct spa_node *)&impl->node.node, + NULL, + properties, user_data_size); + properties = NULL; + if (impl->this == NULL) { + res = -errno; + goto error_exit_free; + } + + pw_node_add_listener(impl->slave, + &impl->slave_listener, + &slave_events, impl); + pw_node_add_listener(impl->this, + &impl->node_listener, + &node_events, impl); + + slave_initialized(impl); + + return impl->this; + +error_exit_free: + free(impl); +error_exit_cleanup: + if (properties) + pw_properties_free(properties); + errno = -res; + return NULL; +} + +void *pw_adapter_get_user_data(struct pw_node *node) +{ + return pw_spa_node_get_user_data(node); +} diff --git a/src/modules/module-adapter/adapter.h b/src/modules/module-adapter/adapter.h new file mode 100644 index 000000000..1cf46f248 --- /dev/null +++ b/src/modules/module-adapter/adapter.h @@ -0,0 +1,48 @@ +/* PipeWire + * + * Copyright © 2019 Wim Taymans + * + * Permission is hereby granted, free of charge, to any person obtaining a + * copy of this software and associated documentation files (the "Software"), + * to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, + * and/or sell copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice (including the next + * paragraph) shall be included in all copies or substantial portions of the + * Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL + * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER + * DEALINGS IN THE SOFTWARE. + */ + +#ifndef PIPEWIRE_ADAPTER_H +#define PIPEWIRE_ADAPTER_H + +#include + +#ifdef __cplusplus +extern "C" { +#endif + +#define ADAPTER_USAGE PW_KEY_NODE_NAME"= " + +struct pw_node * +pw_adapter_new(struct pw_core *core, + struct pw_node *slave, + struct pw_properties *properties, + size_t user_data_size); + +void *pw_adapter_get_user_data(struct pw_node *node); + +#ifdef __cplusplus +} +#endif + +#endif /* PIPEWIRE_ADAPTER_H */ diff --git a/src/modules/module-adapter/floatmix.c b/src/modules/module-adapter/floatmix.c new file mode 100644 index 000000000..5e665b9f3 --- /dev/null +++ b/src/modules/module-adapter/floatmix.c @@ -0,0 +1,940 @@ +/* Spa + * + * Copyright © 2018 Wim Taymans + * + * Permission is hereby granted, free of charge, to any person obtaining a + * copy of this software and associated documentation files (the "Software"), + * to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, + * and/or sell copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice (including the next + * paragraph) shall be included in all copies or substantial portions of the + * Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL + * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER + * DEALINGS IN THE SOFTWARE. + */ + +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#define NAME "floatmix" + +#define MAX_BUFFERS 64 +#define MAX_PORTS 128 +#define MAX_SAMPLES 1024 + +#define PORT_DEFAULT_VOLUME 1.0 +#define PORT_DEFAULT_MUTE false + +struct port_props { + double volume; + int32_t mute; +}; + +static void port_props_reset(struct port_props *props) +{ + props->volume = PORT_DEFAULT_VOLUME; + props->mute = PORT_DEFAULT_MUTE; +} + +struct buffer { + uint32_t id; +#define BUFFER_FLAG_QUEUED (1 << 0) + uint32_t flags; + + struct spa_list link; + struct spa_buffer *buffer; + struct spa_meta_header *h; + struct spa_buffer buf; + struct spa_data datas[1]; + struct spa_chunk chunk[1]; +}; + +struct port { + uint32_t direction; + uint32_t id; + + struct port_props props; + + struct spa_io_buffers *io; + + uint64_t info_all; + struct spa_port_info info; + struct spa_param_info params[8]; + + unsigned int valid:1; + unsigned int have_format:1; + + struct buffer buffers[MAX_BUFFERS]; + uint32_t n_buffers; + + struct spa_list queue; + size_t queued_bytes; +}; + +struct impl { + struct spa_handle handle; + struct spa_node node; + + struct spa_log *log; + + uint64_t info_all; + struct spa_node_info info; + struct spa_param_info params[8]; + + struct spa_hook_list hooks; + + uint32_t port_count; + uint32_t last_port; + struct port in_ports[MAX_PORTS]; + struct port out_ports[1]; + + bool have_format; + int n_formats; + struct spa_audio_info format; + uint32_t stride; + + bool started; + float empty[MAX_SAMPLES + 15]; +}; + +#define CHECK_FREE_IN_PORT(this,d,p) ((d) == SPA_DIRECTION_INPUT && (p) < MAX_PORTS && !this->in_ports[(p)].valid) +#define CHECK_IN_PORT(this,d,p) ((d) == SPA_DIRECTION_INPUT && (p) < MAX_PORTS && this->in_ports[(p)].valid) +#define CHECK_OUT_PORT(this,d,p) ((d) == SPA_DIRECTION_OUTPUT && (p) == 0) +#define CHECK_PORT(this,d,p) (CHECK_OUT_PORT(this,d,p) || CHECK_IN_PORT (this,d,p)) +#define GET_IN_PORT(this,p) (&this->in_ports[p]) +#define GET_OUT_PORT(this,p) (&this->out_ports[p]) +#define GET_PORT(this,d,p) (d == SPA_DIRECTION_INPUT ? GET_IN_PORT(this,p) : GET_OUT_PORT(this,p)) + +static int impl_node_enum_params(void *object, int seq, + uint32_t id, uint32_t start, uint32_t num, + const struct spa_pod *filter) +{ + return -ENOTSUP; +} + +static int impl_node_set_param(void *object, uint32_t id, uint32_t flags, + const struct spa_pod *param) +{ + return -ENOTSUP; +} + +static int impl_node_set_io(void *object, uint32_t id, void *data, size_t size) +{ + return -ENOTSUP; +} + +static int impl_node_send_command(void *object, const struct spa_command *command) +{ + struct impl *this = object; + + spa_return_val_if_fail(this != NULL, -EINVAL); + spa_return_val_if_fail(command != NULL, -EINVAL); + + switch (SPA_NODE_COMMAND_ID(command)) { + case SPA_NODE_COMMAND_Start: + this->started = true; + break; + case SPA_NODE_COMMAND_Pause: + this->started = false; + break; + default: + return -ENOTSUP; + } + return 0; +} + +static void emit_node_info(struct impl *this, bool full) +{ + if (full) + this->info.change_mask = this->info_all; + if (this->info.change_mask) { + spa_node_emit_info(&this->hooks, &this->info); + this->info.change_mask = 0; + } +} + +static void emit_port_info(struct impl *this, struct port *port, bool full) +{ + if (full) + port->info.change_mask = port->info_all; + if (port->info.change_mask) { + spa_node_emit_port_info(&this->hooks, + port->direction, port->id, &port->info); + port->info.change_mask = 0; + } +} + +static int impl_node_add_listener(void *object, + struct spa_hook *listener, + const struct spa_node_events *events, + void *data) +{ + struct impl *this = object; + struct spa_hook_list save; + uint32_t i; + + spa_return_val_if_fail(this != NULL, -EINVAL); + + spa_hook_list_isolate(&this->hooks, &save, listener, events, data); + + emit_node_info(this, true); + emit_port_info(this, GET_OUT_PORT(this, 0), true); + for (i = 0; i < this->last_port; i++) { + if (this->in_ports[i].valid) + emit_port_info(this, GET_IN_PORT(this, i), true); + } + + spa_hook_list_join(&this->hooks, &save); + + return 0; +} + +static int +impl_node_set_callbacks(void *object, + const struct spa_node_callbacks *callbacks, + void *user_data) +{ + return 0; +} + +static int impl_node_add_port(void *object, enum spa_direction direction, uint32_t port_id, + const struct spa_dict *props) +{ + struct impl *this = object; + struct port *port; + + spa_return_val_if_fail(this != NULL, -EINVAL); + spa_return_val_if_fail(CHECK_FREE_IN_PORT(this, direction, port_id), -EINVAL); + + port = GET_IN_PORT (this, port_id); + port->direction = direction; + port->id = port_id; + + port_props_reset(&port->props); + + spa_list_init(&port->queue); + port->info_all = SPA_PORT_CHANGE_MASK_FLAGS | + SPA_PORT_CHANGE_MASK_PARAMS; + port->info = SPA_PORT_INFO_INIT(); + port->info.flags = SPA_PORT_FLAG_CAN_USE_BUFFERS | + SPA_PORT_FLAG_NO_REF | + SPA_PORT_FLAG_DYNAMIC_DATA | + SPA_PORT_FLAG_REMOVABLE | + SPA_PORT_FLAG_OPTIONAL; + port->params[0] = SPA_PARAM_INFO(SPA_PARAM_EnumFormat, SPA_PARAM_INFO_READ); + port->params[1] = SPA_PARAM_INFO(SPA_PARAM_Meta, SPA_PARAM_INFO_READ); + port->params[2] = SPA_PARAM_INFO(SPA_PARAM_IO, SPA_PARAM_INFO_READ); + port->params[3] = SPA_PARAM_INFO(SPA_PARAM_Format, SPA_PARAM_INFO_WRITE); + port->params[4] = SPA_PARAM_INFO(SPA_PARAM_Buffers, 0); + port->info.params = port->params; + port->info.n_params = 5; + + this->port_count++; + if (this->last_port <= port_id) + this->last_port = port_id + 1; + port->valid = true; + + spa_log_debug(this->log, NAME " %p: add port %d %d", this, port_id, this->last_port); + emit_port_info(this, port, true); + + return 0; +} + +static int +impl_node_remove_port(void *object, enum spa_direction direction, uint32_t port_id) +{ + struct impl *this = object; + struct port *port; + + spa_return_val_if_fail(this != NULL, -EINVAL); + spa_return_val_if_fail(CHECK_IN_PORT(this, direction, port_id), -EINVAL); + + port = GET_IN_PORT (this, port_id); + + port->valid = false; + this->port_count--; + if (port->have_format && this->have_format) { + if (--this->n_formats == 0) + this->have_format = false; + } + spa_memzero(port, sizeof(struct port)); + + if (port_id == this->last_port - 1) { + int i; + + for (i = this->last_port - 1; i >= 0; i--) + if (GET_IN_PORT (this, i)->valid) + break; + + this->last_port = i + 1; + } + spa_log_debug(this->log, NAME " %p: remove port %d %d", this, port_id, this->last_port); + + spa_node_emit_port_info(&this->hooks, direction, port_id, NULL); + + return 0; +} + +static int port_enum_formats(void *object, + enum spa_direction direction, uint32_t port_id, + uint32_t index, + struct spa_pod **param, + struct spa_pod_builder *builder) +{ + struct impl *this = object; + + switch (index) { + case 0: + if (this->have_format) { + *param = spa_format_audio_raw_build(builder, SPA_PARAM_EnumFormat, + &this->format.info.raw); + } else { + *param = spa_pod_builder_add_object(builder, + SPA_TYPE_OBJECT_Format, SPA_PARAM_EnumFormat, + SPA_FORMAT_mediaType, SPA_POD_Id(SPA_MEDIA_TYPE_audio), + SPA_FORMAT_mediaSubtype, SPA_POD_Id(SPA_MEDIA_SUBTYPE_raw), + SPA_FORMAT_AUDIO_format, SPA_POD_Id(SPA_AUDIO_FORMAT_F32P), + SPA_FORMAT_AUDIO_rate, SPA_POD_CHOICE_RANGE_Int(44100, 1, INT32_MAX), + SPA_FORMAT_AUDIO_channels, SPA_POD_Int(1)); + } + break; + default: + return 0; + } + return 1; +} + +static int +impl_node_port_enum_params(void *object, int seq, + enum spa_direction direction, uint32_t port_id, + uint32_t id, uint32_t start, uint32_t num, + const struct spa_pod *filter) +{ + struct impl *this = object; + struct port *port; + struct spa_pod *param; + struct spa_pod_builder b = { 0 }; + uint8_t buffer[1024]; + struct spa_result_node_params result; + uint32_t count = 0; + int res; + + spa_return_val_if_fail(this != NULL, -EINVAL); + spa_return_val_if_fail(num != 0, -EINVAL); + spa_return_val_if_fail(CHECK_PORT(this, direction, port_id), -EINVAL); + + port = GET_PORT(this, direction, port_id); + + result.id = id; + result.next = start; +next: + result.index = result.next++; + + spa_pod_builder_init(&b, buffer, sizeof(buffer)); + + switch (id) { + case SPA_PARAM_EnumFormat: + if ((res = port_enum_formats(this, direction, port_id, result.index, ¶m, &b)) <= 0) + return res; + break; + + case SPA_PARAM_Format: + if (!port->have_format) + return -EIO; + if (result.index > 0) + return 0; + + param = spa_format_audio_raw_build(&b, id, &this->format.info.raw); + break; + + case SPA_PARAM_Buffers: + if (!port->have_format) + return -EIO; + if (result.index > 0) + return 0; + + param = spa_pod_builder_add_object(&b, + SPA_TYPE_OBJECT_ParamBuffers, id, + SPA_PARAM_BUFFERS_buffers, SPA_POD_CHOICE_RANGE_Int(1, 1, MAX_BUFFERS), + SPA_PARAM_BUFFERS_blocks, SPA_POD_Int(1), + SPA_PARAM_BUFFERS_size, SPA_POD_CHOICE_RANGE_Int( + 1024 * this->stride, + 16 * this->stride, + INT32_MAX / this->stride), + SPA_PARAM_BUFFERS_stride, SPA_POD_Int(this->stride), + SPA_PARAM_BUFFERS_align, SPA_POD_Int(16)); + break; + + case SPA_PARAM_Meta: + switch (result.index) { + case 0: + param = spa_pod_builder_add_object(&b, + SPA_TYPE_OBJECT_ParamMeta, id, + SPA_PARAM_META_type, SPA_POD_Id(SPA_META_Header), + SPA_PARAM_META_size, SPA_POD_Int(sizeof(struct spa_meta_header))); + break; + default: + return 0; + } + break; + + case SPA_PARAM_IO: + switch (result.index) { + case 0: + param = spa_pod_builder_add_object(&b, + SPA_TYPE_OBJECT_ParamIO, id, + SPA_PARAM_IO_id, SPA_POD_Id(SPA_IO_Buffers), + SPA_PARAM_IO_size, SPA_POD_Int(sizeof(struct spa_io_buffers))); + break; + default: + return 0; + } + break; + default: + return -ENOENT; + } + + if (spa_pod_filter(&b, &result.param, param, filter) < 0) + goto next; + + spa_node_emit_result(&this->hooks, seq, 0, SPA_RESULT_TYPE_NODE_PARAMS, &result); + + if (++count != num) + goto next; + + return 0; +} + +static int clear_buffers(struct impl *this, struct port *port) +{ + if (port->n_buffers > 0) { + spa_log_debug(this->log, NAME " %p: clear buffers %p", this, port); + port->n_buffers = 0; + spa_list_init(&port->queue); + } + return 0; +} + +static int queue_buffer(struct impl *this, struct port *port, struct buffer *b) +{ + if (SPA_FLAG_CHECK(b->flags, BUFFER_FLAG_QUEUED)) + return -EINVAL; + + spa_list_append(&port->queue, &b->link); + SPA_FLAG_SET(b->flags, BUFFER_FLAG_QUEUED); + spa_log_trace_fp(this->log, NAME " %p: queue buffer %d", this, b->id); + return 0; +} + +static struct buffer *dequeue_buffer(struct impl *this, struct port *port) +{ + struct buffer *b; + + if (spa_list_is_empty(&port->queue)) + return NULL; + + b = spa_list_first(&port->queue, struct buffer, link); + spa_list_remove(&b->link); + SPA_FLAG_UNSET(b->flags, BUFFER_FLAG_QUEUED); + spa_log_trace_fp(this->log, NAME " %p: dequeue buffer %d", this, b->id); + return b; +} + +static int port_set_format(void *object, + enum spa_direction direction, + uint32_t port_id, + uint32_t flags, + const struct spa_pod *format) +{ + struct impl *this = object; + struct port *port; + int res; + + port = GET_PORT(this, direction, port_id); + + if (format == NULL) { + if (port->have_format) { + port->have_format = false; + if (--this->n_formats == 0) + this->have_format = false; + clear_buffers(this, port); + } + } else { + struct spa_audio_info info = { 0 }; + + if ((res = spa_format_parse(format, &info.media_type, &info.media_subtype)) < 0) + return res; + + if (info.media_type != SPA_MEDIA_TYPE_audio || + info.media_subtype != SPA_MEDIA_SUBTYPE_raw) + return -EINVAL; + + if (spa_format_audio_raw_parse(format, &info.info.raw) < 0) + return -EINVAL; + + if (info.info.raw.format != SPA_AUDIO_FORMAT_F32P) + return -EINVAL; + if (info.info.raw.channels != 1) + return -EINVAL; + + if (this->have_format) { + if (info.info.raw.rate != this->format.info.raw.rate) + return -EINVAL; + } else { + this->stride = sizeof(float); + this->have_format = true; + this->format = info; + } + if (!port->have_format) { + this->n_formats++; + port->have_format = true; + spa_log_debug(this->log, NAME " %p: set format on port %d:%d", + this, direction, port_id); + } + } + port->info.change_mask |= SPA_PORT_CHANGE_MASK_PARAMS; + if (port->have_format) { + port->params[3] = SPA_PARAM_INFO(SPA_PARAM_Format, SPA_PARAM_INFO_READWRITE); + port->params[4] = SPA_PARAM_INFO(SPA_PARAM_Buffers, SPA_PARAM_INFO_READ); + } else { + port->params[3] = SPA_PARAM_INFO(SPA_PARAM_Format, SPA_PARAM_INFO_WRITE); + port->params[4] = SPA_PARAM_INFO(SPA_PARAM_Buffers, 0); + } + emit_port_info(this, port, false); + + return 0; +} + + +static int +impl_node_port_set_param(void *object, + enum spa_direction direction, uint32_t port_id, + uint32_t id, uint32_t flags, + const struct spa_pod *param) +{ + struct impl *this = object; + + spa_return_val_if_fail(this != NULL, -EINVAL); + spa_return_val_if_fail(CHECK_PORT(this, direction, port_id), -EINVAL); + + if (id == SPA_PARAM_Format) { + return port_set_format(this, direction, port_id, flags, param); + } + else + return -ENOENT; +} + +static int +impl_node_port_use_buffers(void *object, + enum spa_direction direction, + uint32_t port_id, + struct spa_buffer **buffers, + uint32_t n_buffers) +{ + struct impl *this = object; + struct port *port; + uint32_t i; + + spa_return_val_if_fail(this != NULL, -EINVAL); + spa_return_val_if_fail(CHECK_PORT(this, direction, port_id), -EINVAL); + + port = GET_PORT(this, direction, port_id); + + spa_log_debug(this->log, NAME " %p: use buffers %d on port %d:%d", + this, n_buffers, direction, port_id); + + spa_return_val_if_fail(port->have_format, -EIO); + + clear_buffers(this, port); + + for (i = 0; i < n_buffers; i++) { + struct buffer *b; + struct spa_data *d = buffers[i]->datas; + + b = &port->buffers[i]; + b->buffer = buffers[i]; + b->flags = 0; + b->id = i; + b->h = spa_buffer_find_meta_data(buffers[i], SPA_META_Header, sizeof(*b->h)); + + if (!((d[0].type == SPA_DATA_MemPtr || + d[0].type == SPA_DATA_MemFd || + d[0].type == SPA_DATA_DmaBuf) && d[0].data != NULL)) { + spa_log_error(this->log, NAME " %p: invalid memory on buffer %d", this, i); + return -EINVAL; + } + if (!SPA_IS_ALIGNED(d[0].data, 16)) { + spa_log_warn(this->log, NAME " %p: memory on buffer %d not aligned", this, i); + } + if (direction == SPA_DIRECTION_OUTPUT) + queue_buffer(this, port, b); + } + port->n_buffers = n_buffers; + + return 0; +} + +static int +impl_node_port_alloc_buffers(void *object, + enum spa_direction direction, + uint32_t port_id, + struct spa_pod **params, + uint32_t n_params, + struct spa_buffer **buffers, + uint32_t *n_buffers) +{ + return -ENOTSUP; +} + +static int +impl_node_port_set_io(void *object, + enum spa_direction direction, uint32_t port_id, + uint32_t id, void *data, size_t size) +{ + struct impl *this = object; + struct port *port; + + spa_return_val_if_fail(this != NULL, -EINVAL); + spa_return_val_if_fail(CHECK_PORT(this, direction, port_id), -EINVAL); + + port = GET_PORT(this, direction, port_id); + + switch (id) { + case SPA_IO_Buffers: + port->io = data; + break; + default: + return -ENOENT; + } + return 0; +} + +static int impl_node_port_reuse_buffer(void *object, uint32_t port_id, uint32_t buffer_id) +{ + struct impl *this = object; + struct port *port; + + spa_return_val_if_fail(this != NULL, -EINVAL); + spa_return_val_if_fail(CHECK_PORT(this, SPA_DIRECTION_OUTPUT, port_id), -EINVAL); + port = GET_OUT_PORT(this, 0); + + if (buffer_id >= port->n_buffers) + return -EINVAL; + + return queue_buffer(this, port, &port->buffers[buffer_id]); +} + +#if defined (__SSE__) +#include +static void mix_2(float * dst, const float * SPA_RESTRICT src1, + const float * SPA_RESTRICT src2, uint32_t n_samples) +{ + uint32_t n, unrolled; + __m128 in1[4], in2[4]; + + if (SPA_IS_ALIGNED(src1, 16) && + SPA_IS_ALIGNED(src2, 16) && + SPA_IS_ALIGNED(dst, 16)) + unrolled = n_samples & ~15; + else + unrolled = 0; + + for (n = 0; n < unrolled; n += 16) { + in1[0] = _mm_load_ps(&src1[n+ 0]); + in1[1] = _mm_load_ps(&src1[n+ 4]); + in1[2] = _mm_load_ps(&src1[n+ 8]); + in1[3] = _mm_load_ps(&src1[n+12]); + + in2[0] = _mm_load_ps(&src2[n+ 0]); + in2[1] = _mm_load_ps(&src2[n+ 4]); + in2[2] = _mm_load_ps(&src2[n+ 8]); + in2[3] = _mm_load_ps(&src2[n+12]); + + in1[0] = _mm_add_ps(in1[0], in2[0]); + in1[1] = _mm_add_ps(in1[1], in2[1]); + in1[2] = _mm_add_ps(in1[2], in2[2]); + in1[3] = _mm_add_ps(in1[3], in2[3]); + + _mm_store_ps(&dst[n+ 0], in1[0]); + _mm_store_ps(&dst[n+ 4], in1[1]); + _mm_store_ps(&dst[n+ 8], in1[2]); + _mm_store_ps(&dst[n+12], in1[3]); + } + for (; n < n_samples; n++) { + in1[0] = _mm_load_ss(&src1[n]), + in2[0] = _mm_load_ss(&src2[n]), + in1[0] = _mm_add_ss(in1[0], in2[0]); + _mm_store_ss(&dst[n], in1[0]); + } +} +#else +static void mix_2(float * dst, const float * SPA_RESTRICT src1, + const float * SPA_RESTRICT src2, uint32_t n_samples) +{ + uint32_t i; + for (i = 0; i < n_samples; i++) + dst[i] = src1[i] + src2[i]; +} +#endif + + +static int impl_node_process(void *object) +{ + struct impl *this = object; + struct port *outport; + struct spa_io_buffers *outio; + uint32_t n_samples, n_buffers, i, maxsize; + struct buffer **buffers; + struct buffer *outb; + + spa_return_val_if_fail(this != NULL, -EINVAL); + + outport = GET_OUT_PORT(this, 0); + outio = outport->io; + spa_return_val_if_fail(outio != NULL, -EIO); + + spa_log_trace_fp(this->log, NAME " %p: status %p %d %d", + this, outio, outio->status, outio->buffer_id); + + if (outio->status == SPA_STATUS_HAVE_BUFFER) + return outio->status; + + /* recycle */ + if (outio->buffer_id < outport->n_buffers) { + queue_buffer(this, outport, &outport->buffers[outio->buffer_id]); + outio->buffer_id = SPA_ID_INVALID; + } + + buffers = alloca(MAX_PORTS * sizeof(struct buffer *)); + n_buffers = 0; + + maxsize = MAX_SAMPLES * sizeof(float); + + for (i = 0; i < this->last_port; i++) { + struct port *inport = GET_IN_PORT(this, i); + struct spa_io_buffers *inio = NULL; + struct buffer *inb; + + if (!inport->valid || + (inio = inport->io) == NULL || + inio->buffer_id >= inport->n_buffers || + inio->status != SPA_STATUS_HAVE_BUFFER) { + spa_log_trace_fp(this->log, NAME " %p: skip input %d %d %p %d %d %d", this, + i, inport->valid, inio, + inio ? inio->status : -1, + inio ? inio->buffer_id : SPA_ID_INVALID, + inport->n_buffers); + continue; + } + + spa_log_trace_fp(this->log, NAME " %p: mix input %d %p->%p %d %d", this, + i, inio, outio, inio->status, inio->buffer_id); + + inb = &inport->buffers[inio->buffer_id]; + + maxsize = SPA_MIN(inb->buffer->datas[0].chunk->size, maxsize); + + buffers[n_buffers++] = inb; + inio->status = SPA_STATUS_NEED_BUFFER; + } + + outb = dequeue_buffer(this, outport); + if (outb == NULL) { + spa_log_trace(this->log, NAME " %p: out of buffers", this); + return -EPIPE; + } + + n_samples = maxsize / sizeof(float); + + if (n_buffers == 1) { + *outb->buffer = *buffers[0]->buffer; + } + else { + float *dst; + + outb->buffer->n_datas = 1; + outb->buffer->datas = outb->datas; + outb->datas[0].data = SPA_PTR_ALIGN(this->empty, 16, void); + outb->datas[0].chunk = outb->chunk; + outb->datas[0].chunk->offset = 0; + outb->datas[0].chunk->size = n_samples * sizeof(float); + outb->datas[0].chunk->stride = sizeof(float); + + dst = outb->datas[0].data; + if (n_buffers == 0) { + memset(dst, 0, n_samples * sizeof(float)); + } + else { + /* first 2 buffers, add and store */ + mix_2(dst, buffers[0]->buffer->datas[0].data, + buffers[1]->buffer->datas[0].data, n_samples); + /* next buffers */ + for (i = 2; i < n_buffers; i++) + mix_2(dst, dst, buffers[i]->buffer->datas[0].data, n_samples); + } + } + + outio->buffer_id = outb->id; + outio->status = SPA_STATUS_HAVE_BUFFER; + + return SPA_STATUS_HAVE_BUFFER | SPA_STATUS_NEED_BUFFER; +} + +static const struct spa_node_methods impl_node = { + SPA_VERSION_NODE_METHODS, + .add_listener = impl_node_add_listener, + .set_callbacks = impl_node_set_callbacks, + .enum_params = impl_node_enum_params, + .set_param = impl_node_set_param, + .set_io = impl_node_set_io, + .send_command = impl_node_send_command, + .add_port = impl_node_add_port, + .remove_port = impl_node_remove_port, + .port_enum_params = impl_node_port_enum_params, + .port_set_param = impl_node_port_set_param, + .port_use_buffers = impl_node_port_use_buffers, + .port_alloc_buffers = impl_node_port_alloc_buffers, + .port_set_io = impl_node_port_set_io, + .port_reuse_buffer = impl_node_port_reuse_buffer, + .process = impl_node_process, +}; + +static int impl_get_interface(struct spa_handle *handle, uint32_t type, void **interface) +{ + struct impl *this; + + spa_return_val_if_fail(handle != NULL, -EINVAL); + spa_return_val_if_fail(interface != NULL, -EINVAL); + + this = (struct impl *) handle; + + if (type == SPA_TYPE_INTERFACE_Node) + *interface = &this->node; + else + return -ENOENT; + + return 0; +} + +static int impl_clear(struct spa_handle *handle) +{ + return 0; +} + +static size_t +impl_get_size(const struct spa_handle_factory *factory, + const struct spa_dict *params) +{ + return sizeof(struct impl); +} + +static int +impl_init(const struct spa_handle_factory *factory, + struct spa_handle *handle, + const struct spa_dict *info, + const struct spa_support *support, + uint32_t n_support) +{ + struct impl *this; + struct port *port; + uint32_t i; + + spa_return_val_if_fail(factory != NULL, -EINVAL); + spa_return_val_if_fail(handle != NULL, -EINVAL); + + handle->get_interface = impl_get_interface; + handle->clear = impl_clear; + + this = (struct impl *) handle; + + for (i = 0; i < n_support; i++) { + if (support[i].type == SPA_TYPE_INTERFACE_Log) + this->log = support[i].data; + } + + spa_hook_list_init(&this->hooks); + + this->node.iface = SPA_INTERFACE_INIT( + SPA_TYPE_INTERFACE_Node, + SPA_VERSION_NODE, + &impl_node, this); + this->info = SPA_NODE_INFO_INIT(); + this->info.max_input_ports = MAX_PORTS; + this->info.max_output_ports = 1; + this->info.change_mask |= SPA_NODE_CHANGE_MASK_FLAGS; + this->info.flags = SPA_NODE_FLAG_RT | SPA_NODE_FLAG_DYNAMIC_INPUT_PORTS; + + port = GET_OUT_PORT(this, 0); + port->valid = true; + port->direction = SPA_DIRECTION_OUTPUT; + port->id = 0; + port->info = SPA_PORT_INFO_INIT(); + port->info.change_mask |= SPA_PORT_CHANGE_MASK_FLAGS; + port->info.flags = SPA_PORT_FLAG_CAN_USE_BUFFERS | + SPA_PORT_FLAG_DYNAMIC_DATA; + port->info.change_mask |= SPA_PORT_CHANGE_MASK_PARAMS; + port->params[0] = SPA_PARAM_INFO(SPA_PARAM_EnumFormat, SPA_PARAM_INFO_READ); + port->params[1] = SPA_PARAM_INFO(SPA_PARAM_Meta, SPA_PARAM_INFO_READ); + port->params[2] = SPA_PARAM_INFO(SPA_PARAM_IO, SPA_PARAM_INFO_READ); + port->params[3] = SPA_PARAM_INFO(SPA_PARAM_Format, SPA_PARAM_INFO_WRITE); + port->params[4] = SPA_PARAM_INFO(SPA_PARAM_Buffers, 0); + port->info.params = port->params; + port->info.n_params = 5; + + spa_list_init(&port->queue); + + return 0; +} + +static const struct spa_interface_info impl_interfaces[] = { + {SPA_TYPE_INTERFACE_Node,}, +}; + +static int +impl_enum_interface_info(const struct spa_handle_factory *factory, + const struct spa_interface_info **info, + uint32_t *index) +{ + spa_return_val_if_fail(factory != NULL, -EINVAL); + spa_return_val_if_fail(info != NULL, -EINVAL); + spa_return_val_if_fail(index != NULL, -EINVAL); + + switch (*index) { + case 0: + *info = &impl_interfaces[*index]; + break; + default: + return 0; + } + (*index)++; + return 1; +} + +const struct spa_handle_factory spa_floatmix_factory = { + SPA_VERSION_HANDLE_FACTORY, + SPA_NAME_AUDIO_MIXER, + NULL, + impl_get_size, + impl_init, + impl_enum_interface_info, +};