From 658c1da52fc8f142f0d6603f0f1ba712badf82e7 Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Thu, 7 Feb 2019 12:34:54 +0100 Subject: [PATCH] node: allocate shared mem for activation Allocate a per node a piece of shared memory where we place the activation structure with the graph state and io_position. We can then give this info to nodes so that they can get the position in the graph directly but also later, activate the next node in the graph. --- pipewire-jack | 2 +- src/modules/module-client-node/client-node.c | 27 +---- .../module-client-node/client-stream.c | 13 ++- src/modules/module-client-node/remote-node.c | 16 +-- src/pipewire/mem.c | 4 +- src/pipewire/mem.h | 2 +- src/pipewire/node.c | 102 +++++++++--------- src/pipewire/private.h | 6 +- 8 files changed, 83 insertions(+), 89 deletions(-) diff --git a/pipewire-jack b/pipewire-jack index 96460ab2d..25cc424d7 160000 --- a/pipewire-jack +++ b/pipewire-jack @@ -1 +1 @@ -Subproject commit 96460ab2d1260df22698e2a08509ee657b26d4dd +Subproject commit 25cc424d70b578df0fd305255f833e48b3e4e7af diff --git a/src/modules/module-client-node/client-node.c b/src/modules/module-client-node/client-node.c index bd3c3e02f..b2bd9eec9 100644 --- a/src/modules/module-client-node/client-node.c +++ b/src/modules/module-client-node/client-node.c @@ -170,8 +170,6 @@ struct impl { int fds[2]; int other_fds[2]; - - struct spa_io_position *position; }; static int @@ -295,7 +293,8 @@ static struct io *update_io(struct node *this, io = f; io->id = id; io->memid = memid; - spa_log_debug(this->log, "node %p: add io %p %d %d", this, io, id, memid); + spa_log_debug(this->log, "node %p: add io %p %s %d", this, io, + spa_debug_type_find_name(spa_type_io, id), memid); found: return io; @@ -430,6 +429,7 @@ static int impl_node_set_io(struct spa_node *node, uint32_t id, void *data, size return -EINVAL; mem_offset += mem->offset; + mem_size = size; m = ensure_mem(impl, mem->fd, SPA_DATA_MemFd, mem->flags); memid = m->id; } @@ -1011,15 +1011,10 @@ static int impl_node_process(struct spa_node *node) { struct node *this = SPA_CONTAINER_OF(node, struct node, node); struct impl *impl = this->impl; - struct spa_io_position *q, *rq; uint64_t cmd = 1; spa_log_trace(this->log, "%p: send process %p", this, impl->this.node->driver_node); - q = impl->this.node->driver_node->rt.position; - rq = impl->position; - *rq = *q; - if (write(this->writefd, &cmd, 8) != 8) spa_log_warn(this->log, "node %p: error %s", this, strerror(errno)); @@ -1297,8 +1292,7 @@ static void node_initialized(void *data) struct pw_client_node *this = &impl->this; struct pw_node *node = this->node; struct pw_global *global; - uint32_t area_size, size; - struct mem *m; + size_t size; if (this->resource == NULL) return; @@ -1313,8 +1307,7 @@ static void node_initialized(void *data) spa_loop_add_source(impl->node.data_loop, &impl->node.data_source); pw_log_debug("client-node %p: transport fd %d %d", node, impl->fds[0], impl->fds[1]); - area_size = sizeof(struct spa_io_buffers) * MAX_AREAS; - size = area_size + sizeof(struct spa_io_position); + size = sizeof(struct spa_io_buffers) * MAX_AREAS; if (pw_memblock_alloc(PW_MEMBLOCK_FLAG_WITH_FD | PW_MEMBLOCK_FLAG_MAP_READWRITE | @@ -1323,18 +1316,8 @@ static void node_initialized(void *data) &impl->io_areas) < 0) return; - impl->position = SPA_MEMBER(impl->io_areas->ptr, - area_size, struct spa_io_position); - - m = ensure_mem(impl, impl->io_areas->fd, SPA_DATA_MemFd, impl->io_areas->flags); pw_log_debug("client-node %p: io areas %p", node, impl->io_areas->ptr); - pw_client_node_resource_set_io(this->resource, - SPA_IO_Position, - m->id, - area_size, - sizeof(struct spa_io_position)); - if ((global = pw_node_get_global(node)) != NULL) pw_client_node_registered(this, pw_global_get_id(global)); } diff --git a/src/modules/module-client-node/client-stream.c b/src/modules/module-client-node/client-stream.c index bebd27c2a..b321da7ad 100644 --- a/src/modules/module-client-node/client-stream.c +++ b/src/modules/module-client-node/client-stream.c @@ -245,7 +245,14 @@ static int impl_node_set_param(struct spa_node *node, uint32_t id, uint32_t flag static int impl_node_set_io(struct spa_node *node, uint32_t id, void *data, size_t size) { - return 0; + struct node *this; + struct impl *impl; + spa_return_val_if_fail(node != NULL, -EINVAL); + + this = SPA_CONTAINER_OF(node, struct node, node); + impl = this->impl; + + return spa_node_set_io(impl->cnode, id, data, size); } static int impl_node_send_command(struct spa_node *node, const struct spa_command *command) @@ -949,8 +956,6 @@ static void client_node_initialized(void *data) pw_log_debug("client-stream %p: initialized", &impl->this); - impl->cnode = pw_node_get_implementation(impl->client_node->node); - if ((res = spa_node_get_n_ports(impl->cnode, &n_input_ports, &max_input_ports, @@ -1235,6 +1240,8 @@ struct pw_client_stream *pw_client_stream_new(struct pw_resource *resource, if (impl->client_node == NULL) goto error_no_node; + impl->cnode = pw_node_get_implementation(impl->client_node->node); + support = pw_core_get_support(impl->core, &n_support); node_init(&impl->node, NULL, support, n_support); diff --git a/src/modules/module-client-node/remote-node.c b/src/modules/module-client-node/remote-node.c index 40bc6f395..1bae6b232 100644 --- a/src/modules/module-client-node/remote-node.c +++ b/src/modules/module-client-node/remote-node.c @@ -183,23 +183,25 @@ static void *mem_map(struct node_data *data, struct mapping *map, pw_map_range_init(&m.map, offset, size, data->core->sc_pagesize); if (map->ptr == NULL || map->map.offset != m.map.offset || map->map.size != m.map.size) { - map->ptr = mmap(map->ptr, m.map.size, prot, MAP_SHARED, fd, m.map.offset); - if (map->ptr == MAP_FAILED) { + m.ptr = mmap(map->ptr, m.map.size, prot, MAP_SHARED, fd, m.map.offset); + if (m.ptr == MAP_FAILED) { pw_log_error("remote %p: Failed to mmap memory %d: %m", data, size); return NULL; } map->map = m.map; + map->ptr = m.ptr; + pw_log_debug("remote %p: fd %d map %d %d %p", data, fd, m.map.offset, m.map.size, m.ptr); } ptr = SPA_MEMBER(map->ptr, map->map.start, void); - pw_log_debug("remote %p: fd %d mapped %d %d %p", data, fd, offset, size, ptr); + pw_log_debug("remote %p: fd %d ptr %p (%d %d)", data, fd, ptr, offset, size); return ptr; } -static void *mem_unmap(struct node_data *data, void *ptr, struct pw_map_range *range) +static void *mem_unmap(struct node_data *data, struct mapping *map) { - if (ptr != NULL) { - if (munmap(SPA_MEMBER(ptr, -range->start, void), range->size) < 0) + if (map->ptr != NULL) { + if (munmap(map->ptr, map->map.size) < 0) pw_log_warn("failed to unmap: %m"); } return NULL; @@ -225,7 +227,7 @@ static void clear_mem(struct node_data *data, struct mem *m) } } if (!has_ref) { - m->map.ptr = mem_unmap(data, m->map.ptr, &m->map.map); + m->map.ptr = mem_unmap(data, &m->map); close(fd); } } diff --git a/src/pipewire/mem.c b/src/pipewire/mem.c index 2a5b64ce9..ed89bbd3e 100644 --- a/src/pipewire/mem.c +++ b/src/pipewire/mem.c @@ -283,8 +283,10 @@ struct pw_memblock * pw_memblock_find(const void *ptr) struct memblock *m; spa_list_for_each(m, &_memblocks, link) { - if (ptr >= m->mem.ptr && ptr < SPA_MEMBER(m->mem.ptr, m->mem.size, void)) + if (ptr >= m->mem.ptr && ptr < SPA_MEMBER(m->mem.ptr, m->mem.size, void)) { + pw_log_debug("mem %p: found for %p", &m->mem, ptr); return &m->mem; + } } return NULL; } diff --git a/src/pipewire/mem.h b/src/pipewire/mem.h index afccb5d15..56252020a 100644 --- a/src/pipewire/mem.h +++ b/src/pipewire/mem.h @@ -87,7 +87,7 @@ static inline void pw_map_range_init(struct pw_map_range *range, { range->offset = SPA_ROUND_DOWN_N(offset, page_size); range->start = offset - range->offset; - range->size = offset + size - range->offset; + range->size = SPA_ROUND_UP_N(range->start + size, page_size); } diff --git a/src/pipewire/node.c b/src/pipewire/node.c index 1ba16ae5f..8a0b0ebbd 100644 --- a/src/pipewire/node.c +++ b/src/pipewire/node.c @@ -39,10 +39,6 @@ #include "pipewire/type.h" #include "pipewire/work-queue.h" -#ifndef spa_debug -#define spa_debug pw_log_trace -#endif - #define DEFAULT_QUANTUM 1024u #define MIN_QUANTUM 64u @@ -59,11 +55,11 @@ struct impl { struct spa_graph graph; struct spa_graph_state graph_state; - struct pw_node_activation root_activation; struct pw_node_activation node_activation; - struct spa_io_position position; uint32_t next_position; + + struct pw_memblock *activation; }; struct resource_data { @@ -463,7 +459,6 @@ int pw_node_register(struct pw_node *this, struct pw_global *parent, struct pw_properties *properties) { - struct impl *impl = SPA_CONTAINER_OF(this, struct impl, this); struct pw_core *core = this->core; const char *str; @@ -496,7 +491,7 @@ int pw_node_register(struct pw_node *this, return -ENOMEM; this->info.id = this->global->id; - impl->position.clock.id = this->info.id; + this->rt.activation->position.clock.id = this->info.id; pw_properties_setf(this->properties, "node.id", "%d", this->info.id); pw_node_initialized(this); @@ -524,7 +519,6 @@ do_move_nodes(struct spa_loop *loop, struct pw_node *this = &src->this; struct impl *dst = *(struct impl **)data; struct spa_graph_node *n, *t; - int res; pw_log_trace("node %p: root %p driver:%p->%p", this, &this->rt.root, &src->driver_graph, &dst->driver_graph); @@ -534,17 +528,6 @@ do_move_nodes(struct spa_loop *loop, spa_graph_node_add(&dst->driver_graph, &this->rt.root); } - if (this->node) { - if ((res = spa_node_set_io(this->node, - SPA_IO_Position, - &dst->position, sizeof(struct spa_io_position))) < 0) { - pw_log_warn("node %p: set position %s", this, spa_strerror(res)); - } else { - pw_log_debug("node %p: set position %p", this, &dst->position); - this->rt.position = &dst->position; - } - } - if (&src->driver_graph == &dst->driver_graph) return 0; @@ -576,6 +559,7 @@ int pw_node_set_driver(struct pw_node *node, struct pw_node *driver) { struct impl *impl = SPA_CONTAINER_OF(node, struct impl, this); struct pw_node *n, *t, *old; + int res; old = node->driver_node; @@ -584,9 +568,6 @@ int pw_node_set_driver(struct pw_node *node, struct pw_node *driver) if (driver == NULL) driver = node; - spa_list_remove(&node->driver_link); - spa_list_append(&driver->driver_list, &node->driver_link); - spa_list_for_each_safe(n, t, &node->driver_list, driver_link) { pw_log_debug("driver %p: add %p old %p", driver, n, n->driver_node); @@ -597,6 +578,16 @@ int pw_node_set_driver(struct pw_node *node, struct pw_node *driver) spa_list_append(&driver->driver_list, &n->driver_link); n->driver_node = driver; pw_node_events_driver_changed(n, driver); + + if ((res = spa_node_set_io(n->node, + SPA_IO_Position, + &driver->rt.activation->position, + sizeof(struct spa_io_position))) < 0) { + pw_log_warn("node %p: set position %s", n, spa_strerror(res)); + } else { + pw_log_trace("node %p: set position %p", n, &driver->rt.activation->position); + n->rt.position = &driver->rt.activation->position; + } } recalc_quantum(driver); @@ -605,11 +596,6 @@ int pw_node_set_driver(struct pw_node *node, struct pw_node *driver) do_move_nodes, SPA_ID_INVALID, &driver, sizeof(struct pw_node *), true, impl); - if (old != driver) { - node->driver_node = driver; - pw_node_events_driver_changed(node, driver); - } - return 0; } @@ -660,10 +646,11 @@ struct pw_node *pw_node_new(struct pw_core *core, { struct impl *impl; struct pw_node *this; + size_t size; impl = calloc(1, sizeof(struct impl) + user_data_size); if (impl == NULL) - return NULL; + goto error; if (name == NULL) name = "node"; @@ -678,12 +665,24 @@ struct pw_node *pw_node_new(struct pw_core *core, if (properties == NULL) properties = pw_properties_new(NULL, NULL); if (properties == NULL) - goto no_mem; + goto clean_impl; this->enabled = true; this->properties = properties; + size = sizeof(struct pw_node_activation); + + if (pw_memblock_alloc(PW_MEMBLOCK_FLAG_WITH_FD | + PW_MEMBLOCK_FLAG_MAP_READWRITE | + PW_MEMBLOCK_FLAG_SEAL, + size, + &impl->activation) < 0) + goto clean_impl; + impl->work = pw_work_queue_new(this->core->main_loop); + if (impl->work == NULL) + goto clean_impl; + this->info.name = strdup(name); this->data_loop = core->data_loop; @@ -703,8 +702,8 @@ struct pw_node *pw_node_new(struct pw_core *core, spa_graph_init(&impl->driver_graph, &impl->driver_state); this->rt.driver = &impl->driver_graph; - this->rt.activation = &impl->root_activation; - spa_graph_node_init(&this->rt.root, &this->rt.activation->state); + this->rt.activation = impl->activation->ptr; + spa_graph_node_init(&this->rt.root, &this->rt.activation->state[0]); spa_graph_init(&impl->graph, &impl->graph_state); @@ -712,12 +711,12 @@ struct pw_node *pw_node_new(struct pw_core *core, spa_graph_node_set_callbacks(&this->rt.root, &spa_graph_node_sub_impl_default, this); - impl->node_activation.state.status = SPA_STATUS_NEED_BUFFER; - spa_graph_node_init(&this->rt.node, &impl->node_activation.state); + impl->node_activation.state[0].status = SPA_STATUS_NEED_BUFFER; + spa_graph_node_init(&this->rt.node, &impl->node_activation.state[0]); spa_graph_node_add(&impl->graph, &this->rt.node); - impl->position.clock.rate = SPA_FRACTION(1, 48000); - impl->position.size = DEFAULT_QUANTUM; + this->rt.activation->position.clock.rate = SPA_FRACTION(1, 48000); + this->rt.activation->position.size = DEFAULT_QUANTUM; check_properties(this); @@ -727,8 +726,11 @@ struct pw_node *pw_node_new(struct pw_core *core, return this; - no_mem: + clean_impl: + if (properties) + pw_properties_free(properties); free(impl); + error: return NULL; } @@ -827,7 +829,7 @@ static void node_process(void *data, int status) { struct pw_node *node = data, *driver, *root; struct impl *impl = SPA_CONTAINER_OF(node, struct impl, this); - bool is_driving = false; + bool is_driving; driver = node->driver_node; root = node->driver_root ? node->driver_root : driver; @@ -835,12 +837,7 @@ static void node_process(void *data, int status) pw_log_trace("node %p: process driver:%d exported:%d %p %p", node, node->driver, node->exported, driver, driver->rt.driver); - if (node->driver) { - if (driver == node) - is_driving = true; - else if (node->rt.position && driver->rt.position) - *node->rt.position = *driver->rt.position; - } + is_driving = (node->driver && driver == node); if (is_driving && (root->rt.driver->state->pending == 0 || !node->remote)) { struct timespec ts; @@ -897,23 +894,24 @@ SPA_EXPORT void pw_node_set_implementation(struct pw_node *node, struct spa_node *spa_node) { - struct impl *impl = SPA_CONTAINER_OF(node, struct impl, this); - node->node = spa_node; + pw_log_debug("node %p: implementation %p", node, spa_node); spa_node_set_callbacks(node->node, &node_callbacks, node); spa_graph_node_set_callbacks(&node->rt.node, &spa_graph_node_impl_default, spa_node); if (spa_node_set_io(node->node, SPA_IO_Position, - &impl->position, sizeof(struct spa_io_position)) >= 0) { - pw_log_debug("node %p: set position %p", node, &impl->position); - node->rt.position = &impl->position; + &node->rt.activation->position, + sizeof(struct spa_io_position)) >= 0) { + pw_log_debug("node %p: set position %p", node, &node->rt.activation->position); + node->rt.position = &node->rt.activation->position; } if (spa_node_set_io(node->node, SPA_IO_Clock, - &impl->position.clock, sizeof(struct spa_io_position)) >= 0) { - pw_log_debug("node %p: set clock %p", node, &impl->position.clock); - node->rt.clock = &impl->position.clock; + &node->rt.activation->position.clock, + sizeof(struct spa_io_clock)) >= 0) { + pw_log_debug("node %p: set clock %p", node, &node->rt.activation->position.clock); + node->rt.clock = &node->rt.activation->position.clock; } } diff --git a/src/pipewire/private.h b/src/pipewire/private.h index 20ee7b700..91569c617 100644 --- a/src/pipewire/private.h +++ b/src/pipewire/private.h @@ -39,12 +39,13 @@ extern "C" { #include "pipewire/stream.h" #include "pipewire/log.h" +#include + #ifndef spa_debug #define spa_debug pw_log_trace #endif #include -#include struct pw_command; @@ -298,7 +299,8 @@ struct pw_node_activation { uint64_t awake_time; uint64_t finish_time; - struct spa_graph_state state; + struct spa_io_position position; + struct spa_graph_state state[2]; /* one current state and one next state */ }; #define pw_node_events_emit(o,m,v,...) spa_hook_list_call(&o->listener_list, struct pw_node_events, m, v, ##__VA_ARGS__)