From 31dacd9d6fa1fe9458309193773ab1605f1a2d57 Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Thu, 31 Jan 2019 11:02:13 +0100 Subject: [PATCH] remote: move node export code to client-node module Make the code to export objects more generic. Make it possible for modules to register a type to export. Make the client-node also able to export plain spa_nodes. Let the remote signal the global of the exported object if any. We can then remote the (unused) remote_id from the proxy. --- src/examples/export-sink.c | 8 +- src/examples/export-source.c | 8 +- src/examples/export-spa.c | 2 +- src/extensions/client-node.h | 2 + src/modules/meson.build | 1 + src/modules/module-client-node.c | 19 + src/modules/module-client-node/remote-node.c | 1093 ++++++++++++++++++ src/pipewire/core.c | 1 + src/pipewire/interfaces.h | 2 - src/pipewire/module.h | 1 + src/pipewire/node.c | 4 +- src/pipewire/port.c | 14 + src/pipewire/private.h | 6 +- src/pipewire/proxy.c | 1 - src/pipewire/remote.c | 1060 +---------------- src/pipewire/remote.h | 19 +- src/pipewire/stream.c | 38 +- src/pipewire/type.h | 6 +- src/tests/test-remote.c | 4 +- src/tools/pipewire-cli.c | 2 +- 20 files changed, 1236 insertions(+), 1055 deletions(-) create mode 100644 src/modules/module-client-node/remote-node.c diff --git a/src/examples/export-sink.c b/src/examples/export-sink.c index 319ee6334..cac7fc8c0 100644 --- a/src/examples/export-sink.c +++ b/src/examples/export-sink.c @@ -72,7 +72,6 @@ struct data { struct pw_remote *remote; struct spa_hook remote_listener; - struct pw_node *node; struct spa_port_info port_info; struct spa_node impl_node; @@ -493,13 +492,8 @@ static void make_node(struct data *data) pw_properties_set(props, PW_NODE_PROP_CATEGORY, "Capture"); pw_properties_set(props, PW_NODE_PROP_ROLE, "Camera"); - data->node = pw_node_new(data->core, "SDL-sink", props, 0); data->impl_node = impl_node; - pw_node_set_implementation(data->node, &data->impl_node); - pw_node_register(data->node, NULL, NULL, NULL); - pw_node_set_active(data->node, true); - - pw_remote_export(data->remote, data->node); + pw_remote_export(data->remote, SPA_TYPE_INTERFACE_Node, props, &data->impl_node); } static void on_state_changed(void *_data, enum pw_remote_state old, enum pw_remote_state state, const char *error) diff --git a/src/examples/export-source.c b/src/examples/export-source.c index 43002f0cb..ffdc4ac64 100644 --- a/src/examples/export-source.c +++ b/src/examples/export-source.c @@ -505,14 +505,8 @@ static void make_node(struct data *data) if (data->path) pw_properties_set(props, PW_NODE_PROP_TARGET_NODE, data->path); - data->node = pw_node_new(data->core, "export-source", props, 0); data->impl_node = impl_node; - pw_node_set_implementation(data->node, &data->impl_node); - - pw_node_register(data->node, NULL, NULL, NULL); - pw_node_set_active(data->node, true); - - pw_remote_export(data->remote, data->node); + pw_remote_export(data->remote, SPA_TYPE_INTERFACE_Node, props, &data->impl_node); } static void on_state_changed(void *_data, enum pw_remote_state old, diff --git a/src/examples/export-spa.c b/src/examples/export-spa.c index bb46eb967..b93496cc3 100644 --- a/src/examples/export-spa.c +++ b/src/examples/export-spa.c @@ -70,7 +70,7 @@ static int make_node(struct data *data) pw_node_set_active(data->node, true); - pw_remote_export(data->remote, data->node); + pw_remote_export(data->remote, PW_TYPE_INTERFACE_Node, NULL, data->node); return 0; } diff --git a/src/extensions/client-node.h b/src/extensions/client-node.h index 0f0ebe6d0..57c94395a 100644 --- a/src/extensions/client-node.h +++ b/src/extensions/client-node.h @@ -38,6 +38,8 @@ struct pw_client_node_proxy; #define PW_VERSION_CLIENT_NODE 0 +#define PW_EXTENSION_MODULE_CLIENT_NODE PIPEWIRE_MODULE_PREFIX "module-client-node" + /** information about a buffer */ struct pw_client_node_buffer { uint32_t mem_id; /**< the memory id for the metadata */ diff --git a/src/modules/meson.build b/src/modules/meson.build index 8c25ee052..fe2e4af80 100644 --- a/src/modules/meson.build +++ b/src/modules/meson.build @@ -25,6 +25,7 @@ endif pipewire_module_client_node = shared_library('pipewire-module-client-node', [ 'module-client-node.c', + 'module-client-node/remote-node.c', 'module-client-node/client-node.c', 'module-client-node/client-stream.c', 'module-client-node/protocol-native.c', diff --git a/src/modules/module-client-node.c b/src/modules/module-client-node.c index 174613001..dadd62b66 100644 --- a/src/modules/module-client-node.c +++ b/src/modules/module-client-node.c @@ -40,6 +40,11 @@ static const struct spa_dict_item module_props[] = { { PW_MODULE_PROP_VERSION, PACKAGE_VERSION }, }; +struct pw_proxy *pw_remote_node_export(struct pw_remote *remote, + uint32_t type, struct pw_properties *props, void *object); +struct pw_proxy *pw_remote_spa_node_export(struct pw_remote *remote, + uint32_t type, struct pw_properties *props, void *object); + struct pw_protocol *pw_protocol_native_ext_client_node_init(struct pw_core *core); struct factory_data { @@ -48,6 +53,9 @@ struct factory_data { struct pw_module *module; struct spa_hook module_listener; + + struct pw_export_type export_node; + struct pw_export_type export_spanode; }; static void *create_object(void *_data, @@ -103,6 +111,9 @@ static void module_destroy(void *data) if (d->properties) pw_properties_free(d->properties); + spa_list_remove(&d->export_node.link); + spa_list_remove(&d->export_spanode.link); + pw_factory_destroy(d->this); } @@ -141,6 +152,14 @@ static int module_init(struct pw_module *module, struct pw_properties *propertie pw_factory_register(factory, NULL, pw_module_get_global(module), NULL); + data->export_node.type = PW_TYPE_INTERFACE_Node; + data->export_node.func = pw_remote_node_export; + pw_core_register_export_type(core, &data->export_node); + + data->export_spanode.type = SPA_TYPE_INTERFACE_Node; + data->export_spanode.func = pw_remote_spa_node_export; + pw_core_register_export_type(core, &data->export_spanode); + pw_module_add_listener(module, &data->module_listener, &module_events, data); pw_module_update_properties(module, &SPA_DICT_INIT_ARRAY(module_props)); diff --git a/src/modules/module-client-node/remote-node.c b/src/modules/module-client-node/remote-node.c new file mode 100644 index 000000000..40bc6f395 --- /dev/null +++ b/src/modules/module-client-node/remote-node.c @@ -0,0 +1,1093 @@ +/* PipeWire + * + * Copyright © 2018 Wim Taymans + * + * Permission is hereby granted, free of charge, to any person obtaining a + * copy of this software and associated documentation files (the "Software"), + * to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, + * and/or sell copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice (including the next + * paragraph) shall be included in all copies or substantial portions of the + * Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL + * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER + * DEALINGS IN THE SOFTWARE. + */ + +#include +#include +#include +#include +#include +#include + +#include +#include + +#include "pipewire/pipewire.h" +#include "pipewire/private.h" + +#include "extensions/protocol-native.h" +#include "extensions/client-node.h" + +#define MAX_MIX 4096 + +/** \cond */ + +struct mapping { + void *ptr; + struct pw_map_range map; + int prot; +}; + +struct mem { + uint32_t id; + int fd; + uint32_t flags; + uint32_t ref; + struct mapping map; +}; + +struct buffer_mem { + uint32_t mem_id; + struct mapping map; +}; + +struct buffer { + uint32_t id; + struct spa_buffer *buf; + struct buffer_mem *mem; + uint32_t n_mem; +}; + +struct mix { + struct spa_list link; + struct pw_port *port; + uint32_t mix_id; + struct pw_port_mix mix; + struct pw_array buffers; + bool active; +}; + +struct node_data { + struct pw_remote *remote; + struct pw_core *core; + + uint32_t remote_id; + int rtwritefd; + struct spa_source *rtsocket_source; + + struct mix mix_pool[MAX_MIX]; + struct spa_list mix[2]; + struct spa_list free_mix; + + struct pw_array mems; + + struct pw_node *node; + struct spa_hook node_listener; + bool do_free; + + struct pw_client_node_proxy *node_proxy; + struct spa_hook node_proxy_listener; + struct spa_hook proxy_listener; + + struct spa_io_position *position; + + struct spa_graph_node_callbacks callbacks; + void *callbacks_data; + + struct spa_graph_state state; + struct spa_graph_link link; +}; + +/** \endcond */ + +static int +do_remove_source(struct spa_loop *loop, + bool async, uint32_t seq, const void *data, size_t size, void *user_data) +{ + struct node_data *d = user_data; + + if (d->rtsocket_source) { + pw_loop_destroy_source(d->core->data_loop, d->rtsocket_source); + d->rtsocket_source = NULL; + } + return 0; +} + + +static void unhandle_socket(struct node_data *data) +{ + pw_loop_invoke(data->core->data_loop, + do_remove_source, 1, NULL, 0, true, data); +} + +static void +on_rtsocket_condition(void *user_data, int fd, enum spa_io mask) +{ + struct pw_proxy *proxy = user_data; + struct node_data *data = proxy->user_data; + struct spa_graph_node *node = &data->node->rt.root; + + if (mask & (SPA_IO_ERR | SPA_IO_HUP)) { + pw_log_warn("got error"); + unhandle_socket(data); + return; + } + + if (mask & SPA_IO_IN) { + uint64_t cmd; + + if (read(fd, &cmd, sizeof(uint64_t)) != sizeof(uint64_t) || cmd != 1) + pw_log_warn("proxy %p: read %"PRIu64" failed %m", proxy, cmd); + + pw_log_trace("remote %p: process %p", data->remote, proxy); + spa_graph_run(node->graph); + } +} + +static struct mem *find_mem(struct node_data *data, uint32_t id) +{ + struct mem *m; + pw_array_for_each(m, &data->mems) { + if (m->id == id) + return m; + } + return NULL; +} + +static struct mem *find_mem_ptr(struct node_data *data, void *ptr) +{ + struct mem *m; + pw_array_for_each(m, &data->mems) { + if (m->map.ptr == ptr) + return m; + } + return NULL; +} + +static void *mem_map(struct node_data *data, struct mapping *map, + int fd, int prot, uint32_t offset, uint32_t size) +{ + struct mapping m; + void *ptr; + + pw_map_range_init(&m.map, offset, size, data->core->sc_pagesize); + + if (map->ptr == NULL || map->map.offset != m.map.offset || map->map.size != m.map.size) { + map->ptr = mmap(map->ptr, m.map.size, prot, MAP_SHARED, fd, m.map.offset); + if (map->ptr == MAP_FAILED) { + pw_log_error("remote %p: Failed to mmap memory %d: %m", data, size); + return NULL; + } + map->map = m.map; + } + ptr = SPA_MEMBER(map->ptr, map->map.start, void); + pw_log_debug("remote %p: fd %d mapped %d %d %p", data, fd, offset, size, ptr); + + return ptr; +} + +static void *mem_unmap(struct node_data *data, void *ptr, struct pw_map_range *range) +{ + if (ptr != NULL) { + if (munmap(SPA_MEMBER(ptr, -range->start, void), range->size) < 0) + pw_log_warn("failed to unmap: %m"); + } + return NULL; +} + +static void clear_mem(struct node_data *data, struct mem *m) +{ + if (m->fd != -1) { + bool has_ref = false; + int fd; + struct mem *m2; + + pw_log_debug("remote %p: clear mem %d", data, m->id); + + fd = m->fd; + m->fd = -1; + m->id = SPA_ID_INVALID; + + pw_array_for_each(m2, &data->mems) { + if (m2->fd == fd) { + has_ref = true; + break; + } + } + if (!has_ref) { + m->map.ptr = mem_unmap(data, m->map.ptr, &m->map.map); + close(fd); + } + } +} + +static void clean_transport(struct node_data *data) +{ + struct mem *m; + + if (data->rtsocket_source == NULL) + return; + + unhandle_socket(data); + + pw_array_for_each(m, &data->mems) + clear_mem(data, m); + pw_array_clear(&data->mems); + + close(data->rtwritefd); + data->remote_id = SPA_ID_INVALID; +} + +static void mix_init(struct mix *mix, struct pw_port *port, uint32_t mix_id) +{ + mix->port = port; + mix->mix_id = mix_id; + pw_port_init_mix(port, &mix->mix); + mix->active = false; + pw_array_init(&mix->buffers, 32); + pw_array_ensure_size(&mix->buffers, sizeof(struct buffer) * 64); +} + +static int +do_deactivate_mix(struct spa_loop *loop, + bool async, uint32_t seq, const void *data, size_t size, void *user_data) +{ + struct mix *mix = user_data; + spa_graph_port_remove(&mix->mix.port); + return 0; +} + +static int +deactivate_mix(struct node_data *data, struct mix *mix) +{ + if (mix->active) { + pw_log_debug("node %p: mix %p deactivate", data, mix); + pw_loop_invoke(data->core->data_loop, + do_deactivate_mix, SPA_ID_INVALID, NULL, 0, true, mix); + mix->active = false; + } + return 0; +} + +static int +do_activate_mix(struct spa_loop *loop, + bool async, uint32_t seq, const void *data, size_t size, void *user_data) +{ + struct mix *mix = user_data; + spa_graph_port_add(&mix->port->rt.mix_node, &mix->mix.port); + return 0; +} + +static int +activate_mix(struct node_data *data, struct mix *mix) +{ + if (!mix->active) { + pw_log_debug("node %p: mix %p activate", data, mix); + pw_loop_invoke(data->core->data_loop, + do_activate_mix, SPA_ID_INVALID, NULL, 0, false, mix); + mix->active = true; + } + return 0; +} + +static struct mix *find_mix(struct node_data *data, + enum spa_direction direction, uint32_t port_id, uint32_t mix_id) +{ + struct mix *mix; + + spa_list_for_each(mix, &data->mix[direction], link) { + if (mix->port->port_id == port_id && + mix->mix_id == mix_id) + return mix; + } + return NULL; +} + +static struct mix *ensure_mix(struct node_data *data, + enum spa_direction direction, uint32_t port_id, uint32_t mix_id) +{ + struct mix *mix; + struct pw_port *port; + + if ((mix = find_mix(data, direction, port_id, mix_id))) + return mix; + + if (spa_list_is_empty(&data->free_mix)) + return NULL; + + port = pw_node_find_port(data->node, direction, port_id); + if (port == NULL) + return NULL; + + mix = spa_list_first(&data->free_mix, struct mix, link); + spa_list_remove(&mix->link); + + mix_init(mix, port, mix_id); + spa_list_append(&data->mix[direction], &mix->link); + + return mix; +} + +static void client_node_add_mem(void *object, + uint32_t mem_id, + uint32_t type, int memfd, uint32_t flags) +{ + struct pw_proxy *proxy = object; + struct node_data *data = proxy->user_data; + struct mem *m; + + m = find_mem(data, mem_id); + if (m) { + pw_log_warn("duplicate mem %u, fd %d, flags %d", + mem_id, memfd, flags); + return; + } + + m = pw_array_add(&data->mems, sizeof(struct mem)); + pw_log_debug("add mem %u, fd %d, flags %d", mem_id, memfd, flags); + + m->id = mem_id; + m->fd = memfd; + m->flags = flags; + m->ref = 0; + m->map.map = PW_MAP_RANGE_INIT; + m->map.ptr = NULL; +} + +static void client_node_transport(void *object, uint32_t node_id, + int readfd, int writefd) +{ + struct pw_proxy *proxy = object; + struct node_data *data = proxy->user_data; + struct pw_remote *remote = proxy->remote; + + clean_transport(data); + + data->remote_id = node_id; + + pw_log_debug("remote-node %p: create transport with fds %d %d for node %u", + proxy, readfd, writefd, node_id); + + data->rtwritefd = writefd; + data->rtsocket_source = pw_loop_add_io(remote->core->data_loop, + readfd, + SPA_IO_ERR | SPA_IO_HUP, + true, on_rtsocket_condition, proxy); + if (data->node->active) + pw_client_node_proxy_set_active(data->node_proxy, true); + + pw_remote_events_exported(remote, proxy->id, node_id); +} + +static void add_port_update(struct pw_proxy *proxy, struct pw_port *port, uint32_t change_mask) +{ + struct node_data *data = proxy->user_data; + const struct spa_port_info *port_info = NULL; + struct spa_port_info pi; + uint32_t n_params = 0; + struct spa_pod **params = NULL; + + if (change_mask & PW_CLIENT_NODE_PORT_UPDATE_PARAMS) { + uint32_t idx1, idx2, id; + uint8_t buf[2048]; + struct spa_pod_builder b = { 0 }; + + for (idx1 = 0;;) { + struct spa_pod *param; + + spa_pod_builder_init(&b, buf, sizeof(buf)); + if (spa_node_port_enum_params(port->node->node, + port->direction, port->port_id, + SPA_PARAM_List, &idx1, + NULL, ¶m, &b) <= 0) + break; + + spa_pod_parse_object(param, + SPA_TYPE_OBJECT_ParamList, NULL, + SPA_PARAM_LIST_id, SPA_POD_Id(&id)); + + params = realloc(params, sizeof(struct spa_pod *) * (n_params + 1)); + params[n_params++] = pw_spa_pod_copy(param); + + for (idx2 = 0;;) { + spa_pod_builder_init(&b, buf, sizeof(buf)); + if (spa_node_port_enum_params(port->node->node, + port->direction, port->port_id, + id, &idx2, + NULL, ¶m, &b) <= 0) + break; + + params = realloc(params, sizeof(struct spa_pod *) * (n_params + 1)); + params[n_params++] = pw_spa_pod_copy(param); + } + } + } + if (change_mask & PW_CLIENT_NODE_PORT_UPDATE_INFO) { + spa_node_port_get_info(port->node->node, port->direction, port->port_id, &port_info); + pi = * port_info; + pi.flags &= ~SPA_PORT_INFO_FLAG_CAN_ALLOC_BUFFERS; + } + + pw_client_node_proxy_port_update(data->node_proxy, + port->direction, + port->port_id, + change_mask, + n_params, + (const struct spa_pod **)params, + &pi); + if (params) { + while (n_params > 0) + free(params[--n_params]); + free(params); + } +} + +static void +client_node_set_param(void *object, uint32_t seq, uint32_t id, uint32_t flags, + const struct spa_pod *param) +{ + pw_log_warn("set param not implemented"); +} + + +static void +client_node_set_io(void *object, + uint32_t id, + uint32_t memid, + uint32_t offset, + uint32_t size) +{ + struct pw_proxy *proxy = object; + struct node_data *data = proxy->user_data; + struct mem *m; + void *ptr; + + if (memid == SPA_ID_INVALID) { + ptr = NULL; + size = 0; + } + else { + m = find_mem(data, memid); + if (m == NULL) { + pw_log_warn("unknown memory id %u", memid); + return; + } + ptr = mem_map(data, &m->map, m->fd, + PROT_READ|PROT_WRITE, offset, size); + if (ptr == NULL) + return; + m->ref++; + } + + pw_log_debug("node %p: set io %s %p", proxy, + spa_debug_type_find_name(spa_type_io, id), ptr); + + if (id == SPA_IO_Position) { + if (ptr == NULL && data->position) { + m = find_mem_ptr(data, data->position); + if (m && --m->ref == 0) + clear_mem(data, m); + } + data->position = ptr; + } + spa_node_set_io(data->node->node, id, ptr, size); +} + +static void client_node_event(void *object, const struct spa_event *event) +{ + pw_log_warn("unhandled node event %d", SPA_EVENT_TYPE(event)); +} + +static int +do_pause_source(struct spa_loop *loop, + bool async, uint32_t seq, const void *data, size_t size, void *user_data) +{ + struct node_data *d = user_data; + pw_loop_update_io(d->core->data_loop, + d->rtsocket_source, + SPA_IO_ERR | SPA_IO_HUP); + return 0; +} + +static void client_node_command(void *object, uint32_t seq, const struct spa_command *command) +{ + struct pw_proxy *proxy = object; + struct node_data *data = proxy->user_data; + struct pw_remote *remote = proxy->remote; + int res; + + switch (SPA_NODE_COMMAND_ID(command)) { + case SPA_NODE_COMMAND_Pause: + pw_log_debug("node %p: pause %d", proxy, seq); + + if (data->rtsocket_source) { + pw_loop_invoke(data->core->data_loop, + do_pause_source, 1, NULL, 0, true, data); + } + if ((res = spa_node_send_command(data->node->node, command)) < 0) + pw_log_warn("node %p: pause failed", proxy); + + pw_client_node_proxy_done(data->node_proxy, seq, res); + break; + case SPA_NODE_COMMAND_Start: + pw_log_debug("node %p: start %d", proxy, seq); + + if ((res = spa_node_send_command(data->node->node, command)) < 0) { + pw_log_warn("node %p: start failed", proxy); + } + else if (data->rtsocket_source) { + pw_loop_update_io(remote->core->data_loop, + data->rtsocket_source, + SPA_IO_IN | SPA_IO_ERR | SPA_IO_HUP); + } + + pw_client_node_proxy_done(data->node_proxy, seq, res); + break; + default: + pw_log_warn("unhandled node command %d", SPA_NODE_COMMAND_ID(command)); + pw_client_node_proxy_done(data->node_proxy, seq, -ENOTSUP); + } +} + +static void +client_node_add_port(void *object, uint32_t seq, enum spa_direction direction, uint32_t port_id) +{ + pw_log_warn("add port not supported"); +} + +static void +client_node_remove_port(void *object, uint32_t seq, enum spa_direction direction, uint32_t port_id) +{ + pw_log_warn("remove port not supported"); +} + +static void clear_buffers(struct node_data *data, struct mix *mix) +{ + struct pw_port *port = mix->port; + struct buffer *b; + uint32_t i; + int res; + + pw_log_debug("port %p: clear buffers %d", port, mix->mix_id); + if ((res = pw_port_use_buffers(port, mix->mix_id, NULL, 0)) < 0) { + pw_log_error("port %p: error clear buffers %s", port, spa_strerror(res)); + return; + } + + pw_array_for_each(b, &mix->buffers) { + for (i = 0; i < b->n_mem; i++) { + struct buffer_mem *bm = &b->mem[i]; + struct mem *m; + + pw_log_debug("port %p: clear buffer %d mem %d", + port, b->id, bm->mem_id); + + m = find_mem(data, bm->mem_id); + if (m && --m->ref == 0) + clear_mem(data, m); + } + b->n_mem = 0; + free(b->buf); + } + mix->buffers.size = 0; +} + +static void +client_node_port_set_param(void *object, + uint32_t seq, + enum spa_direction direction, uint32_t port_id, + uint32_t id, uint32_t flags, + const struct spa_pod *param) +{ + struct pw_proxy *proxy = object; + struct node_data *data = proxy->user_data; + struct pw_port *port; + int res; + + port = pw_node_find_port(data->node, direction, port_id); + if (port == NULL) { + res = -EINVAL; + goto done; + } + + pw_log_debug("port %p: set param %d %p", port, id, param); + + if (id == SPA_PARAM_Format) { + struct mix *mix; + spa_list_for_each(mix, &data->mix[direction], link) { + if (mix->port->port_id == port_id) + clear_buffers(data, mix); + } + } + + res = pw_port_set_param(port, SPA_ID_INVALID, id, flags, param); + if (res < 0) + goto done; + + add_port_update(proxy, port, + PW_CLIENT_NODE_PORT_UPDATE_PARAMS | + PW_CLIENT_NODE_PORT_UPDATE_INFO); + + done: + pw_client_node_proxy_done(data->node_proxy, seq, res); +} + +static void +client_node_port_use_buffers(void *object, + uint32_t seq, + enum spa_direction direction, uint32_t port_id, uint32_t mix_id, + uint32_t n_buffers, struct pw_client_node_buffer *buffers) +{ + struct pw_proxy *proxy = object; + struct node_data *data = proxy->user_data; + struct buffer *bid; + uint32_t i, j; + struct spa_buffer *b, **bufs; + struct mix *mix; + int res, prot; + + mix = ensure_mix(data, direction, port_id, mix_id); + if (mix == NULL) { + res = -EINVAL; + goto done; + } + + prot = PROT_READ | (direction == SPA_DIRECTION_OUTPUT ? PROT_WRITE : 0); + + /* clear previous buffers */ + clear_buffers(data, mix); + + bufs = alloca(n_buffers * sizeof(struct spa_buffer *)); + + for (i = 0; i < n_buffers; i++) { + struct buffer_mem bmem = { 0, }; + size_t size; + off_t offset; + struct mem *m; + + m = find_mem(data, buffers[i].mem_id); + if (m == NULL) { + pw_log_error("unknown memory id %u", buffers[i].mem_id); + res = -EINVAL; + goto cleanup; + } + + bid = pw_array_add(&mix->buffers, sizeof(struct buffer)); + bid->id = i; + + bmem.mem_id = m->id; + bmem.map.ptr = mem_map(data, &bmem.map, m->fd, prot, + buffers[i].offset, buffers[i].size); + if (bmem.map.ptr == NULL) { + res = -errno; + goto cleanup; + } + if (mlock(bmem.map.ptr, bmem.map.map.size) < 0) + pw_log_warn("Failed to mlock memory %u %u: %m", + bmem.map.map.offset, bmem.map.map.size); + + size = sizeof(struct spa_buffer); + size += sizeof(struct buffer_mem); + for (j = 0; j < buffers[i].buffer->n_metas; j++) + size += sizeof(struct spa_meta); + for (j = 0; j < buffers[i].buffer->n_datas; j++) { + size += sizeof(struct spa_data); + size += sizeof(struct buffer_mem); + } + + b = bid->buf = malloc(size); + if (b == NULL) { + res = -ENOMEM; + goto cleanup; + } + memcpy(b, buffers[i].buffer, sizeof(struct spa_buffer)); + + b->metas = SPA_MEMBER(b, sizeof(struct spa_buffer), struct spa_meta); + b->datas = SPA_MEMBER(b->metas, sizeof(struct spa_meta) * b->n_metas, + struct spa_data); + bid->mem = SPA_MEMBER(b->datas, sizeof(struct spa_data) * b->n_datas, + struct buffer_mem); + bid->n_mem = 0; + + bid->mem[bid->n_mem++] = bmem; + m->ref++; + + pw_log_debug("add buffer %d %d %u %u", m->id, + bid->id, bmem.map.map.offset, bmem.map.map.size); + + offset = 0; + for (j = 0; j < b->n_metas; j++) { + struct spa_meta *m = &b->metas[j]; + memcpy(m, &buffers[i].buffer->metas[j], sizeof(struct spa_meta)); + m->data = SPA_MEMBER(bmem.map.ptr, offset, void); + offset += SPA_ROUND_UP_N(m->size, 8); + } + + for (j = 0; j < b->n_datas; j++) { + struct spa_data *d = &b->datas[j]; + + memcpy(d, &buffers[i].buffer->datas[j], sizeof(struct spa_data)); + d->chunk = + SPA_MEMBER(bmem.map.ptr, offset + sizeof(struct spa_chunk) * j, + struct spa_chunk); + + if (d->type == SPA_DATA_MemFd || d->type == SPA_DATA_DmaBuf) { + uint32_t mem_id = SPA_PTR_TO_UINT32(d->data); + struct mem *bm = find_mem(data, mem_id); + struct buffer_mem bm2; + + if (bm == NULL) { + pw_log_error("unknown buffer mem %u", mem_id); + res = -EINVAL; + goto cleanup; + } + + d->fd = bm->fd; + bm->ref++; + bm2.mem_id = bm->id; + bm2.map.ptr = NULL; + d->data = bm2.map.ptr; + + bid->mem[bid->n_mem++] = bm2; + + pw_log_debug(" data %d %u -> fd %d maxsize %d", + j, bm->id, bm->fd, d->maxsize); + } else if (d->type == SPA_DATA_MemPtr) { + int offs = SPA_PTR_TO_INT(d->data); + d->data = SPA_MEMBER(bmem.map.ptr, offs, void); + d->fd = -1; + pw_log_debug(" data %d %u -> mem %p maxsize %d", + j, bid->id, d->data, d->maxsize); + } else { + pw_log_warn("unknown buffer data type %d", d->type); + } + } + bufs[i] = b; + } + + res = pw_port_use_buffers(mix->port, mix->mix_id, bufs, n_buffers); + + done: + pw_client_node_proxy_done(data->node_proxy, seq, res); + return; + + cleanup: + clear_buffers(data, mix); + goto done; + +} + +static void +client_node_port_command(void *object, + uint32_t direction, + uint32_t port_id, + const struct spa_command *command) +{ + struct pw_proxy *proxy = object; + struct node_data *data = proxy->user_data; + struct pw_port *port; + + port = pw_node_find_port(data->node, direction, port_id); + if (port == NULL) + return; + + pw_port_send_command(port, true, command); +} + +static void +client_node_port_set_io(void *object, + uint32_t seq, + uint32_t direction, + uint32_t port_id, + uint32_t mix_id, + uint32_t id, + uint32_t memid, + uint32_t offset, + uint32_t size) +{ + struct pw_proxy *proxy = object; + struct node_data *data = proxy->user_data; + struct mix *mix; + struct mem *m; + void *ptr; + + mix = ensure_mix(data, direction, port_id, mix_id); + if (mix == NULL) + return; + + if (memid == SPA_ID_INVALID) { + ptr = NULL; + size = 0; + } + else { + m = find_mem(data, memid); + if (m == NULL) { + pw_log_warn("unknown memory id %u", memid); + return; + } + ptr = mem_map(data, &m->map, m->fd, + PROT_READ|PROT_WRITE, offset, size); + if (ptr == NULL) + return; + + m->ref++; + } + + pw_log_debug("port %p: set io %s %p", mix->port, + spa_debug_type_find_name(spa_type_io, id), ptr); + + if (id == SPA_IO_Buffers) { + if (ptr == NULL && mix->mix.io) { + deactivate_mix(data, mix); + m = find_mem_ptr(data, mix->mix.io); + if (m && --m->ref == 0) + clear_mem(data, m); + } + mix->mix.io = ptr; + if (ptr) + activate_mix(data, mix); + } else { + spa_node_port_set_io(mix->port->node->node, + direction, port_id, + id, + ptr, + size); + } +} + +static const struct pw_client_node_proxy_events client_node_events = { + PW_VERSION_CLIENT_NODE_PROXY_EVENTS, + .add_mem = client_node_add_mem, + .transport = client_node_transport, + .set_param = client_node_set_param, + .set_io = client_node_set_io, + .event = client_node_event, + .command = client_node_command, + .add_port = client_node_add_port, + .remove_port = client_node_remove_port, + .port_set_param = client_node_port_set_param, + .port_use_buffers = client_node_port_use_buffers, + .port_command = client_node_port_command, + .port_set_io = client_node_port_set_io, +}; + +static void do_node_init(struct pw_proxy *proxy) +{ + struct node_data *data = proxy->user_data; + struct pw_port *port; + + pw_log_debug("%p: init", data); + pw_client_node_proxy_update(data->node_proxy, + PW_CLIENT_NODE_UPDATE_MAX_INPUTS | + PW_CLIENT_NODE_UPDATE_MAX_OUTPUTS | + PW_CLIENT_NODE_UPDATE_PARAMS, + data->node->info.max_input_ports, + data->node->info.max_output_ports, + 0, NULL, NULL); + + spa_list_for_each(port, &data->node->input_ports, link) { + add_port_update(proxy, port, + PW_CLIENT_NODE_PORT_UPDATE_PARAMS | + PW_CLIENT_NODE_PORT_UPDATE_INFO); + } + spa_list_for_each(port, &data->node->output_ports, link) { + add_port_update(proxy, port, + PW_CLIENT_NODE_PORT_UPDATE_PARAMS | + PW_CLIENT_NODE_PORT_UPDATE_INFO); + } + pw_client_node_proxy_done(data->node_proxy, 0, 0); +} + +static void clear_mix(struct node_data *data, struct mix *mix) +{ + clear_buffers(data, mix); + pw_array_clear(&mix->buffers); + + deactivate_mix(data, mix); + + spa_list_remove(&mix->link); + spa_list_append(&data->free_mix, &mix->link); +} + +static void clean_node(struct node_data *d) +{ + struct mix *mix, *tmp; + + if (d->remote_id != SPA_ID_INVALID) { + spa_list_for_each_safe(mix, tmp, &d->mix[SPA_DIRECTION_INPUT], link) + clear_mix(d, mix); + spa_list_for_each_safe(mix, tmp, &d->mix[SPA_DIRECTION_OUTPUT], link) + clear_mix(d, mix); + } + clean_transport(d); +} + +static void node_destroy(void *data) +{ + struct node_data *d = data; + struct pw_remote *remote = d->remote; + struct pw_proxy *proxy = (struct pw_proxy*) d->node_proxy; + + pw_log_debug("%p: destroy", d); + + if (remote->core_proxy) + pw_core_proxy_destroy(remote->core_proxy, proxy); + + clean_node(d); + + spa_hook_remove(&d->proxy_listener); +} + +static void node_info_changed(void *data, const struct pw_node_info *info) +{ + struct node_data *d = data; + uint32_t change_mask = 0; + + pw_log_debug("info changed %p", d); + + if (info->change_mask & PW_NODE_CHANGE_MASK_PROPS) { + change_mask |= PW_CLIENT_NODE_UPDATE_PROPS; + } + pw_client_node_proxy_update(d->node_proxy, + change_mask, + 0, 0, + 0, NULL, + info->props); +} + +static void node_active_changed(void *data, bool active) +{ + struct node_data *d = data; + pw_log_debug("active %d", active); + pw_client_node_proxy_set_active(d->node_proxy, active); +} + + +static const struct pw_node_events node_events = { + PW_VERSION_NODE_EVENTS, + .destroy = node_destroy, + .info_changed = node_info_changed, + .active_changed = node_active_changed, +}; + +static void node_proxy_destroy(void *_data) +{ + struct node_data *data = _data; + clean_node(data); + spa_hook_remove(&data->node_listener); + if (data->do_free) + pw_node_destroy(data->node); +} + +static const struct pw_proxy_events proxy_events = { + PW_VERSION_PROXY_EVENTS, + .destroy = node_proxy_destroy, +}; + +static int remote_impl_signal(void *data) +{ + struct node_data *d = data; + uint64_t cmd = 1; + pw_log_trace("remote %p: send process", data); + write(d->rtwritefd, &cmd, 8); + return 0; +} + +static inline int remote_process(void *data, struct spa_graph_node *node) +{ + struct node_data *d = data; + spa_debug("remote %p: begin graph", data); + spa_graph_state_reset(&d->state); + return d->callbacks.process(d->callbacks_data, node); +} + +static const struct spa_graph_node_callbacks impl_root = { + SPA_VERSION_GRAPH_NODE_CALLBACKS, + .process = remote_process, +}; + +static struct pw_proxy *node_export(struct pw_remote *remote, void *object, bool do_free) +{ + struct pw_node *node = object; + struct pw_proxy *proxy; + struct node_data *data; + int i; + + proxy = pw_core_proxy_create_object(remote->core_proxy, + "client-node", + PW_TYPE_INTERFACE_ClientNode, + PW_VERSION_CLIENT_NODE, + &node->properties->dict, + sizeof(struct node_data)); + if (proxy == NULL) + return NULL; + + data = pw_proxy_get_user_data(proxy); + data->remote = remote; + data->node = node; + data->do_free = do_free; + data->core = pw_node_get_core(node); + data->node_proxy = (struct pw_client_node_proxy *)proxy; + data->remote_id = SPA_ID_INVALID; + + data->link.signal = remote_impl_signal; + data->link.signal_data = data; + data->callbacks = *node->rt.root.callbacks; + spa_graph_node_set_callbacks(&node->rt.root, &impl_root, data); + spa_graph_link_add(&node->rt.root, &data->state, &data->link); + spa_graph_node_add(node->rt.driver, &node->rt.root); + + node->exported = true; + + spa_list_init(&data->free_mix); + spa_list_init(&data->mix[0]); + spa_list_init(&data->mix[1]); + for (i = 0; i < MAX_MIX; i++) + spa_list_append(&data->free_mix, &data->mix_pool[i].link); + + pw_array_init(&data->mems, 64); + pw_array_ensure_size(&data->mems, sizeof(struct mem) * 64); + + pw_proxy_add_listener(proxy, &data->proxy_listener, &proxy_events, data); + pw_node_add_listener(node, &data->node_listener, &node_events, data); + + pw_client_node_proxy_add_listener(data->node_proxy, + &data->node_proxy_listener, + &client_node_events, + proxy); + do_node_init(proxy); + + return proxy; +} + +struct pw_proxy *pw_remote_node_export(struct pw_remote *remote, + uint32_t type, struct pw_properties *props, void *object) +{ + return node_export(remote, object, false); +} + +struct pw_proxy *pw_remote_spa_node_export(struct pw_remote *remote, + uint32_t type, struct pw_properties *props, void *object) +{ + struct pw_node *node; + + node = pw_node_new(pw_remote_get_core(remote), NULL, props, 0); + if (node == NULL) + return NULL; + + pw_node_set_implementation(node, (struct spa_node*)object); + pw_node_register(node, NULL, NULL, NULL); + pw_node_set_active(node, true); + + return node_export(remote, node, true); +} diff --git a/src/pipewire/core.c b/src/pipewire/core.c index 906c7ed49..c89e85980 100644 --- a/src/pipewire/core.c +++ b/src/pipewire/core.c @@ -445,6 +445,7 @@ struct pw_core *pw_core_new(struct pw_loop *main_loop, spa_list_init(&this->link_list); spa_list_init(&this->control_list[0]); spa_list_init(&this->control_list[1]); + spa_list_init(&this->export_list); spa_hook_list_init(&this->listener_list); if ((name = pw_properties_get(properties, PW_CORE_PROP_NAME)) == NULL) { diff --git a/src/pipewire/interfaces.h b/src/pipewire/interfaces.h index ab28b8368..3d13773df 100644 --- a/src/pipewire/interfaces.h +++ b/src/pipewire/interfaces.h @@ -116,8 +116,6 @@ struct pw_core_proxy_methods { void (*get_registry) (void *object, uint32_t version, uint32_t new_id); /** * Create a new object on the PipeWire server from a factory. - * Use a \a factory_name of "client-node" to create a - * \ref pw_client_node. * * \param factory_name the factory name to use * \param type the interface to bind to diff --git a/src/pipewire/module.h b/src/pipewire/module.h index dea9b2361..434c3bbb3 100644 --- a/src/pipewire/module.h +++ b/src/pipewire/module.h @@ -35,6 +35,7 @@ extern "C" { #include #define PIPEWIRE_SYMBOL_MODULE_INIT "pipewire__module_init" +#define PIPEWIRE_MODULE_PREFIX "libpipewire-" /** \class pw_module * diff --git a/src/pipewire/node.c b/src/pipewire/node.c index 183a41e17..e79bd4c75 100644 --- a/src/pipewire/node.c +++ b/src/pipewire/node.c @@ -655,6 +655,9 @@ struct pw_node *pw_node_new(struct pw_core *core, if (impl == NULL) return NULL; + if (name == NULL) + name = "node"; + this = &impl->this; this->core = core; pw_log_debug("node %p: new \"%s\"", this, name); @@ -687,7 +690,6 @@ struct pw_node *pw_node_new(struct pw_core *core, spa_list_init(&this->output_ports); pw_map_init(&this->output_port_map, 64, 64); - spa_graph_init(&impl->driver_graph, &impl->driver_state); this->rt.driver = &impl->driver_graph; diff --git a/src/pipewire/port.c b/src/pipewire/port.c index e45f19316..4422a8109 100644 --- a/src/pipewire/port.c +++ b/src/pipewire/port.c @@ -150,6 +150,19 @@ static int schedule_mix_input(struct spa_node *data) return SPA_STATUS_HAVE_BUFFER | SPA_STATUS_NEED_BUFFER; } +static int schedule_mix_port_set_param(struct spa_node *data, + enum spa_direction direction, uint32_t port_id, + uint32_t id, uint32_t flags, + const struct spa_pod *param) +{ + switch (id) { + case SPA_PARAM_Format: + pw_log_debug("port %d:%d: set format", direction, port_id); + break; + } + return 0; +} + static int schedule_mix_reuse_buffer(struct spa_node *data, uint32_t port_id, uint32_t buffer_id) { struct impl *impl = SPA_CONTAINER_OF(data, struct impl, mix_node); @@ -170,6 +183,7 @@ static const struct spa_node schedule_mix_node = { SPA_VERSION_NODE, NULL, .process = schedule_mix_input, + .port_set_param = schedule_mix_port_set_param, .port_reuse_buffer = schedule_mix_reuse_buffer, }; diff --git a/src/pipewire/private.h b/src/pipewire/private.h index c16c04e3a..20ee7b700 100644 --- a/src/pipewire/private.h +++ b/src/pipewire/private.h @@ -181,6 +181,7 @@ struct pw_core { struct spa_list factory_list; /**< list of factories */ struct spa_list link_list; /**< list of links */ struct spa_list control_list[2]; /**< list of controls, indexed by direction */ + struct spa_list export_list; /**< list of export types */ struct spa_hook_list listener_list; @@ -537,7 +538,6 @@ struct pw_proxy { struct spa_list link; /**< link in the remote */ uint32_t id; /**< client side id */ - uint32_t remote_id; /**< remote id */ struct spa_hook_list listener_list; struct spa_hook_list proxy_listener_list; @@ -550,7 +550,7 @@ struct pw_proxy { #define pw_remote_events_emit(r,m,v,...) spa_hook_list_call(&r->listener_list, struct pw_remote_events, m, v, ##__VA_ARGS__) #define pw_remote_events_destroy(r) pw_remote_events_emit(r, destroy, 0) #define pw_remote_events_state_changed(r,o,s,e) pw_remote_events_emit(r, state_changed, 0, o, s, e) -#define pw_remote_events_exported(r,i) pw_remote_events_emit(r, exported, 0, i) +#define pw_remote_events_exported(r,i,g) pw_remote_events_emit(r, exported, 0, i,g) struct pw_remote { struct pw_core *core; /**< core */ @@ -676,6 +676,8 @@ pw_core_find_port(struct pw_core *core, struct spa_pod **format_filters, char **error); +const struct pw_export_type *pw_core_find_export_type(struct pw_core *core, uint32_t type); + /** Create a new port \memberof pw_port * \return a newly allocated port */ struct pw_port * diff --git a/src/pipewire/proxy.c b/src/pipewire/proxy.c index 1ff7199dc..378417f2b 100644 --- a/src/pipewire/proxy.c +++ b/src/pipewire/proxy.c @@ -70,7 +70,6 @@ struct pw_proxy *pw_proxy_new(struct pw_proxy *factory, spa_hook_list_init(&this->proxy_listener_list); this->id = pw_map_insert_new(&remote->objects, this); - this->remote_id = SPA_ID_INVALID; if (user_data_size > 0) this->user_data = SPA_MEMBER(impl, sizeof(struct proxy), void); diff --git a/src/pipewire/remote.c b/src/pipewire/remote.c index 5ed844984..3207c0655 100644 --- a/src/pipewire/remote.c +++ b/src/pipewire/remote.c @@ -36,9 +36,6 @@ #include "pipewire/private.h" #include "extensions/protocol-native.h" -#include "extensions/client-node.h" - -#define MAX_MIX 4096 /** \cond */ @@ -47,70 +44,6 @@ struct remote { struct spa_hook core_listener; }; -struct mapping { - void *ptr; - struct pw_map_range map; - int prot; -}; - -struct mem { - uint32_t id; - int fd; - uint32_t flags; - uint32_t ref; - struct mapping map; -}; - -struct buffer_mem { - uint32_t mem_id; - struct mapping map; -}; - -struct buffer { - uint32_t id; - struct spa_buffer *buf; - struct buffer_mem *mem; - uint32_t n_mem; -}; - -struct mix { - struct spa_list link; - struct pw_port *port; - uint32_t mix_id; - struct pw_port_mix mix; - struct pw_array buffers; - bool active; -}; - -struct node_data { - struct pw_remote *remote; - struct pw_core *core; - - int rtwritefd; - struct spa_source *rtsocket_source; - - struct mix mix_pool[MAX_MIX]; - struct spa_list mix[2]; - struct spa_list free_mix; - - struct pw_array mems; - - struct pw_node *node; - struct spa_hook node_listener; - - struct pw_client_node_proxy *node_proxy; - struct spa_hook node_proxy_listener; - struct spa_hook proxy_listener; - - struct spa_io_position *position; - - struct spa_graph_node_callbacks callbacks; - void *callbacks_data; - - struct spa_graph_state state; - struct spa_graph_link link; -}; - /** \endcond */ const char *pw_remote_state_as_string(enum pw_remote_state state) @@ -456,965 +389,52 @@ int pw_remote_disconnect(struct pw_remote *remote) return 0; } -static int -do_remove_source(struct spa_loop *loop, - bool async, uint32_t seq, const void *data, size_t size, void *user_data) -{ - struct node_data *d = user_data; - - if (d->rtsocket_source) { - pw_loop_destroy_source(d->core->data_loop, d->rtsocket_source); - d->rtsocket_source = NULL; - } - return 0; -} - - -static void unhandle_socket(struct node_data *data) -{ - pw_loop_invoke(data->core->data_loop, - do_remove_source, 1, NULL, 0, true, data); -} - -static void -on_rtsocket_condition(void *user_data, int fd, enum spa_io mask) -{ - struct pw_proxy *proxy = user_data; - struct node_data *data = proxy->user_data; - struct spa_graph_node *node = &data->node->rt.root; - - if (mask & (SPA_IO_ERR | SPA_IO_HUP)) { - pw_log_warn("got error"); - unhandle_socket(data); - return; - } - - if (mask & SPA_IO_IN) { - uint64_t cmd; - - if (read(fd, &cmd, sizeof(uint64_t)) != sizeof(uint64_t) || cmd != 1) - pw_log_warn("proxy %p: read %"PRIu64" failed %m", proxy, cmd); - - pw_log_trace("remote %p: process %p", data->remote, proxy); - spa_graph_run(node->graph); - } -} - -static struct mem *find_mem(struct node_data *data, uint32_t id) -{ - struct mem *m; - pw_array_for_each(m, &data->mems) { - if (m->id == id) - return m; - } - return NULL; -} - -static struct mem *find_mem_ptr(struct node_data *data, void *ptr) -{ - struct mem *m; - pw_array_for_each(m, &data->mems) { - if (m->map.ptr == ptr) - return m; - } - return NULL; -} - -static void *mem_map(struct node_data *data, struct mapping *map, - int fd, int prot, uint32_t offset, uint32_t size) -{ - struct mapping m; - void *ptr; - - pw_map_range_init(&m.map, offset, size, data->core->sc_pagesize); - - if (map->ptr == NULL || map->map.offset != m.map.offset || map->map.size != m.map.size) { - map->ptr = mmap(map->ptr, m.map.size, prot, MAP_SHARED, fd, m.map.offset); - if (map->ptr == MAP_FAILED) { - pw_log_error("remote %p: Failed to mmap memory %d: %m", data, size); - return NULL; - } - map->map = m.map; - } - ptr = SPA_MEMBER(map->ptr, map->map.start, void); - pw_log_debug("remote %p: fd %d mapped %d %d %p", data, fd, offset, size, ptr); - - return ptr; -} - -static void *mem_unmap(struct node_data *data, void *ptr, struct pw_map_range *range) -{ - if (ptr != NULL) { - if (munmap(SPA_MEMBER(ptr, -range->start, void), range->size) < 0) - pw_log_warn("failed to unmap: %m"); - } - return NULL; -} - -static void clear_mem(struct node_data *data, struct mem *m) -{ - if (m->fd != -1) { - bool has_ref = false; - int fd; - struct mem *m2; - - pw_log_debug("remote %p: clear mem %d", data, m->id); - - fd = m->fd; - m->fd = -1; - m->id = SPA_ID_INVALID; - - pw_array_for_each(m2, &data->mems) { - if (m2->fd == fd) { - has_ref = true; - break; - } - } - if (!has_ref) { - m->map.ptr = mem_unmap(data, m->map.ptr, &m->map.map); - close(fd); - } - } -} - -static void clean_transport(struct node_data *data) -{ - struct mem *m; - - if (data->rtsocket_source == NULL) - return; - - unhandle_socket(data); - - pw_array_for_each(m, &data->mems) - clear_mem(data, m); - pw_array_clear(&data->mems); - - close(data->rtwritefd); -} - -static void mix_init(struct mix *mix, struct pw_port *port, uint32_t mix_id) -{ - mix->port = port; - mix->mix_id = mix_id; - pw_port_init_mix(port, &mix->mix); - mix->active = false; - pw_array_init(&mix->buffers, 32); - pw_array_ensure_size(&mix->buffers, sizeof(struct buffer) * 64); -} - -static int -do_deactivate_mix(struct spa_loop *loop, - bool async, uint32_t seq, const void *data, size_t size, void *user_data) -{ - struct mix *mix = user_data; - spa_graph_port_remove(&mix->mix.port); - return 0; -} - -static int -deactivate_mix(struct node_data *data, struct mix *mix) -{ - if (mix->active) { - pw_log_debug("node %p: mix %p deactivate", data, mix); - pw_loop_invoke(data->core->data_loop, - do_deactivate_mix, SPA_ID_INVALID, NULL, 0, true, mix); - mix->active = false; - } - return 0; -} - -static int -do_activate_mix(struct spa_loop *loop, - bool async, uint32_t seq, const void *data, size_t size, void *user_data) -{ - struct mix *mix = user_data; - spa_graph_port_add(&mix->port->rt.mix_node, &mix->mix.port); - return 0; -} - -static int -activate_mix(struct node_data *data, struct mix *mix) -{ - if (!mix->active) { - pw_log_debug("node %p: mix %p activate", data, mix); - pw_loop_invoke(data->core->data_loop, - do_activate_mix, SPA_ID_INVALID, NULL, 0, false, mix); - mix->active = true; - } - return 0; -} - -static struct mix *find_mix(struct node_data *data, - enum spa_direction direction, uint32_t port_id, uint32_t mix_id) -{ - struct mix *mix; - - spa_list_for_each(mix, &data->mix[direction], link) { - if (mix->port->port_id == port_id && - mix->mix_id == mix_id) - return mix; - } - return NULL; -} - -static struct mix *ensure_mix(struct node_data *data, - enum spa_direction direction, uint32_t port_id, uint32_t mix_id) -{ - struct mix *mix; - struct pw_port *port; - - if ((mix = find_mix(data, direction, port_id, mix_id))) - return mix; - - if (spa_list_is_empty(&data->free_mix)) - return NULL; - - port = pw_node_find_port(data->node, direction, port_id); - if (port == NULL) - return NULL; - - mix = spa_list_first(&data->free_mix, struct mix, link); - spa_list_remove(&mix->link); - - mix_init(mix, port, mix_id); - spa_list_append(&data->mix[direction], &mix->link); - - return mix; -} - -static void client_node_add_mem(void *object, - uint32_t mem_id, - uint32_t type, int memfd, uint32_t flags) -{ - struct pw_proxy *proxy = object; - struct node_data *data = proxy->user_data; - struct mem *m; - - m = find_mem(data, mem_id); - if (m) { - pw_log_warn("duplicate mem %u, fd %d, flags %d", - mem_id, memfd, flags); - return; - } - - m = pw_array_add(&data->mems, sizeof(struct mem)); - pw_log_debug("add mem %u, fd %d, flags %d", mem_id, memfd, flags); - - m->id = mem_id; - m->fd = memfd; - m->flags = flags; - m->ref = 0; - m->map.map = PW_MAP_RANGE_INIT; - m->map.ptr = NULL; -} - -static void client_node_transport(void *object, uint32_t node_id, - int readfd, int writefd) -{ - struct pw_proxy *proxy = object; - struct node_data *data = proxy->user_data; - struct pw_remote *remote = proxy->remote; - - clean_transport(data); - - proxy->remote_id = node_id; - - pw_log_debug("remote-node %p: create transport with fds %d %d for node %u", - proxy, readfd, writefd, node_id); - - data->rtwritefd = writefd; - data->rtsocket_source = pw_loop_add_io(remote->core->data_loop, - readfd, - SPA_IO_ERR | SPA_IO_HUP, - true, on_rtsocket_condition, proxy); - if (data->node->active) - pw_client_node_proxy_set_active(data->node_proxy, true); - - pw_remote_events_exported(remote, proxy->id); -} - -static void add_port_update(struct pw_proxy *proxy, struct pw_port *port, uint32_t change_mask) -{ - struct node_data *data = proxy->user_data; - const struct spa_port_info *port_info = NULL; - struct spa_port_info pi; - uint32_t n_params = 0; - struct spa_pod **params = NULL; - - if (change_mask & PW_CLIENT_NODE_PORT_UPDATE_PARAMS) { - uint32_t idx1, idx2, id; - uint8_t buf[2048]; - struct spa_pod_builder b = { 0 }; - - for (idx1 = 0;;) { - struct spa_pod *param; - - spa_pod_builder_init(&b, buf, sizeof(buf)); - if (spa_node_port_enum_params(port->node->node, - port->direction, port->port_id, - SPA_PARAM_List, &idx1, - NULL, ¶m, &b) <= 0) - break; - - spa_pod_parse_object(param, - SPA_TYPE_OBJECT_ParamList, NULL, - SPA_PARAM_LIST_id, SPA_POD_Id(&id)); - - params = realloc(params, sizeof(struct spa_pod *) * (n_params + 1)); - params[n_params++] = pw_spa_pod_copy(param); - - for (idx2 = 0;;) { - spa_pod_builder_init(&b, buf, sizeof(buf)); - if (spa_node_port_enum_params(port->node->node, - port->direction, port->port_id, - id, &idx2, - NULL, ¶m, &b) <= 0) - break; - - params = realloc(params, sizeof(struct spa_pod *) * (n_params + 1)); - params[n_params++] = pw_spa_pod_copy(param); - } - } - } - if (change_mask & PW_CLIENT_NODE_PORT_UPDATE_INFO) { - spa_node_port_get_info(port->node->node, port->direction, port->port_id, &port_info); - pi = * port_info; - pi.flags &= ~SPA_PORT_INFO_FLAG_CAN_ALLOC_BUFFERS; - } - - pw_client_node_proxy_port_update(data->node_proxy, - port->direction, - port->port_id, - change_mask, - n_params, - (const struct spa_pod **)params, - &pi); - if (params) { - while (n_params > 0) - free(params[--n_params]); - free(params); - } -} - -static void -client_node_set_param(void *object, uint32_t seq, uint32_t id, uint32_t flags, - const struct spa_pod *param) -{ - pw_log_warn("set param not implemented"); -} - - -static void -client_node_set_io(void *object, - uint32_t id, - uint32_t memid, - uint32_t offset, - uint32_t size) -{ - struct pw_proxy *proxy = object; - struct node_data *data = proxy->user_data; - struct mem *m; - void *ptr; - - if (memid == SPA_ID_INVALID) { - ptr = NULL; - size = 0; - } - else { - m = find_mem(data, memid); - if (m == NULL) { - pw_log_warn("unknown memory id %u", memid); - return; - } - ptr = mem_map(data, &m->map, m->fd, - PROT_READ|PROT_WRITE, offset, size); - if (ptr == NULL) - return; - m->ref++; - } - - pw_log_debug("node %p: set io %s %p", proxy, - spa_debug_type_find_name(spa_type_io, id), ptr); - - if (id == SPA_IO_Position) { - if (ptr == NULL && data->position) { - m = find_mem_ptr(data, data->position); - if (m && --m->ref == 0) - clear_mem(data, m); - } - data->position = ptr; - } - spa_node_set_io(data->node->node, id, ptr, size); -} - -static void client_node_event(void *object, const struct spa_event *event) -{ - pw_log_warn("unhandled node event %d", SPA_EVENT_TYPE(event)); -} - -static int -do_pause_source(struct spa_loop *loop, - bool async, uint32_t seq, const void *data, size_t size, void *user_data) -{ - struct node_data *d = user_data; - pw_loop_update_io(d->core->data_loop, - d->rtsocket_source, - SPA_IO_ERR | SPA_IO_HUP); - return 0; -} - -static void client_node_command(void *object, uint32_t seq, const struct spa_command *command) -{ - struct pw_proxy *proxy = object; - struct node_data *data = proxy->user_data; - struct pw_remote *remote = proxy->remote; - int res; - - switch (SPA_NODE_COMMAND_ID(command)) { - case SPA_NODE_COMMAND_Pause: - pw_log_debug("node %p: pause %d", proxy, seq); - - if (data->rtsocket_source) { - pw_loop_invoke(data->core->data_loop, - do_pause_source, 1, NULL, 0, true, data); - } - if ((res = spa_node_send_command(data->node->node, command)) < 0) - pw_log_warn("node %p: pause failed", proxy); - - pw_client_node_proxy_done(data->node_proxy, seq, res); - break; - case SPA_NODE_COMMAND_Start: - pw_log_debug("node %p: start %d", proxy, seq); - - if ((res = spa_node_send_command(data->node->node, command)) < 0) { - pw_log_warn("node %p: start failed", proxy); - } - else if (data->rtsocket_source) { - pw_loop_update_io(remote->core->data_loop, - data->rtsocket_source, - SPA_IO_IN | SPA_IO_ERR | SPA_IO_HUP); - } - - pw_client_node_proxy_done(data->node_proxy, seq, res); - break; - default: - pw_log_warn("unhandled node command %d", SPA_NODE_COMMAND_ID(command)); - pw_client_node_proxy_done(data->node_proxy, seq, -ENOTSUP); - } -} - -static void -client_node_add_port(void *object, uint32_t seq, enum spa_direction direction, uint32_t port_id) -{ - pw_log_warn("add port not supported"); -} - -static void -client_node_remove_port(void *object, uint32_t seq, enum spa_direction direction, uint32_t port_id) -{ - pw_log_warn("remove port not supported"); -} - -static void clear_buffers(struct node_data *data, struct mix *mix) -{ - struct pw_port *port = mix->port; - struct buffer *b; - uint32_t i; - int res; - - pw_log_debug("port %p: clear buffers %d", port, mix->mix_id); - if ((res = pw_port_use_buffers(port, mix->mix_id, NULL, 0)) < 0) { - pw_log_error("port %p: error clear buffers %s", port, spa_strerror(res)); - return; - } - - pw_array_for_each(b, &mix->buffers) { - for (i = 0; i < b->n_mem; i++) { - struct buffer_mem *bm = &b->mem[i]; - struct mem *m; - - pw_log_debug("port %p: clear buffer %d mem %d", - port, b->id, bm->mem_id); - - m = find_mem(data, bm->mem_id); - if (m && --m->ref == 0) - clear_mem(data, m); - } - b->n_mem = 0; - free(b->buf); - } - mix->buffers.size = 0; -} - -static void -client_node_port_set_param(void *object, - uint32_t seq, - enum spa_direction direction, uint32_t port_id, - uint32_t id, uint32_t flags, - const struct spa_pod *param) -{ - struct pw_proxy *proxy = object; - struct node_data *data = proxy->user_data; - struct pw_port *port; - int res; - - port = pw_node_find_port(data->node, direction, port_id); - if (port == NULL) { - res = -EINVAL; - goto done; - } - - pw_log_debug("port %p: set param %d %p", port, id, param); - - if (id == SPA_PARAM_Format) { - struct mix *mix; - spa_list_for_each(mix, &data->mix[direction], link) { - if (mix->port->port_id == port_id) - clear_buffers(data, mix); - } - } - - res = pw_port_set_param(port, SPA_ID_INVALID, id, flags, param); - if (res < 0) - goto done; - - add_port_update(proxy, port, - PW_CLIENT_NODE_PORT_UPDATE_PARAMS | - PW_CLIENT_NODE_PORT_UPDATE_INFO); - - done: - pw_client_node_proxy_done(data->node_proxy, seq, res); -} - -static void -client_node_port_use_buffers(void *object, - uint32_t seq, - enum spa_direction direction, uint32_t port_id, uint32_t mix_id, - uint32_t n_buffers, struct pw_client_node_buffer *buffers) -{ - struct pw_proxy *proxy = object; - struct node_data *data = proxy->user_data; - struct buffer *bid; - uint32_t i, j; - struct spa_buffer *b, **bufs; - struct mix *mix; - int res, prot; - - mix = ensure_mix(data, direction, port_id, mix_id); - if (mix == NULL) { - res = -EINVAL; - goto done; - } - - prot = PROT_READ | (direction == SPA_DIRECTION_OUTPUT ? PROT_WRITE : 0); - - /* clear previous buffers */ - clear_buffers(data, mix); - - bufs = alloca(n_buffers * sizeof(struct spa_buffer *)); - - for (i = 0; i < n_buffers; i++) { - struct buffer_mem bmem = { 0, }; - size_t size; - off_t offset; - struct mem *m; - - m = find_mem(data, buffers[i].mem_id); - if (m == NULL) { - pw_log_error("unknown memory id %u", buffers[i].mem_id); - res = -EINVAL; - goto cleanup; - } - - bid = pw_array_add(&mix->buffers, sizeof(struct buffer)); - bid->id = i; - - bmem.mem_id = m->id; - bmem.map.ptr = mem_map(data, &bmem.map, m->fd, prot, - buffers[i].offset, buffers[i].size); - if (bmem.map.ptr == NULL) { - res = -errno; - goto cleanup; - } - if (mlock(bmem.map.ptr, bmem.map.map.size) < 0) - pw_log_warn("Failed to mlock memory %u %u: %m", - bmem.map.map.offset, bmem.map.map.size); - - size = sizeof(struct spa_buffer); - size += sizeof(struct buffer_mem); - for (j = 0; j < buffers[i].buffer->n_metas; j++) - size += sizeof(struct spa_meta); - for (j = 0; j < buffers[i].buffer->n_datas; j++) { - size += sizeof(struct spa_data); - size += sizeof(struct buffer_mem); - } - - b = bid->buf = malloc(size); - if (b == NULL) { - res = -ENOMEM; - goto cleanup; - } - memcpy(b, buffers[i].buffer, sizeof(struct spa_buffer)); - - b->metas = SPA_MEMBER(b, sizeof(struct spa_buffer), struct spa_meta); - b->datas = SPA_MEMBER(b->metas, sizeof(struct spa_meta) * b->n_metas, - struct spa_data); - bid->mem = SPA_MEMBER(b->datas, sizeof(struct spa_data) * b->n_datas, - struct buffer_mem); - bid->n_mem = 0; - - bid->mem[bid->n_mem++] = bmem; - m->ref++; - - pw_log_debug("add buffer %d %d %u %u", m->id, - bid->id, bmem.map.map.offset, bmem.map.map.size); - - offset = 0; - for (j = 0; j < b->n_metas; j++) { - struct spa_meta *m = &b->metas[j]; - memcpy(m, &buffers[i].buffer->metas[j], sizeof(struct spa_meta)); - m->data = SPA_MEMBER(bmem.map.ptr, offset, void); - offset += SPA_ROUND_UP_N(m->size, 8); - } - - for (j = 0; j < b->n_datas; j++) { - struct spa_data *d = &b->datas[j]; - - memcpy(d, &buffers[i].buffer->datas[j], sizeof(struct spa_data)); - d->chunk = - SPA_MEMBER(bmem.map.ptr, offset + sizeof(struct spa_chunk) * j, - struct spa_chunk); - - if (d->type == SPA_DATA_MemFd || d->type == SPA_DATA_DmaBuf) { - uint32_t mem_id = SPA_PTR_TO_UINT32(d->data); - struct mem *bm = find_mem(data, mem_id); - struct buffer_mem bm2; - - if (bm == NULL) { - pw_log_error("unknown buffer mem %u", mem_id); - res = -EINVAL; - goto cleanup; - } - - d->fd = bm->fd; - bm->ref++; - bm2.mem_id = bm->id; - bm2.map.ptr = NULL; - d->data = bm2.map.ptr; - - bid->mem[bid->n_mem++] = bm2; - - pw_log_debug(" data %d %u -> fd %d maxsize %d", - j, bm->id, bm->fd, d->maxsize); - } else if (d->type == SPA_DATA_MemPtr) { - int offs = SPA_PTR_TO_INT(d->data); - d->data = SPA_MEMBER(bmem.map.ptr, offs, void); - d->fd = -1; - pw_log_debug(" data %d %u -> mem %p maxsize %d", - j, bid->id, d->data, d->maxsize); - } else { - pw_log_warn("unknown buffer data type %d", d->type); - } - } - bufs[i] = b; - } - - res = pw_port_use_buffers(mix->port, mix->mix_id, bufs, n_buffers); - - done: - pw_client_node_proxy_done(data->node_proxy, seq, res); - return; - - cleanup: - clear_buffers(data, mix); - goto done; - -} - -static void -client_node_port_command(void *object, - uint32_t direction, - uint32_t port_id, - const struct spa_command *command) -{ - struct pw_proxy *proxy = object; - struct node_data *data = proxy->user_data; - struct pw_port *port; - - port = pw_node_find_port(data->node, direction, port_id); - if (port == NULL) - return; - - pw_port_send_command(port, true, command); -} - -static void -client_node_port_set_io(void *object, - uint32_t seq, - uint32_t direction, - uint32_t port_id, - uint32_t mix_id, - uint32_t id, - uint32_t memid, - uint32_t offset, - uint32_t size) -{ - struct pw_proxy *proxy = object; - struct node_data *data = proxy->user_data; - struct mix *mix; - struct mem *m; - void *ptr; - - mix = ensure_mix(data, direction, port_id, mix_id); - if (mix == NULL) - return; - - if (memid == SPA_ID_INVALID) { - ptr = NULL; - size = 0; - } - else { - m = find_mem(data, memid); - if (m == NULL) { - pw_log_warn("unknown memory id %u", memid); - return; - } - ptr = mem_map(data, &m->map, m->fd, - PROT_READ|PROT_WRITE, offset, size); - if (ptr == NULL) - return; - - m->ref++; - } - - pw_log_debug("port %p: set io %s %p", mix->port, - spa_debug_type_find_name(spa_type_io, id), ptr); - - if (id == SPA_IO_Buffers) { - if (ptr == NULL && mix->mix.io) { - deactivate_mix(data, mix); - m = find_mem_ptr(data, mix->mix.io); - if (m && --m->ref == 0) - clear_mem(data, m); - } - mix->mix.io = ptr; - if (ptr) - activate_mix(data, mix); - } else { - spa_node_port_set_io(mix->port->node->node, - direction, port_id, - id, - ptr, - size); - } -} - -static const struct pw_client_node_proxy_events client_node_events = { - PW_VERSION_CLIENT_NODE_PROXY_EVENTS, - .add_mem = client_node_add_mem, - .transport = client_node_transport, - .set_param = client_node_set_param, - .set_io = client_node_set_io, - .event = client_node_event, - .command = client_node_command, - .add_port = client_node_add_port, - .remove_port = client_node_remove_port, - .port_set_param = client_node_port_set_param, - .port_use_buffers = client_node_port_use_buffers, - .port_command = client_node_port_command, - .port_set_io = client_node_port_set_io, -}; - -static void do_node_init(struct pw_proxy *proxy) -{ - struct node_data *data = proxy->user_data; - struct pw_port *port; - - pw_log_debug("%p: init", data); - pw_client_node_proxy_update(data->node_proxy, - PW_CLIENT_NODE_UPDATE_MAX_INPUTS | - PW_CLIENT_NODE_UPDATE_MAX_OUTPUTS | - PW_CLIENT_NODE_UPDATE_PARAMS, - data->node->info.max_input_ports, - data->node->info.max_output_ports, - 0, NULL, NULL); - - spa_list_for_each(port, &data->node->input_ports, link) { - add_port_update(proxy, port, - PW_CLIENT_NODE_PORT_UPDATE_PARAMS | - PW_CLIENT_NODE_PORT_UPDATE_INFO); - } - spa_list_for_each(port, &data->node->output_ports, link) { - add_port_update(proxy, port, - PW_CLIENT_NODE_PORT_UPDATE_PARAMS | - PW_CLIENT_NODE_PORT_UPDATE_INFO); - } - pw_client_node_proxy_done(data->node_proxy, 0, 0); -} - -static void clear_mix(struct node_data *data, struct mix *mix) -{ - clear_buffers(data, mix); - pw_array_clear(&mix->buffers); - - deactivate_mix(data, mix); - - spa_list_remove(&mix->link); - spa_list_append(&data->free_mix, &mix->link); -} - -static void clean_node(struct node_data *d) -{ - struct pw_proxy *proxy = (struct pw_proxy*) d->node_proxy; - struct mix *mix, *tmp; - - if (proxy->remote_id != SPA_ID_INVALID) { - spa_list_for_each_safe(mix, tmp, &d->mix[SPA_DIRECTION_INPUT], link) - clear_mix(d, mix); - spa_list_for_each_safe(mix, tmp, &d->mix[SPA_DIRECTION_OUTPUT], link) - clear_mix(d, mix); - } - clean_transport(d); -} - -static void node_destroy(void *data) -{ - struct node_data *d = data; - struct pw_remote *remote = d->remote; - struct pw_proxy *proxy = (struct pw_proxy*) d->node_proxy; - - pw_log_debug("%p: destroy", d); - - if (remote->core_proxy) - pw_core_proxy_destroy(remote->core_proxy, proxy); - - clean_node(d); - - spa_hook_remove(&d->proxy_listener); -} - -static void node_info_changed(void *data, const struct pw_node_info *info) -{ - struct node_data *d = data; - uint32_t change_mask = 0; - - pw_log_debug("info changed %p", d); - - if (info->change_mask & PW_NODE_CHANGE_MASK_PROPS) { - change_mask |= PW_CLIENT_NODE_UPDATE_PROPS; - } - pw_client_node_proxy_update(d->node_proxy, - change_mask, - 0, 0, - 0, NULL, - info->props); -} - -static void node_active_changed(void *data, bool active) -{ - struct node_data *d = data; - pw_log_debug("active %d", active); - pw_client_node_proxy_set_active(d->node_proxy, active); -} - - -static const struct pw_node_events node_events = { - PW_VERSION_NODE_EVENTS, - .destroy = node_destroy, - .info_changed = node_info_changed, - .active_changed = node_active_changed, -}; - -static void node_proxy_destroy(void *_data) -{ - struct node_data *data = _data; - clean_node(data); - spa_hook_remove(&data->node_listener); -} - -static const struct pw_proxy_events proxy_events = { - PW_VERSION_PROXY_EVENTS, - .destroy = node_proxy_destroy, -}; - -static int remote_impl_signal(void *data) -{ - struct node_data *d = data; - uint64_t cmd = 1; - pw_log_trace("remote %p: send process", data); - write(d->rtwritefd, &cmd, 8); - return 0; -} - -static inline int remote_process(void *data, struct spa_graph_node *node) -{ - struct node_data *d = data; - spa_debug("remote %p: begin graph", data); - spa_graph_state_reset(&d->state); - return d->callbacks.process(d->callbacks_data, node); -} - -static const struct spa_graph_node_callbacks impl_root = { - SPA_VERSION_GRAPH_NODE_CALLBACKS, - .process = remote_process, -}; - struct pw_proxy *pw_remote_export(struct pw_remote *remote, - struct pw_node *node) + uint32_t type, struct pw_properties *props, void *object) { struct pw_proxy *proxy; - struct node_data *data; - int i; + const struct pw_export_type *t; - if (remote->core_proxy == NULL) { - pw_log_error("node core proxy"); - return NULL; - } + if (remote->core_proxy == NULL) + goto no_core_proxy; - proxy = pw_core_proxy_create_object(remote->core_proxy, - "client-node", - PW_TYPE_INTERFACE_ClientNode, - PW_VERSION_CLIENT_NODE, - &node->properties->dict, - sizeof(struct node_data)); - if (proxy == NULL) { - pw_log_error("failed to create proxy"); - return NULL; - } + t = pw_core_find_export_type(remote->core, type); + if (t == NULL) + goto no_export_type; - data = pw_proxy_get_user_data(proxy); - data->remote = remote; - data->node = node; - data->core = pw_node_get_core(node); - data->node_proxy = (struct pw_client_node_proxy *)proxy; - - data->link.signal = remote_impl_signal; - data->link.signal_data = data; - data->callbacks = *node->rt.root.callbacks; - spa_graph_node_set_callbacks(&node->rt.root, &impl_root, data); - spa_graph_link_add(&node->rt.root, &data->state, &data->link); - spa_graph_node_add(node->rt.driver, &node->rt.root); - - node->exported = true; - - spa_list_init(&data->free_mix); - spa_list_init(&data->mix[0]); - spa_list_init(&data->mix[1]); - for (i = 0; i < MAX_MIX; i++) - spa_list_append(&data->free_mix, &data->mix_pool[i].link); - - pw_array_init(&data->mems, 64); - pw_array_ensure_size(&data->mems, sizeof(struct mem) * 64); - - pw_proxy_add_listener(proxy, &data->proxy_listener, &proxy_events, data); - pw_node_add_listener(node, &data->node_listener, &node_events, data); - - pw_client_node_proxy_add_listener(data->node_proxy, - &data->node_proxy_listener, - &client_node_events, - proxy); - do_node_init(proxy); + proxy = t->func(remote, type, props, object); + if (proxy == NULL) + goto proxy_failed; return proxy; + + no_core_proxy: + errno = ENETDOWN; + pw_log_error("no core proxy: %m"); + return NULL; + no_export_type: + pw_log_error("can't export type %d: %m", type); + return NULL; + proxy_failed: + pw_log_error("failed to create proxy: %m"); + return NULL; +} + +int pw_core_register_export_type(struct pw_core *core, struct pw_export_type *type) +{ + pw_log_debug("Add export type %d/%s to core", type->type, + spa_debug_type_find_name(pw_type_info(), type->type)); + spa_list_append(&core->export_list, &type->link); + return 0; +} + +const struct pw_export_type *pw_core_find_export_type(struct pw_core *core, uint32_t type) +{ + const struct pw_export_type *t; + spa_list_for_each(t, &core->export_list, link) { + if (t->type == type) + return t; + } + errno = EPROTO; + return NULL; } diff --git a/src/pipewire/remote.h b/src/pipewire/remote.h index 50e8a7e30..b5456d839 100644 --- a/src/pipewire/remote.h +++ b/src/pipewire/remote.h @@ -138,7 +138,7 @@ struct pw_remote_events { void (*state_changed) (void *data, enum pw_remote_state old, enum pw_remote_state state, const char *error); /** emited when a node was exported */ - void (*exported) (void *data, uint32_t id); + void (*exported) (void *data, uint32_t proxy_id, uint32_t global_id); }; /** Specify the name of the protocol to use, default is using the native protocol */ @@ -203,7 +203,22 @@ struct pw_proxy *pw_remote_find_proxy(struct pw_remote *remote, uint32_t id); int pw_remote_disconnect(struct pw_remote *remote); /** run a local node in a remote graph */ -struct pw_proxy *pw_remote_export(struct pw_remote *remote, struct pw_node *node); +struct pw_proxy *pw_remote_export(struct pw_remote *remote, /**< the remote */ + uint32_t type, /**< the type of object */ + struct pw_properties *properties, /**< extra properties */ + void *object /**< object to export */); + +/** data for registering export functions */ +struct pw_export_type { + struct spa_list link; + uint32_t type; + struct pw_proxy * (*func) (struct pw_remote *remote, + uint32_t type, struct pw_properties *properties, void *object); +}; + +/** register a type that can be exported on a remote. This is usually used by + * extension modules */ +int pw_core_register_export_type(struct pw_core *core, struct pw_export_type *type); #ifdef __cplusplus } diff --git a/src/pipewire/stream.c b/src/pipewire/stream.c index 29bd1d9a3..e4cd47325 100644 --- a/src/pipewire/stream.c +++ b/src/pipewire/stream.c @@ -41,7 +41,6 @@ #include "pipewire/pipewire.h" #include "pipewire/stream.h" #include "pipewire/private.h" -#include "extensions/client-node.h" #define MAX_BUFFERS 64 #define MIN_QUEUED 1 @@ -195,8 +194,10 @@ static inline struct buffer *pop_queue(struct stream *stream, struct queue *queu uint32_t index, id; struct buffer *buffer; - if ((avail = spa_ringbuffer_get_read_index(&queue->ring, &index)) < MIN_QUEUED) + if ((avail = spa_ringbuffer_get_read_index(&queue->ring, &index)) < MIN_QUEUED) { + errno = EPIPE; return NULL; + } id = queue->ids[index & MASK_BUFFERS]; spa_ringbuffer_read_update(&queue->ring, index + 1); @@ -236,6 +237,8 @@ static struct buffer *get_buffer(struct pw_stream *stream, uint32_t id) struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this); if (id < impl->n_buffers) return &impl->buffers[id]; + + errno = EINVAL; return NULL; } @@ -806,6 +809,7 @@ static void proxy_destroy(void *_data) struct pw_stream *stream = _data; stream->proxy = NULL; spa_hook_remove(&stream->proxy_listener); + stream->node_id = SPA_ID_INVALID; stream_set_state(stream, PW_STREAM_STATE_UNCONNECTED, NULL); } @@ -821,6 +825,9 @@ static int handle_connect(struct pw_stream *stream) pw_log_debug("stream %p: creating node", stream); impl->node = pw_node_new(impl->core, stream->name, pw_properties_copy(stream->properties), 0); + if (impl->node == NULL) + goto no_node; + impl->impl_node = impl_node; if (impl->direction == SPA_DIRECTION_INPUT) @@ -835,10 +842,21 @@ static int handle_connect(struct pw_stream *stream) pw_node_set_active(impl->node, true); pw_log_debug("stream %p: export node %p", stream, impl->node); - stream->proxy = pw_remote_export(stream->remote, impl->node); + stream->proxy = pw_remote_export(stream->remote, + PW_TYPE_INTERFACE_Node, NULL, impl->node); + if (stream->proxy == NULL) + goto no_proxy; + pw_proxy_add_listener(stream->proxy, &stream->proxy_listener, &proxy_events, stream); return 0; + + no_node: + pw_log_error("stream %p: can't make node: %m", stream); + return -errno; + no_proxy: + pw_log_error("stream %p: can't make proxy: %m", stream); + return -errno; } static void on_remote_state_changed(void *_data, enum pw_remote_state old, @@ -866,11 +884,14 @@ static void on_remote_state_changed(void *_data, enum pw_remote_state old, break; } } -static void on_remote_exported(void *_data, uint32_t id) + +static void on_remote_exported(void *_data, uint32_t proxy_id, uint32_t global_id) { struct pw_stream *stream = _data; - if (stream->proxy && stream->proxy->id == id) + if (stream->proxy && stream->proxy->id == proxy_id) { + stream->node_id = global_id; stream_set_state(stream, PW_STREAM_STATE_CONFIGURE, NULL); + } } static const struct pw_remote_events remote_events = { @@ -916,6 +937,7 @@ struct pw_stream * pw_stream_new(struct pw_remote *remote, const char *name, this->remote = remote; this->name = name ? strdup(name) : NULL; + this->node_id = SPA_ID_INVALID; reset_props(&impl->props); @@ -1146,9 +1168,7 @@ pw_stream_connect(struct pw_stream *stream, uint32_t pw_stream_get_node_id(struct pw_stream *stream) { - if (stream->proxy == NULL) - return SPA_ID_INVALID; - return stream->proxy->remote_id; + return stream->node_id; } int pw_stream_disconnect(struct pw_stream *stream) @@ -1165,6 +1185,7 @@ int pw_stream_disconnect(struct pw_stream *stream) if (stream->proxy) { stream->proxy = NULL; spa_hook_remove(&stream->proxy_listener); + stream->node_id = SPA_ID_INVALID; } stream_set_state(stream, PW_STREAM_STATE_UNCONNECTED, NULL); return 0; @@ -1277,6 +1298,7 @@ struct pw_buffer *pw_stream_dequeue_buffer(struct pw_stream *stream) if ((b = pop_queue(impl, &impl->dequeued)) == NULL) { pw_log_trace("stream %p: no more buffers", stream); call_trigger(impl); + errno = EPIPE; return NULL; } pw_log_trace("stream %p: dequeue buffer %d", stream, b->id); diff --git a/src/pipewire/type.h b/src/pipewire/type.h index 3a50b3ce5..a1b205f75 100644 --- a/src/pipewire/type.h +++ b/src/pipewire/type.h @@ -34,6 +34,7 @@ extern "C" { enum { PW_TYPE_FIRST = SPA_TYPE_VENDOR_PipeWire, + PW_TYPE_INTERFACE_START = PW_TYPE_FIRST + SPA_TYPE_INTERFACE_START, PW_TYPE_INTERFACE_Core, PW_TYPE_INTERFACE_Registry, PW_TYPE_INTERFACE_Node, @@ -42,9 +43,12 @@ enum { PW_TYPE_INTERFACE_Link, PW_TYPE_INTERFACE_Client, PW_TYPE_INTERFACE_Module, - PW_TYPE_INTERFACE_ClientNode, PW_TYPE_INTERFACE_Device, + /* extensions */ + PW_TYPE_INTERFACE_EXTENSIONS = PW_TYPE_INTERFACE_START + 0x1000, + PW_TYPE_INTERFACE_ClientNode, + }; #define PW_TYPE_INFO_BASE "PipeWire:" diff --git a/src/tests/test-remote.c b/src/tests/test-remote.c index ee3d63dad..592149fed 100644 --- a/src/tests/test-remote.c +++ b/src/tests/test-remote.c @@ -40,7 +40,7 @@ static void test_abi(void) void (*destroy) (void *data); void (*state_changed) (void *data, enum pw_remote_state old, enum pw_remote_state state, const char *error); - void (*exported) (void *data, uint32_t id); + void (*exported) (void *data, uint32_t proxy_id, uint32_t remote_id); } test = { PW_VERSION_REMOTE_EVENTS, NULL }; TEST_FUNC(ev, test, destroy); @@ -70,7 +70,7 @@ static void remote_state_changed_error(void *data, enum pw_remote_state old, { spa_assert_not_reached(); } -static void remote_exported_error(void *data, uint32_t id) +static void remote_exported_error(void *data, uint32_t proxy_id, uint32_t global_id) { spa_assert_not_reached(); } diff --git a/src/tools/pipewire-cli.c b/src/tools/pipewire-cli.c index a607760f2..0c679958e 100644 --- a/src/tools/pipewire-cli.c +++ b/src/tools/pipewire-cli.c @@ -1149,7 +1149,7 @@ static bool do_export_node(struct data *data, const char *cmd, char *args, char return false; } node = pw_global_get_object(global); - proxy = pw_remote_export(rd->remote, node); + proxy = pw_remote_export(rd->remote, PW_TYPE_INTERFACE_Node, NULL, node); id = pw_map_insert_new(&data->vars, proxy); fprintf(stdout, "%d = @proxy:%d\n", id, pw_proxy_get_id((struct pw_proxy*)proxy));