client-node: activate using the node eventfd

Don't make an extra eventfd for activating the remote-node, we can
use the server side eventfd and send them to the remote side using
the transport.

The remote node already adds the eventfd to the data-loop so avoids
doing the same on the server.

This makes driver nodes trigger all remote nodes directly instead of
going through an intermediate eventfd. For resuming nodes, we already
used the node eventfd directly so this only a small optimization
for the initial cycle start.
This commit is contained in:
Wim Taymans 2023-05-02 17:16:18 +02:00
parent 8f799af6df
commit f8663ab31c
3 changed files with 27 additions and 34 deletions

View file

@ -99,7 +99,6 @@ struct node {
struct pw_impl_client *client;
struct spa_source data_source;
int writefd;
struct pw_map ports[2];
@ -128,9 +127,6 @@ struct impl {
uint32_t bind_node_version;
uint32_t bind_node_id;
int fds[2];
int other_fds[2];
};
#define pw_client_node_resource(r,m,v,...) \
@ -903,18 +899,15 @@ static int impl_node_process(void *object)
struct pw_impl_node *n = impl->this.node;
struct timespec ts;
spa_log_trace_fp(this->log, "%p: send process driver:%p", this, impl->this.node->driver_node);
/* this should not be called, we call the exported node
* directly */
spa_log_warn(this->log, "exported node activation");
if (SPA_UNLIKELY(spa_system_clock_gettime(this->data_system, CLOCK_MONOTONIC, &ts) < 0))
spa_zero(ts);
n->rt.activation->status = PW_NODE_ACTIVATION_TRIGGERED;
n->rt.activation->signal_time = SPA_TIMESPEC_TO_NSEC(&ts);
if (SPA_UNLIKELY(spa_system_eventfd_write(this->data_system, this->writefd, 1) < 0))
spa_log_warn(this->log, "%p: error %m", this);
return SPA_STATUS_OK;
return n->rt.target.signal_func(n->rt.target.data);
}
static struct pw_node *
@ -1259,8 +1252,8 @@ void pw_impl_client_node_registered(struct pw_impl_client_node *this, struct pw_
pw_resource_set_bound_id(this->resource, node_id);
pw_client_node_resource_transport(this->resource,
impl->other_fds[0],
impl->other_fds[1],
impl->this.node->source.fd,
impl->node.data_source.fd,
impl->activation->id,
0,
sizeof(struct pw_node_activation));
@ -1302,15 +1295,12 @@ static void node_initialized(void *data)
struct pw_global *global;
struct spa_system *data_system = impl->node.data_system;
impl->fds[0] = spa_system_eventfd_create(data_system, SPA_FD_CLOEXEC | SPA_FD_NONBLOCK);
impl->fds[1] = spa_system_eventfd_create(data_system, SPA_FD_CLOEXEC | SPA_FD_NONBLOCK);
impl->other_fds[0] = impl->fds[1];
impl->other_fds[1] = impl->fds[0];
node->data_source.fd = impl->fds[0];
node->writefd = impl->fds[1];
node->data_source.fd = spa_system_eventfd_create(data_system,
SPA_FD_CLOEXEC | SPA_FD_NONBLOCK);
spa_loop_add_source(node->data_loop, &node->data_source);
pw_log_debug("%p: transport read-fd:%d write-fd:%d", node, impl->fds[0], impl->fds[1]);
pw_log_debug("%p: transport read-fd:%d write-fd:%d", node,
node->data_source.fd, impl->this.node->source.fd);
if (add_area(impl) < 0)
return;
@ -1355,10 +1345,8 @@ static void node_free(void *data)
pw_map_clear(&impl->node.ports[1]);
pw_map_clear(&impl->io_map);
if (impl->fds[0] != -1)
spa_system_close(data_system, impl->fds[0]);
if (impl->fds[1] != -1)
spa_system_close(data_system, impl->fds[1]);
if (node->data_source.fd != -1)
spa_system_close(data_system, node->data_source.fd);
free(impl);
}
@ -1724,7 +1712,7 @@ struct pw_impl_client_node *pw_impl_client_node_new(struct pw_resource *resource
this = &impl->this;
impl->context = context;
impl->fds[0] = impl->fds[1] = -1;
impl->node.data_source.fd = -1;
pw_log_debug("%p: new", &impl->node);
support = pw_context_get_support(impl->context, &n_support);

View file

@ -247,6 +247,7 @@ static int client_node_transport(void *_data,
int readfd, int writefd, uint32_t mem_id, uint32_t offset, uint32_t size)
{
struct node_data *data = _data;
struct pw_impl_node *node = data->node;
struct pw_proxy *proxy = (struct pw_proxy*)data->client_node;
clean_transport(data);
@ -258,18 +259,18 @@ static int client_node_transport(void *_data,
return -errno;
}
data->node->rt.activation = data->activation->ptr;
node->rt.activation = data->activation->ptr;
pw_log_debug("remote-node %p: fds:%d %d node:%u activation:%p",
proxy, readfd, writefd, data->remote_id, data->activation->ptr);
data->rtwritefd = writefd;
spa_system_close(data->data_system, data->node->source.fd);
data->node->source.fd = readfd;
spa_system_close(data->data_system, node->source.fd);
node->source.fd = readfd;
data->have_transport = true;
if (data->node->active)
if (node->active)
pw_client_node_set_active(data->client_node, true);
return 0;

View file

@ -139,7 +139,8 @@ static void remove_node(struct pw_impl_node *this)
}
static int
do_node_add(struct spa_loop *loop, bool async, uint32_t seq, const void *data, size_t size, void *user_data)
do_node_add(struct spa_loop *loop, bool async, uint32_t seq,
const void *data, size_t size, void *user_data)
{
struct pw_impl_node *this = user_data;
struct pw_impl_node *driver = this->driver_node;
@ -155,6 +156,7 @@ do_node_add(struct spa_loop *loop, bool async, uint32_t seq, const void *data, s
pw_log_warn("%p: read failed %m", this);
this->added = true;
if (!this->remote)
spa_loop_add_source(loop, &this->source);
add_node(this, driver);
}
@ -162,10 +164,12 @@ do_node_add(struct spa_loop *loop, bool async, uint32_t seq, const void *data, s
}
static int
do_node_remove(struct spa_loop *loop, bool async, uint32_t seq, const void *data, size_t size, void *user_data)
do_node_remove(struct spa_loop *loop, bool async, uint32_t seq,
const void *data, size_t size, void *user_data)
{
struct pw_impl_node *this = user_data;
if (this->added) {
if (!this->remote)
spa_loop_remove_source(loop, &this->source);
remove_node(this);
this->added = false;
@ -1120,7 +1124,7 @@ static inline int node_signal_func(void *data)
if (SPA_UNLIKELY(spa_system_eventfd_write(data_system, this->source.fd, 1) < 0))
pw_log_warn("node %p: write failed %m", this);
return 0;
return SPA_STATUS_OK;
}
static inline void calculate_stats(struct pw_impl_node *this, struct pw_node_activation *a)