mirror of
https://gitlab.freedesktop.org/pipewire/pipewire.git
synced 2025-11-04 13:30:12 -05:00
node: implement activation
Make an eventfd for each node and listen for events when the node is activated. Reorganize some graphs links to make it possible to activiate nodes by signaling the eventfd Pass the peer node to each remote node and let the remote node directly activate the peer when needed. Let each node signal the driver node when finished. With this we don't need to go through the daemon to schedule the graph, nodes will simply activate eachother. We only go to the server when there is a server node to schedule. Keep stats about the state of each node and the time it was triggered, running and finished.
This commit is contained in:
parent
f45e0b8966
commit
5de7898808
15 changed files with 470 additions and 173 deletions
|
|
@ -27,10 +27,7 @@
|
|||
#include <stdio.h>
|
||||
#include <errno.h>
|
||||
#include <unistd.h>
|
||||
#include <fcntl.h>
|
||||
#include <dlfcn.h>
|
||||
#include <sys/socket.h>
|
||||
#include <sys/mman.h>
|
||||
#include <time.h>
|
||||
#include <sys/eventfd.h>
|
||||
|
||||
#include <spa/node/node.h>
|
||||
|
|
@ -162,6 +159,7 @@ struct impl {
|
|||
|
||||
struct pw_map io_map;
|
||||
struct pw_memblock *io_areas;
|
||||
struct pw_node_activation *activation;
|
||||
|
||||
struct spa_hook node_listener;
|
||||
struct spa_hook resource_listener;
|
||||
|
|
@ -1011,12 +1009,19 @@ static int impl_node_process(struct spa_node *node)
|
|||
{
|
||||
struct node *this = SPA_CONTAINER_OF(node, struct node, node);
|
||||
struct impl *impl = this->impl;
|
||||
uint64_t cmd = 1;
|
||||
struct pw_node *n = impl->this.node;
|
||||
struct timespec ts;
|
||||
uint64_t cmd = 1, nsec;
|
||||
|
||||
spa_log_trace(this->log, "%p: send process %p", this, impl->this.node->driver_node);
|
||||
|
||||
if (write(this->writefd, &cmd, 8) != 8)
|
||||
spa_log_warn(this->log, "node %p: error %s", this, strerror(errno));
|
||||
clock_gettime(CLOCK_MONOTONIC, &ts);
|
||||
nsec = SPA_TIMESPEC_TO_NSEC(&ts);
|
||||
n->rt.activation->status = TRIGGERED;
|
||||
n->rt.activation->signal_time = nsec;
|
||||
|
||||
if (write(this->writefd, &cmd, sizeof(cmd)) != sizeof(cmd))
|
||||
spa_log_warn(this->log, "node %p: error %m", this);
|
||||
|
||||
return SPA_STATUS_OK;
|
||||
}
|
||||
|
|
@ -1155,9 +1160,8 @@ static void node_on_data_fd_events(struct spa_source *source)
|
|||
if (source->rmask & SPA_IO_IN) {
|
||||
uint64_t cmd;
|
||||
|
||||
if (read(this->data_source.fd, &cmd, sizeof(uint64_t)) != sizeof(uint64_t))
|
||||
spa_log_warn(this->log, "node %p: error reading message: %s",
|
||||
this, strerror(errno));
|
||||
if (read(this->data_source.fd, &cmd, sizeof(cmd)) != sizeof(cmd) || cmd != 1)
|
||||
spa_log_warn(this->log, "node %p: read %"PRIu64" failed %m", this, cmd);
|
||||
|
||||
spa_log_trace(this->log, "node %p: got process", this);
|
||||
this->callbacks->process(this->callbacks_data, SPA_STATUS_HAVE_BUFFER);
|
||||
|
|
@ -1278,12 +1282,24 @@ static void client_node_resource_destroy(void *data)
|
|||
void pw_client_node_registered(struct pw_client_node *this, uint32_t node_id)
|
||||
{
|
||||
struct impl *impl = SPA_CONTAINER_OF(this, struct impl, this);
|
||||
struct pw_node *node = this->node;
|
||||
struct mem *m;
|
||||
|
||||
pw_log_debug("client-node %p: %d", this, node_id);
|
||||
pw_client_node_resource_transport(this->resource,
|
||||
node_id,
|
||||
impl->other_fds[0],
|
||||
impl->other_fds[1]);
|
||||
|
||||
|
||||
m = ensure_mem(impl, node->activation->fd, SPA_DATA_MemFd, node->activation->flags);
|
||||
|
||||
pw_client_node_resource_set_activation(this->resource,
|
||||
node_id,
|
||||
impl->other_fds[1],
|
||||
m->id,
|
||||
0,
|
||||
sizeof(struct pw_node_activation));
|
||||
}
|
||||
|
||||
static void node_initialized(void *data)
|
||||
|
|
@ -1572,6 +1588,50 @@ static void node_port_removed(void *data, struct pw_port *port)
|
|||
clear_port(this, p);
|
||||
}
|
||||
|
||||
static void node_peer_added(void *data, struct pw_node *peer)
|
||||
{
|
||||
struct impl *impl = data;
|
||||
struct node *this = &impl->node;
|
||||
struct mem *m;
|
||||
|
||||
if (this->resource == NULL)
|
||||
return;
|
||||
|
||||
m = ensure_mem(impl, peer->activation->fd, SPA_DATA_MemFd, peer->activation->flags);
|
||||
|
||||
pw_log_debug("client-node %p: peer %p %u added %u", &impl->this, peer,
|
||||
peer->info.id, m->id);
|
||||
|
||||
pw_client_node_resource_set_activation(this->resource,
|
||||
peer->info.id,
|
||||
peer->source.fd,
|
||||
m->id,
|
||||
0,
|
||||
sizeof(struct pw_node_activation));
|
||||
}
|
||||
|
||||
static void node_peer_removed(void *data, struct pw_node *peer)
|
||||
{
|
||||
struct impl *impl = data;
|
||||
struct node *this = &impl->node;
|
||||
|
||||
if (this->resource == NULL)
|
||||
return;
|
||||
|
||||
pw_client_node_resource_set_activation(this->resource,
|
||||
peer->info.id,
|
||||
-1,
|
||||
SPA_ID_INVALID,
|
||||
0,
|
||||
0);
|
||||
}
|
||||
|
||||
static void node_driver_changed(void *data, struct pw_node *old, struct pw_node *driver)
|
||||
{
|
||||
node_peer_removed(data, old);
|
||||
node_peer_added(data, driver);
|
||||
}
|
||||
|
||||
static const struct pw_node_events node_events = {
|
||||
PW_VERSION_NODE_EVENTS,
|
||||
.free = node_free,
|
||||
|
|
@ -1579,6 +1639,9 @@ static const struct pw_node_events node_events = {
|
|||
.port_init = node_port_init,
|
||||
.port_added = node_port_added,
|
||||
.port_removed = node_port_removed,
|
||||
.peer_added = node_peer_added,
|
||||
.peer_removed = node_peer_removed,
|
||||
.driver_changed = node_driver_changed,
|
||||
};
|
||||
|
||||
static const struct pw_resource_events resource_events = {
|
||||
|
|
@ -1586,6 +1649,18 @@ static const struct pw_resource_events resource_events = {
|
|||
.destroy = client_node_resource_destroy,
|
||||
};
|
||||
|
||||
static int root_impl_process(void *data, struct spa_graph_node *node)
|
||||
{
|
||||
struct impl *impl = data;
|
||||
pw_log_trace("client-node %p: process", impl);
|
||||
return spa_node_process(&impl->node.node);
|
||||
}
|
||||
|
||||
static const struct spa_graph_node_callbacks root_impl = {
|
||||
SPA_VERSION_GRAPH_NODE_CALLBACKS,
|
||||
.process = root_impl_process,
|
||||
};
|
||||
|
||||
/** Create a new client node
|
||||
* \param client an owner \ref pw_client
|
||||
* \param id an id
|
||||
|
|
@ -1646,6 +1721,8 @@ struct pw_client_node *pw_client_node_new(struct pw_resource *resource,
|
|||
|
||||
this->node->remote = true;
|
||||
|
||||
spa_graph_node_set_callbacks(&this->node->rt.root, &root_impl, this);
|
||||
|
||||
pw_resource_add_listener(this->resource,
|
||||
&impl->resource_listener,
|
||||
&resource_events,
|
||||
|
|
|
|||
|
|
@ -893,7 +893,7 @@ static int impl_node_process(struct spa_node *node)
|
|||
trigger = status & SPA_STATUS_HAVE_BUFFER;
|
||||
|
||||
if (trigger && !impl->this.node->driver)
|
||||
spa_graph_run(impl->client_node->node->rt.driver);
|
||||
spa_graph_node_process(&impl->client_node->node->rt.root);
|
||||
|
||||
return status;
|
||||
}
|
||||
|
|
@ -988,9 +988,6 @@ static void client_node_initialized(void *data)
|
|||
else
|
||||
monitor = false;
|
||||
|
||||
spa_graph_node_add(impl->client_node->node->rt.driver, &impl->client_node->node->rt.root);
|
||||
impl->client_node->node->driver_root = impl->this.node;
|
||||
|
||||
impl->client_port = pw_node_find_port(impl->client_node->node, impl->direction, 0);
|
||||
if (impl->client_port == NULL)
|
||||
return;
|
||||
|
|
@ -1185,17 +1182,32 @@ static void node_initialized(void *data)
|
|||
pw_client_node_registered(impl->client_node, impl->this.node->global->id);
|
||||
}
|
||||
|
||||
static void node_driver_changed(void *data, struct pw_node *driver)
|
||||
static void node_peer_added(void *data, struct pw_node *peer)
|
||||
{
|
||||
struct impl *impl = data;
|
||||
pw_log_debug("client-stream %p: driver changed %p", &impl->this, driver);
|
||||
pw_node_events_peer_added(impl->client_node->node, peer);
|
||||
}
|
||||
|
||||
static void node_peer_removed(void *data, struct pw_node *peer)
|
||||
{
|
||||
struct impl *impl = data;
|
||||
pw_node_events_peer_removed(impl->client_node->node, peer);
|
||||
}
|
||||
|
||||
static void node_driver_changed(void *data, struct pw_node *old, struct pw_node *driver)
|
||||
{
|
||||
struct impl *impl = data;
|
||||
pw_log_debug("client-stream %p: driver changed %p->%p", &impl->this, old, driver);
|
||||
impl->client_node->node->driver_node = driver;
|
||||
pw_node_events_driver_changed(impl->client_node->node, old, driver);
|
||||
}
|
||||
|
||||
static const struct pw_node_events node_events = {
|
||||
PW_VERSION_NODE_EVENTS,
|
||||
.free = node_free,
|
||||
.initialized = node_initialized,
|
||||
.peer_added = node_peer_added,
|
||||
.peer_removed = node_peer_removed,
|
||||
.driver_changed = node_driver_changed,
|
||||
};
|
||||
|
||||
|
|
|
|||
|
|
@ -430,6 +430,34 @@ static int client_node_demarshal_port_set_io(void *object, void *data, size_t si
|
|||
return 0;
|
||||
}
|
||||
|
||||
static int client_node_demarshal_set_activation(void *object, void *data, size_t size)
|
||||
{
|
||||
struct pw_proxy *proxy = object;
|
||||
struct spa_pod_parser prs;
|
||||
uint32_t node_id, sigidx, memid, off, sz;
|
||||
int signalfd;
|
||||
|
||||
spa_pod_parser_init(&prs, data, size);
|
||||
if (spa_pod_parser_get_struct(&prs,
|
||||
SPA_POD_Int(&node_id),
|
||||
SPA_POD_Int(&sigidx),
|
||||
SPA_POD_Int(&memid),
|
||||
SPA_POD_Int(&off),
|
||||
SPA_POD_Int(&sz)) < 0)
|
||||
return -EINVAL;
|
||||
|
||||
signalfd = pw_protocol_native_get_proxy_fd(proxy, sigidx);
|
||||
if (signalfd == -1)
|
||||
return -EINVAL;
|
||||
|
||||
pw_proxy_notify(proxy, struct pw_client_node_proxy_events, set_activation, 0,
|
||||
node_id,
|
||||
signalfd,
|
||||
memid,
|
||||
off, sz);
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int client_node_demarshal_set_io(void *object, void *data, size_t size)
|
||||
{
|
||||
struct pw_proxy *proxy = object;
|
||||
|
|
@ -692,6 +720,29 @@ client_node_marshal_port_set_io(void *object,
|
|||
pw_protocol_native_end_resource(resource, b);
|
||||
}
|
||||
|
||||
static void
|
||||
client_node_marshal_set_activation(void *object,
|
||||
uint32_t node_id,
|
||||
int signalfd,
|
||||
uint32_t memid,
|
||||
uint32_t offset,
|
||||
uint32_t size)
|
||||
{
|
||||
struct pw_resource *resource = object;
|
||||
struct spa_pod_builder *b;
|
||||
|
||||
b = pw_protocol_native_begin_resource(resource, PW_CLIENT_NODE_PROXY_EVENT_SET_ACTIVATION);
|
||||
|
||||
spa_pod_builder_add_struct(b,
|
||||
SPA_POD_Int(node_id),
|
||||
SPA_POD_Int(pw_protocol_native_add_resource_fd(resource, signalfd)),
|
||||
SPA_POD_Int(memid),
|
||||
SPA_POD_Int(offset),
|
||||
SPA_POD_Int(size));
|
||||
|
||||
pw_protocol_native_end_resource(resource, b);
|
||||
}
|
||||
|
||||
static void
|
||||
client_node_marshal_set_io(void *object,
|
||||
uint32_t id,
|
||||
|
|
@ -899,6 +950,7 @@ static const struct pw_client_node_proxy_events pw_protocol_native_client_node_e
|
|||
&client_node_marshal_port_use_buffers,
|
||||
&client_node_marshal_port_command,
|
||||
&client_node_marshal_port_set_io,
|
||||
&client_node_marshal_set_activation,
|
||||
};
|
||||
|
||||
static const struct pw_protocol_native_demarshal pw_protocol_native_client_node_event_demarshal[] = {
|
||||
|
|
@ -914,6 +966,7 @@ static const struct pw_protocol_native_demarshal pw_protocol_native_client_node_
|
|||
{ &client_node_demarshal_port_use_buffers, 0 },
|
||||
{ &client_node_demarshal_port_command, 0 },
|
||||
{ &client_node_demarshal_port_set_io, 0 },
|
||||
{ &client_node_demarshal_set_activation, 0 }
|
||||
};
|
||||
|
||||
static const struct pw_protocol_marshal pw_protocol_native_client_node_marshal = {
|
||||
|
|
|
|||
|
|
@ -77,6 +77,13 @@ struct mix {
|
|||
bool active;
|
||||
};
|
||||
|
||||
struct link {
|
||||
struct spa_graph_link link;
|
||||
struct pw_node_activation *activation;
|
||||
int signalfd;
|
||||
uint32_t mem_id;
|
||||
};
|
||||
|
||||
struct node_data {
|
||||
struct pw_remote *remote;
|
||||
struct pw_core *core;
|
||||
|
|
@ -101,11 +108,7 @@ struct node_data {
|
|||
|
||||
struct spa_io_position *position;
|
||||
|
||||
struct spa_graph_node_callbacks callbacks;
|
||||
void *callbacks_data;
|
||||
|
||||
struct spa_graph_state state;
|
||||
struct spa_graph_link link;
|
||||
struct pw_array links;
|
||||
};
|
||||
|
||||
/** \endcond */
|
||||
|
|
@ -135,7 +138,6 @@ on_rtsocket_condition(void *user_data, int fd, enum spa_io mask)
|
|||
{
|
||||
struct pw_proxy *proxy = user_data;
|
||||
struct node_data *data = proxy->user_data;
|
||||
struct spa_graph_node *node = &data->node->rt.root;
|
||||
|
||||
if (mask & (SPA_IO_ERR | SPA_IO_HUP)) {
|
||||
pw_log_warn("got error");
|
||||
|
|
@ -146,11 +148,15 @@ on_rtsocket_condition(void *user_data, int fd, enum spa_io mask)
|
|||
if (mask & SPA_IO_IN) {
|
||||
uint64_t cmd;
|
||||
|
||||
if (read(fd, &cmd, sizeof(uint64_t)) != sizeof(uint64_t) || cmd != 1)
|
||||
if (read(fd, &cmd, sizeof(cmd)) != sizeof(cmd) || cmd != 1)
|
||||
pw_log_warn("proxy %p: read %"PRIu64" failed %m", proxy, cmd);
|
||||
|
||||
pw_log_trace("remote %p: process %p", data->remote, proxy);
|
||||
spa_graph_run(node->graph);
|
||||
|
||||
|
||||
|
||||
|
||||
spa_graph_node_process(&data->node->rt.root);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -536,7 +542,7 @@ static void client_node_command(void *object, uint32_t seq, const struct spa_com
|
|||
pw_loop_invoke(data->core->data_loop,
|
||||
do_pause_source, 1, NULL, 0, true, data);
|
||||
}
|
||||
if ((res = spa_node_send_command(data->node->node, command)) < 0)
|
||||
if ((res = pw_node_set_state(data->node, PW_NODE_STATE_IDLE)) < 0)
|
||||
pw_log_warn("node %p: pause failed", proxy);
|
||||
|
||||
pw_client_node_proxy_done(data->node_proxy, seq, res);
|
||||
|
|
@ -544,7 +550,7 @@ static void client_node_command(void *object, uint32_t seq, const struct spa_com
|
|||
case SPA_NODE_COMMAND_Start:
|
||||
pw_log_debug("node %p: start %d", proxy, seq);
|
||||
|
||||
if ((res = spa_node_send_command(data->node->node, command)) < 0) {
|
||||
if ((res = pw_node_set_state(data->node, PW_NODE_STATE_RUNNING)) < 0) {
|
||||
pw_log_warn("node %p: start failed", proxy);
|
||||
}
|
||||
else if (data->rtsocket_source) {
|
||||
|
|
@ -867,6 +873,66 @@ client_node_port_set_io(void *object,
|
|||
}
|
||||
}
|
||||
|
||||
#if 0
|
||||
static int link_signal_func(void *user_data)
|
||||
{
|
||||
struct link *link = user_data;
|
||||
uint64_t cmd = 1;
|
||||
pw_log_trace("link %p: signal", link);
|
||||
if (write(link->signalfd, &cmd, sizeof(cmd)) != sizeof(cmd))
|
||||
pw_log_warn("link %p: write failed %m", link);
|
||||
return 0;
|
||||
}
|
||||
#endif
|
||||
|
||||
static void
|
||||
client_node_set_activation(void *object,
|
||||
uint32_t node_id,
|
||||
int signalfd,
|
||||
uint32_t memid,
|
||||
uint32_t offset,
|
||||
uint32_t size)
|
||||
{
|
||||
struct pw_proxy *proxy = object;
|
||||
struct node_data *data = proxy->user_data;
|
||||
struct pw_node *node = data->node;
|
||||
struct mem *m;
|
||||
struct pw_node_activation *ptr;
|
||||
|
||||
if (memid == SPA_ID_INVALID) {
|
||||
ptr = NULL;
|
||||
size = 0;
|
||||
}
|
||||
else {
|
||||
m = find_mem(data, memid);
|
||||
if (m == NULL) {
|
||||
pw_log_warn("unknown memory id %u", memid);
|
||||
return;
|
||||
}
|
||||
ptr = mem_map(data, &m->map, m->fd,
|
||||
PROT_READ|PROT_WRITE, offset, size);
|
||||
if (ptr == NULL)
|
||||
return;
|
||||
m->ref++;
|
||||
}
|
||||
pw_log_debug("node %p: set activation %d", node, node_id);
|
||||
|
||||
#if 0
|
||||
if (ptr) {
|
||||
struct link *link;
|
||||
link = pw_array_add(&data->links, sizeof(struct link));
|
||||
link->activation = ptr;
|
||||
link->signalfd = signalfd;
|
||||
link->link.signal = link_signal_func;
|
||||
link->link.signal_data = link;
|
||||
spa_graph_link_add(&node->rt.root, &link->activation->state[0], &link->link);
|
||||
pw_log_debug("node %p: required %d, pending %d", node,
|
||||
link->link.state->required,
|
||||
link->link.state->pending);
|
||||
}
|
||||
#endif
|
||||
}
|
||||
|
||||
static const struct pw_client_node_proxy_events client_node_events = {
|
||||
PW_VERSION_CLIENT_NODE_PROXY_EVENTS,
|
||||
.add_mem = client_node_add_mem,
|
||||
|
|
@ -881,6 +947,7 @@ static const struct pw_client_node_proxy_events client_node_events = {
|
|||
.port_use_buffers = client_node_port_use_buffers,
|
||||
.port_command = client_node_port_command,
|
||||
.port_set_io = client_node_port_set_io,
|
||||
.set_activation = client_node_set_activation,
|
||||
};
|
||||
|
||||
static void do_node_init(struct pw_proxy *proxy)
|
||||
|
|
@ -996,28 +1063,6 @@ static const struct pw_proxy_events proxy_events = {
|
|||
.destroy = node_proxy_destroy,
|
||||
};
|
||||
|
||||
static int remote_impl_signal(void *data)
|
||||
{
|
||||
struct node_data *d = data;
|
||||
uint64_t cmd = 1;
|
||||
pw_log_trace("remote %p: send process", data);
|
||||
write(d->rtwritefd, &cmd, 8);
|
||||
return 0;
|
||||
}
|
||||
|
||||
static inline int remote_process(void *data, struct spa_graph_node *node)
|
||||
{
|
||||
struct node_data *d = data;
|
||||
spa_debug("remote %p: begin graph", data);
|
||||
spa_graph_state_reset(&d->state);
|
||||
return d->callbacks.process(d->callbacks_data, node);
|
||||
}
|
||||
|
||||
static const struct spa_graph_node_callbacks impl_root = {
|
||||
SPA_VERSION_GRAPH_NODE_CALLBACKS,
|
||||
.process = remote_process,
|
||||
};
|
||||
|
||||
static struct pw_proxy *node_export(struct pw_remote *remote, void *object, bool do_free)
|
||||
{
|
||||
struct pw_node *node = object;
|
||||
|
|
@ -1042,13 +1087,6 @@ static struct pw_proxy *node_export(struct pw_remote *remote, void *object, bool
|
|||
data->node_proxy = (struct pw_client_node_proxy *)proxy;
|
||||
data->remote_id = SPA_ID_INVALID;
|
||||
|
||||
data->link.signal = remote_impl_signal;
|
||||
data->link.signal_data = data;
|
||||
data->callbacks = *node->rt.root.callbacks;
|
||||
spa_graph_node_set_callbacks(&node->rt.root, &impl_root, data);
|
||||
spa_graph_link_add(&node->rt.root, &data->state, &data->link);
|
||||
spa_graph_node_add(node->rt.driver, &node->rt.root);
|
||||
|
||||
node->exported = true;
|
||||
|
||||
spa_list_init(&data->free_mix);
|
||||
|
|
@ -1059,6 +1097,8 @@ static struct pw_proxy *node_export(struct pw_remote *remote, void *object, bool
|
|||
|
||||
pw_array_init(&data->mems, 64);
|
||||
pw_array_ensure_size(&data->mems, sizeof(struct mem) * 64);
|
||||
pw_array_init(&data->links, 64);
|
||||
pw_array_ensure_size(&data->links, sizeof(struct link) * 64);
|
||||
|
||||
pw_proxy_add_listener(proxy, &data->proxy_listener, &proxy_events, data);
|
||||
pw_node_add_listener(node, &data->node_listener, &node_events, data);
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue