impl-node: run the remote driver node logic remotely

Don't signal the pipewire daemon to run the driver. We can transfer the
complete driver state to the client and run everything there.
This commit is contained in:
Wim Taymans 2023-05-19 18:26:59 +02:00
parent 3f46044c39
commit 2f67a6a5b4
3 changed files with 64 additions and 120 deletions

View file

@ -1197,6 +1197,56 @@ static void client_node_resource_pong(void *data, int seq)
spa_node_emit_result(&impl->hooks, seq, 0, 0, NULL);
}
static void node_peer_added(void *data, struct pw_impl_node *peer)
{
struct impl *impl = data;
struct pw_memblock *m;
m = pw_mempool_import_block(impl->client->pool, peer->activation);
if (m == NULL) {
pw_log_debug("%p: can't ensure mem: %m", impl);
return;
}
pw_log_debug("%p: peer %p id:%u added mem_id:%u", &impl->this, peer,
peer->info.id, m->id);
if (impl->resource == NULL)
return;
pw_client_node_resource_set_activation(impl->resource,
peer->info.id,
peer->source.fd,
m->id,
0,
sizeof(struct pw_node_activation));
}
static void node_peer_removed(void *data, struct pw_impl_node *peer)
{
struct impl *impl = data;
struct pw_memblock *m;
m = pw_mempool_find_fd(impl->client->pool, peer->activation->fd);
if (m == NULL) {
pw_log_warn("%p: unknown peer %p fd:%d", impl, peer,
peer->source.fd);
return;
}
pw_log_debug("%p: peer %p %u removed", impl, peer,
peer->info.id);
if (impl->resource != NULL) {
pw_client_node_resource_set_activation(impl->resource,
peer->info.id,
-1,
SPA_ID_INVALID,
0,
0);
}
pw_memblock_unref(m);
}
void pw_impl_client_node_registered(struct pw_impl_client_node *this, struct pw_global *global)
{
struct impl *impl = SPA_CONTAINER_OF(this, struct impl, this);
@ -1225,6 +1275,8 @@ void pw_impl_client_node_registered(struct pw_impl_client_node *this, struct pw_
0,
sizeof(struct pw_node_activation));
node_peer_added(impl, node);
if (impl->bind_node_id) {
pw_global_bind(global, client, PW_PERM_ALL,
impl->bind_node_version, impl->bind_node_id);
@ -1544,62 +1596,6 @@ static void node_port_removed(void *data, struct pw_impl_port *port)
clear_port(impl, p);
}
static void node_peer_added(void *data, struct pw_impl_node *peer)
{
struct impl *impl = data;
struct pw_memblock *m;
if (peer == impl->this.node)
return;
m = pw_mempool_import_block(impl->client->pool, peer->activation);
if (m == NULL) {
pw_log_debug("%p: can't ensure mem: %m", impl);
return;
}
pw_log_debug("%p: peer %p id:%u added mem_id:%u", &impl->this, peer,
peer->info.id, m->id);
if (impl->resource == NULL)
return;
pw_client_node_resource_set_activation(impl->resource,
peer->info.id,
peer->source.fd,
m->id,
0,
sizeof(struct pw_node_activation));
}
static void node_peer_removed(void *data, struct pw_impl_node *peer)
{
struct impl *impl = data;
struct pw_memblock *m;
if (peer == impl->this.node)
return;
m = pw_mempool_find_fd(impl->client->pool, peer->activation->fd);
if (m == NULL) {
pw_log_warn("%p: unknown peer %p fd:%d", impl, peer,
peer->source.fd);
return;
}
pw_log_debug("%p: peer %p %u removed", impl, peer,
peer->info.id);
if (impl->resource != NULL) {
pw_client_node_resource_set_activation(impl->resource,
peer->info.id,
-1,
SPA_ID_INVALID,
0,
0);
}
pw_memblock_unref(m);
}
static void node_driver_changed(void *data, struct pw_impl_node *old, struct pw_impl_node *driver)
{
struct impl *impl = data;

View file

@ -880,13 +880,6 @@ client_node_set_activation(void *_data,
struct link *link;
int res = 0;
if (data->remote_id == node_id) {
pw_log_debug("node %p: our activation %u: %u %u %u", node, node_id,
memid, offset, size);
spa_system_close(data->data_system, signalfd);
return 0;
}
if (memid == SPA_ID_INVALID) {
mm = ptr = NULL;
size = 0;
@ -899,7 +892,13 @@ client_node_set_activation(void *_data,
}
ptr = mm->ptr;
}
pw_log_debug("node %p: set activation %d %p %u %u", node, node_id, ptr, offset, size);
if (data->remote_id == node_id) {
pw_log_debug("node %p: our activation %u: %u %p %u %u", node, node_id,
memid, ptr, offset, size);
} else {
pw_log_debug("node %p: set activation %d %u %p %u %u", node, node_id,
memid, ptr, offset, size);
}
if (ptr) {
link = calloc(1, sizeof(struct link));
@ -1171,61 +1170,6 @@ static inline uint64_t get_time_ns(struct spa_system *system)
spa_system_clock_gettime(system, CLOCK_MONOTONIC, &ts);
return SPA_TIMESPEC_TO_NSEC(&ts);
}
static int node_ready(void *d, int status)
{
struct node_data *data = d;
struct pw_impl_node *node = data->node;
struct pw_node_activation *a = node->rt.activation;
struct spa_system *data_system = data->data_system;
struct pw_impl_port *p;
pw_log_trace_fp("node %p: ready driver:%d exported:%d status:%d", node,
node->driver, node->exported, status);
if (status & SPA_STATUS_HAVE_DATA) {
spa_list_for_each(p, &node->rt.output_mix, rt.node_link)
spa_node_process_fast(p->mix);
}
a->state[0].status = status;
a->signal_time = get_time_ns(data_system);
if (SPA_UNLIKELY(spa_system_eventfd_write(data_system, data->rtwritefd, 1) < 0))
pw_log_warn("node %p: write failed %m", node);
return 0;
}
static int node_reuse_buffer(void *data, uint32_t port_id, uint32_t buffer_id)
{
return 0;
}
static int node_xrun(void *d, uint64_t trigger, uint64_t delay, struct spa_pod *info)
{
struct node_data *data = d;
struct pw_impl_node *node = data->node;
struct pw_node_activation *a = node->rt.activation;
a->xrun_count++;
a->xrun_time = trigger;
a->xrun_delay = delay;
a->max_delay = SPA_MAX(a->max_delay, delay);
pw_log_debug("node %p: XRun! count:%u time:%"PRIu64" delay:%"PRIu64" max:%"PRIu64,
node, a->xrun_count, trigger, delay, a->max_delay);
pw_context_driver_emit_xrun(data->context, node);
return 0;
}
static const struct spa_node_callbacks node_callbacks = {
SPA_VERSION_NODE_CALLBACKS,
.ready = node_ready,
.reuse_buffer = node_reuse_buffer,
.xrun = node_xrun
};
static struct pw_proxy *node_export(struct pw_core *core, void *object, bool do_free,
size_t user_data_size)
@ -1275,7 +1219,6 @@ static struct pw_proxy *node_export(struct pw_core *core, void *object, bool do_
&data->proxy_client_node_listener,
&proxy_client_node_events, data);
spa_node_set_callbacks(node->node, &node_callbacks, data);
pw_impl_node_add_listener(node, &data->node_listener, &node_events, data);
pw_client_node_add_listener(data->client_node,

View file

@ -719,6 +719,8 @@ static void update_io(struct pw_impl_node *node)
node->target_rate = node->rt.position->clock.target_rate;
node->target_quantum = node->rt.position->clock.target_duration;
node->target_pending = false;
pw_impl_node_emit_peer_added(node, node);
} else if (node->driver) {
pw_log_warn("%p: can't set position on driver", node);
}
@ -901,6 +903,9 @@ int pw_impl_node_set_driver(struct pw_impl_node *node, struct pw_impl_node *driv
pw_impl_node_emit_driver_changed(node, old, driver);
pw_impl_node_emit_peer_removed(old, node);
pw_impl_node_emit_peer_added(driver, node);
return 0;
}