node: allocate shared mem for activation

Allocate a per node a piece of shared memory where we place the
activation structure with the graph state and io_position.
We can then give this info to nodes so that they can get the position
in the graph directly but also later, activate the next node in
the graph.
This commit is contained in:
Wim Taymans 2019-02-07 12:34:54 +01:00
parent db230fc136
commit 658c1da52f
8 changed files with 83 additions and 89 deletions

@ -1 +1 @@
Subproject commit 96460ab2d1260df22698e2a08509ee657b26d4dd
Subproject commit 25cc424d70b578df0fd305255f833e48b3e4e7af

View file

@ -170,8 +170,6 @@ struct impl {
int fds[2];
int other_fds[2];
struct spa_io_position *position;
};
static int
@ -295,7 +293,8 @@ static struct io *update_io(struct node *this,
io = f;
io->id = id;
io->memid = memid;
spa_log_debug(this->log, "node %p: add io %p %d %d", this, io, id, memid);
spa_log_debug(this->log, "node %p: add io %p %s %d", this, io,
spa_debug_type_find_name(spa_type_io, id), memid);
found:
return io;
@ -430,6 +429,7 @@ static int impl_node_set_io(struct spa_node *node, uint32_t id, void *data, size
return -EINVAL;
mem_offset += mem->offset;
mem_size = size;
m = ensure_mem(impl, mem->fd, SPA_DATA_MemFd, mem->flags);
memid = m->id;
}
@ -1011,15 +1011,10 @@ static int impl_node_process(struct spa_node *node)
{
struct node *this = SPA_CONTAINER_OF(node, struct node, node);
struct impl *impl = this->impl;
struct spa_io_position *q, *rq;
uint64_t cmd = 1;
spa_log_trace(this->log, "%p: send process %p", this, impl->this.node->driver_node);
q = impl->this.node->driver_node->rt.position;
rq = impl->position;
*rq = *q;
if (write(this->writefd, &cmd, 8) != 8)
spa_log_warn(this->log, "node %p: error %s", this, strerror(errno));
@ -1297,8 +1292,7 @@ static void node_initialized(void *data)
struct pw_client_node *this = &impl->this;
struct pw_node *node = this->node;
struct pw_global *global;
uint32_t area_size, size;
struct mem *m;
size_t size;
if (this->resource == NULL)
return;
@ -1313,8 +1307,7 @@ static void node_initialized(void *data)
spa_loop_add_source(impl->node.data_loop, &impl->node.data_source);
pw_log_debug("client-node %p: transport fd %d %d", node, impl->fds[0], impl->fds[1]);
area_size = sizeof(struct spa_io_buffers) * MAX_AREAS;
size = area_size + sizeof(struct spa_io_position);
size = sizeof(struct spa_io_buffers) * MAX_AREAS;
if (pw_memblock_alloc(PW_MEMBLOCK_FLAG_WITH_FD |
PW_MEMBLOCK_FLAG_MAP_READWRITE |
@ -1323,18 +1316,8 @@ static void node_initialized(void *data)
&impl->io_areas) < 0)
return;
impl->position = SPA_MEMBER(impl->io_areas->ptr,
area_size, struct spa_io_position);
m = ensure_mem(impl, impl->io_areas->fd, SPA_DATA_MemFd, impl->io_areas->flags);
pw_log_debug("client-node %p: io areas %p", node, impl->io_areas->ptr);
pw_client_node_resource_set_io(this->resource,
SPA_IO_Position,
m->id,
area_size,
sizeof(struct spa_io_position));
if ((global = pw_node_get_global(node)) != NULL)
pw_client_node_registered(this, pw_global_get_id(global));
}

View file

@ -245,7 +245,14 @@ static int impl_node_set_param(struct spa_node *node, uint32_t id, uint32_t flag
static int impl_node_set_io(struct spa_node *node, uint32_t id, void *data, size_t size)
{
return 0;
struct node *this;
struct impl *impl;
spa_return_val_if_fail(node != NULL, -EINVAL);
this = SPA_CONTAINER_OF(node, struct node, node);
impl = this->impl;
return spa_node_set_io(impl->cnode, id, data, size);
}
static int impl_node_send_command(struct spa_node *node, const struct spa_command *command)
@ -949,8 +956,6 @@ static void client_node_initialized(void *data)
pw_log_debug("client-stream %p: initialized", &impl->this);
impl->cnode = pw_node_get_implementation(impl->client_node->node);
if ((res = spa_node_get_n_ports(impl->cnode,
&n_input_ports,
&max_input_ports,
@ -1235,6 +1240,8 @@ struct pw_client_stream *pw_client_stream_new(struct pw_resource *resource,
if (impl->client_node == NULL)
goto error_no_node;
impl->cnode = pw_node_get_implementation(impl->client_node->node);
support = pw_core_get_support(impl->core, &n_support);
node_init(&impl->node, NULL, support, n_support);

View file

@ -183,23 +183,25 @@ static void *mem_map(struct node_data *data, struct mapping *map,
pw_map_range_init(&m.map, offset, size, data->core->sc_pagesize);
if (map->ptr == NULL || map->map.offset != m.map.offset || map->map.size != m.map.size) {
map->ptr = mmap(map->ptr, m.map.size, prot, MAP_SHARED, fd, m.map.offset);
if (map->ptr == MAP_FAILED) {
m.ptr = mmap(map->ptr, m.map.size, prot, MAP_SHARED, fd, m.map.offset);
if (m.ptr == MAP_FAILED) {
pw_log_error("remote %p: Failed to mmap memory %d: %m", data, size);
return NULL;
}
map->map = m.map;
map->ptr = m.ptr;
pw_log_debug("remote %p: fd %d map %d %d %p", data, fd, m.map.offset, m.map.size, m.ptr);
}
ptr = SPA_MEMBER(map->ptr, map->map.start, void);
pw_log_debug("remote %p: fd %d mapped %d %d %p", data, fd, offset, size, ptr);
pw_log_debug("remote %p: fd %d ptr %p (%d %d)", data, fd, ptr, offset, size);
return ptr;
}
static void *mem_unmap(struct node_data *data, void *ptr, struct pw_map_range *range)
static void *mem_unmap(struct node_data *data, struct mapping *map)
{
if (ptr != NULL) {
if (munmap(SPA_MEMBER(ptr, -range->start, void), range->size) < 0)
if (map->ptr != NULL) {
if (munmap(map->ptr, map->map.size) < 0)
pw_log_warn("failed to unmap: %m");
}
return NULL;
@ -225,7 +227,7 @@ static void clear_mem(struct node_data *data, struct mem *m)
}
}
if (!has_ref) {
m->map.ptr = mem_unmap(data, m->map.ptr, &m->map.map);
m->map.ptr = mem_unmap(data, &m->map);
close(fd);
}
}

View file

@ -283,8 +283,10 @@ struct pw_memblock * pw_memblock_find(const void *ptr)
struct memblock *m;
spa_list_for_each(m, &_memblocks, link) {
if (ptr >= m->mem.ptr && ptr < SPA_MEMBER(m->mem.ptr, m->mem.size, void))
if (ptr >= m->mem.ptr && ptr < SPA_MEMBER(m->mem.ptr, m->mem.size, void)) {
pw_log_debug("mem %p: found for %p", &m->mem, ptr);
return &m->mem;
}
}
return NULL;
}

View file

@ -87,7 +87,7 @@ static inline void pw_map_range_init(struct pw_map_range *range,
{
range->offset = SPA_ROUND_DOWN_N(offset, page_size);
range->start = offset - range->offset;
range->size = offset + size - range->offset;
range->size = SPA_ROUND_UP_N(range->start + size, page_size);
}

View file

@ -39,10 +39,6 @@
#include "pipewire/type.h"
#include "pipewire/work-queue.h"
#ifndef spa_debug
#define spa_debug pw_log_trace
#endif
#define DEFAULT_QUANTUM 1024u
#define MIN_QUANTUM 64u
@ -59,11 +55,11 @@ struct impl {
struct spa_graph graph;
struct spa_graph_state graph_state;
struct pw_node_activation root_activation;
struct pw_node_activation node_activation;
struct spa_io_position position;
uint32_t next_position;
struct pw_memblock *activation;
};
struct resource_data {
@ -463,7 +459,6 @@ int pw_node_register(struct pw_node *this,
struct pw_global *parent,
struct pw_properties *properties)
{
struct impl *impl = SPA_CONTAINER_OF(this, struct impl, this);
struct pw_core *core = this->core;
const char *str;
@ -496,7 +491,7 @@ int pw_node_register(struct pw_node *this,
return -ENOMEM;
this->info.id = this->global->id;
impl->position.clock.id = this->info.id;
this->rt.activation->position.clock.id = this->info.id;
pw_properties_setf(this->properties, "node.id", "%d", this->info.id);
pw_node_initialized(this);
@ -524,7 +519,6 @@ do_move_nodes(struct spa_loop *loop,
struct pw_node *this = &src->this;
struct impl *dst = *(struct impl **)data;
struct spa_graph_node *n, *t;
int res;
pw_log_trace("node %p: root %p driver:%p->%p", this,
&this->rt.root, &src->driver_graph, &dst->driver_graph);
@ -534,17 +528,6 @@ do_move_nodes(struct spa_loop *loop,
spa_graph_node_add(&dst->driver_graph, &this->rt.root);
}
if (this->node) {
if ((res = spa_node_set_io(this->node,
SPA_IO_Position,
&dst->position, sizeof(struct spa_io_position))) < 0) {
pw_log_warn("node %p: set position %s", this, spa_strerror(res));
} else {
pw_log_debug("node %p: set position %p", this, &dst->position);
this->rt.position = &dst->position;
}
}
if (&src->driver_graph == &dst->driver_graph)
return 0;
@ -576,6 +559,7 @@ int pw_node_set_driver(struct pw_node *node, struct pw_node *driver)
{
struct impl *impl = SPA_CONTAINER_OF(node, struct impl, this);
struct pw_node *n, *t, *old;
int res;
old = node->driver_node;
@ -584,9 +568,6 @@ int pw_node_set_driver(struct pw_node *node, struct pw_node *driver)
if (driver == NULL)
driver = node;
spa_list_remove(&node->driver_link);
spa_list_append(&driver->driver_list, &node->driver_link);
spa_list_for_each_safe(n, t, &node->driver_list, driver_link) {
pw_log_debug("driver %p: add %p old %p", driver, n, n->driver_node);
@ -597,6 +578,16 @@ int pw_node_set_driver(struct pw_node *node, struct pw_node *driver)
spa_list_append(&driver->driver_list, &n->driver_link);
n->driver_node = driver;
pw_node_events_driver_changed(n, driver);
if ((res = spa_node_set_io(n->node,
SPA_IO_Position,
&driver->rt.activation->position,
sizeof(struct spa_io_position))) < 0) {
pw_log_warn("node %p: set position %s", n, spa_strerror(res));
} else {
pw_log_trace("node %p: set position %p", n, &driver->rt.activation->position);
n->rt.position = &driver->rt.activation->position;
}
}
recalc_quantum(driver);
@ -605,11 +596,6 @@ int pw_node_set_driver(struct pw_node *node, struct pw_node *driver)
do_move_nodes, SPA_ID_INVALID, &driver, sizeof(struct pw_node *),
true, impl);
if (old != driver) {
node->driver_node = driver;
pw_node_events_driver_changed(node, driver);
}
return 0;
}
@ -660,10 +646,11 @@ struct pw_node *pw_node_new(struct pw_core *core,
{
struct impl *impl;
struct pw_node *this;
size_t size;
impl = calloc(1, sizeof(struct impl) + user_data_size);
if (impl == NULL)
return NULL;
goto error;
if (name == NULL)
name = "node";
@ -678,12 +665,24 @@ struct pw_node *pw_node_new(struct pw_core *core,
if (properties == NULL)
properties = pw_properties_new(NULL, NULL);
if (properties == NULL)
goto no_mem;
goto clean_impl;
this->enabled = true;
this->properties = properties;
size = sizeof(struct pw_node_activation);
if (pw_memblock_alloc(PW_MEMBLOCK_FLAG_WITH_FD |
PW_MEMBLOCK_FLAG_MAP_READWRITE |
PW_MEMBLOCK_FLAG_SEAL,
size,
&impl->activation) < 0)
goto clean_impl;
impl->work = pw_work_queue_new(this->core->main_loop);
if (impl->work == NULL)
goto clean_impl;
this->info.name = strdup(name);
this->data_loop = core->data_loop;
@ -703,8 +702,8 @@ struct pw_node *pw_node_new(struct pw_core *core,
spa_graph_init(&impl->driver_graph, &impl->driver_state);
this->rt.driver = &impl->driver_graph;
this->rt.activation = &impl->root_activation;
spa_graph_node_init(&this->rt.root, &this->rt.activation->state);
this->rt.activation = impl->activation->ptr;
spa_graph_node_init(&this->rt.root, &this->rt.activation->state[0]);
spa_graph_init(&impl->graph, &impl->graph_state);
@ -712,12 +711,12 @@ struct pw_node *pw_node_new(struct pw_core *core,
spa_graph_node_set_callbacks(&this->rt.root,
&spa_graph_node_sub_impl_default, this);
impl->node_activation.state.status = SPA_STATUS_NEED_BUFFER;
spa_graph_node_init(&this->rt.node, &impl->node_activation.state);
impl->node_activation.state[0].status = SPA_STATUS_NEED_BUFFER;
spa_graph_node_init(&this->rt.node, &impl->node_activation.state[0]);
spa_graph_node_add(&impl->graph, &this->rt.node);
impl->position.clock.rate = SPA_FRACTION(1, 48000);
impl->position.size = DEFAULT_QUANTUM;
this->rt.activation->position.clock.rate = SPA_FRACTION(1, 48000);
this->rt.activation->position.size = DEFAULT_QUANTUM;
check_properties(this);
@ -727,8 +726,11 @@ struct pw_node *pw_node_new(struct pw_core *core,
return this;
no_mem:
clean_impl:
if (properties)
pw_properties_free(properties);
free(impl);
error:
return NULL;
}
@ -827,7 +829,7 @@ static void node_process(void *data, int status)
{
struct pw_node *node = data, *driver, *root;
struct impl *impl = SPA_CONTAINER_OF(node, struct impl, this);
bool is_driving = false;
bool is_driving;
driver = node->driver_node;
root = node->driver_root ? node->driver_root : driver;
@ -835,12 +837,7 @@ static void node_process(void *data, int status)
pw_log_trace("node %p: process driver:%d exported:%d %p %p", node,
node->driver, node->exported, driver, driver->rt.driver);
if (node->driver) {
if (driver == node)
is_driving = true;
else if (node->rt.position && driver->rt.position)
*node->rt.position = *driver->rt.position;
}
is_driving = (node->driver && driver == node);
if (is_driving && (root->rt.driver->state->pending == 0 || !node->remote)) {
struct timespec ts;
@ -897,23 +894,24 @@ SPA_EXPORT
void pw_node_set_implementation(struct pw_node *node,
struct spa_node *spa_node)
{
struct impl *impl = SPA_CONTAINER_OF(node, struct impl, this);
node->node = spa_node;
pw_log_debug("node %p: implementation %p", node, spa_node);
spa_node_set_callbacks(node->node, &node_callbacks, node);
spa_graph_node_set_callbacks(&node->rt.node, &spa_graph_node_impl_default, spa_node);
if (spa_node_set_io(node->node,
SPA_IO_Position,
&impl->position, sizeof(struct spa_io_position)) >= 0) {
pw_log_debug("node %p: set position %p", node, &impl->position);
node->rt.position = &impl->position;
&node->rt.activation->position,
sizeof(struct spa_io_position)) >= 0) {
pw_log_debug("node %p: set position %p", node, &node->rt.activation->position);
node->rt.position = &node->rt.activation->position;
}
if (spa_node_set_io(node->node,
SPA_IO_Clock,
&impl->position.clock, sizeof(struct spa_io_position)) >= 0) {
pw_log_debug("node %p: set clock %p", node, &impl->position.clock);
node->rt.clock = &impl->position.clock;
&node->rt.activation->position.clock,
sizeof(struct spa_io_clock)) >= 0) {
pw_log_debug("node %p: set clock %p", node, &node->rt.activation->position.clock);
node->rt.clock = &node->rt.activation->position.clock;
}
}

View file

@ -39,12 +39,13 @@ extern "C" {
#include "pipewire/stream.h"
#include "pipewire/log.h"
#include <spa/utils/type-info.h>
#ifndef spa_debug
#define spa_debug pw_log_trace
#endif
#include <spa/graph/graph.h>
#include <spa/utils/type-info.h>
struct pw_command;
@ -298,7 +299,8 @@ struct pw_node_activation {
uint64_t awake_time;
uint64_t finish_time;
struct spa_graph_state state;
struct spa_io_position position;
struct spa_graph_state state[2]; /* one current state and one next state */
};
#define pw_node_events_emit(o,m,v,...) spa_hook_list_call(&o->listener_list, struct pw_node_events, m, v, ##__VA_ARGS__)