impl-node: optimize peer activation

Before this patch, every link between node A and B would increment the
activation counter of B and add a new target to A. If there are N links
between A and B, resuming node A would then do N decrements of the
activation counter of B (by iterating the target_list) before finally
activating B.

This is not optimal, we can share the same activation count for all the
links between A and B.

Add a new pw_node_peer struct to keep track of links between A and B.
Activating a link between A and B activates the single activation of B,
deactivating all links deactivates B again. Waking up B after A finished
now no longer depends on the number of links between A and B.

This is particularly important for remote nodes because before the patch
they would get the activation memory and the eventfd of the peer __for
each link__. With huge amounts of links (like in stress tests) this
would result in too many fds. filtering out the fds for the same peer
was not easily possible because the server would still increment the
counters for each link and sharing the eventfd would require refcounting
it and closing duplicates.

After this patch the remote node receives 1 activation memory and eventfd
for each peer node, independent of the number of links between them. Even
for stereo streams this saves half of the memory and fds.
This commit is contained in:
Wim Taymans 2023-05-10 16:56:38 +02:00
parent 742039ff3f
commit 67fcd9646d
3 changed files with 96 additions and 35 deletions

View file

@ -52,6 +52,81 @@ struct impl {
/** \endcond */ /** \endcond */
static struct pw_node_peer *pw_node_peer_ref(struct pw_impl_node *onode, struct pw_impl_node *inode)
{
struct pw_node_peer *peer;
spa_list_for_each(peer, &onode->peer_list, link) {
if (peer->target.node == inode) {
pw_log_debug("exiting peer %p from %p to %p", peer, onode, inode);
peer->ref++;
return peer;
}
}
peer = calloc(1, sizeof(*peer));
if (peer == NULL)
return NULL;
peer->ref = 1;
peer->output = onode;
peer->active_count = 0;
peer->target.node = inode;
peer->target.activation = inode->rt.activation;
peer->target.signal_func = inode->rt.target.signal_func;
peer->target.data = inode->rt.target.data;
spa_list_append(&onode->peer_list, &peer->link);
pw_log_debug("new peer %p from %p to %p", peer, onode, inode);
pw_impl_node_emit_peer_added(onode, inode);
return peer;
}
static void pw_node_peer_unref(struct pw_node_peer *peer)
{
if (--peer->ref > 0)
return;
spa_list_remove(&peer->link);
pw_log_debug("remove peer %p from %p to %p", peer, peer->output, peer->target.node);
pw_impl_node_emit_peer_removed(peer->output, peer->target.node);
free(peer);
}
static void pw_node_peer_activate(struct pw_node_peer *peer)
{
struct pw_node_activation_state *state;
state = &peer->target.activation->state[0];
if (peer->active_count++ == 0) {
spa_list_append(&peer->output->rt.target_list, &peer->target.link);
if (!peer->target.active && peer->output->rt.driver_target.node != NULL) {
state->required++;
peer->target.active = true;
}
}
pw_log_trace("%p: node:%p state:%p pending:%d/%d", peer->output,
peer->target.node, state, state->pending, state->required);
}
static void pw_node_peer_deactivate(struct pw_node_peer *peer)
{
struct pw_node_activation_state *state;
state = &peer->target.activation->state[0];
if (--peer->active_count == 0) {
spa_list_remove(&peer->target.link);
if (peer->target.active) {
state->required--;
peer->target.active = false;
}
}
pw_log_trace("%p: node:%p state:%p pending:%d/%d", peer->output,
peer->target.node, state, state->pending, state->required);
}
static void info_changed(struct pw_impl_link *link) static void info_changed(struct pw_impl_link *link)
{ {
struct pw_resource *resource; struct pw_resource *resource;
@ -566,28 +641,15 @@ do_activate_link(struct spa_loop *loop,
bool async, uint32_t seq, const void *data, size_t size, void *user_data) bool async, uint32_t seq, const void *data, size_t size, void *user_data)
{ {
struct pw_impl_link *this = user_data; struct pw_impl_link *this = user_data;
struct impl *impl = SPA_CONTAINER_OF(this, struct impl, this);
pw_log_trace("%p: activate", this); pw_log_trace("%p: activate", this);
spa_list_append(&this->output->rt.mix_list, &this->rt.out_mix.rt_link); spa_list_append(&this->output->rt.mix_list, &this->rt.out_mix.rt_link);
spa_list_append(&this->input->rt.mix_list, &this->rt.in_mix.rt_link); spa_list_append(&this->input->rt.mix_list, &this->rt.in_mix.rt_link);
if (impl->inode != impl->onode) { if (this->peer)
struct pw_node_activation_state *state; pw_node_peer_activate(this->peer);
this->rt.target.activation = impl->inode->rt.activation;
spa_list_append(&impl->onode->rt.target_list, &this->rt.target.link);
state = &this->rt.target.activation->state[0];
if (!this->rt.target.active && impl->onode->rt.driver_target.node != NULL) {
state->required++;
this->rt.target.active = true;
}
pw_log_trace("%p: node:%p state:%p pending:%d/%d", this, impl->inode,
state, state->pending, state->required);
}
return 0; return 0;
} }
@ -782,26 +844,14 @@ do_deactivate_link(struct spa_loop *loop,
bool async, uint32_t seq, const void *data, size_t size, void *user_data) bool async, uint32_t seq, const void *data, size_t size, void *user_data)
{ {
struct pw_impl_link *this = user_data; struct pw_impl_link *this = user_data;
struct impl *impl = SPA_CONTAINER_OF(this, struct impl, this);
pw_log_trace("%p: disable %p and %p", this, &this->rt.in_mix, &this->rt.out_mix); pw_log_trace("%p: disable %p and %p", this, &this->rt.in_mix, &this->rt.out_mix);
spa_list_remove(&this->rt.out_mix.rt_link); spa_list_remove(&this->rt.out_mix.rt_link);
spa_list_remove(&this->rt.in_mix.rt_link); spa_list_remove(&this->rt.in_mix.rt_link);
if (impl->inode != impl->onode) { if (this->peer)
struct pw_node_activation_state *state; pw_node_peer_deactivate(this->peer);
spa_list_remove(&this->rt.target.link);
state = &this->rt.target.activation->state[0];
if (this->rt.target.active) {
state->required--;
this->rt.target.active = false;
}
pw_log_trace("%p: node:%p state:%p pending:%d/%d", this, impl->inode,
state, state->pending, state->required);
}
return 0; return 0;
} }
@ -1252,9 +1302,6 @@ struct pw_impl_link *pw_context_create_link(struct pw_context *context,
impl->inode = input_node; impl->inode = input_node;
} }
this->rt.target.signal_func = impl->inode->rt.target.signal_func;
this->rt.target.data = impl->inode->rt.target.data;
pw_log_debug("%p: constructed out:%p:%d.%d -> in:%p:%d.%d", impl, pw_log_debug("%p: constructed out:%p:%d.%d -> in:%p:%d.%d", impl,
output_node, output->port_id, this->rt.out_mix.port.port_id, output_node, output->port_id, this->rt.out_mix.port.port_id,
input_node, input->port_id, this->rt.in_mix.port.port_id); input_node, input->port_id, this->rt.in_mix.port.port_id);
@ -1272,7 +1319,8 @@ struct pw_impl_link *pw_context_create_link(struct pw_context *context,
pw_impl_port_recalc_latency(output); pw_impl_port_recalc_latency(output);
pw_impl_port_recalc_latency(input); pw_impl_port_recalc_latency(input);
pw_impl_node_emit_peer_added(impl->onode, impl->inode); if (impl->onode != impl->inode)
this->peer = pw_node_peer_ref(impl->onode, impl->inode);
return this; return this;
@ -1407,7 +1455,8 @@ void pw_impl_link_destroy(struct pw_impl_link *link)
if (link->registered) if (link->registered)
spa_list_remove(&link->link); spa_list_remove(&link->link);
pw_impl_node_emit_peer_removed(impl->onode, impl->inode); if (link->peer)
pw_node_peer_unref(link->peer);
try_unlink_controls(impl, link->output, link->input); try_unlink_controls(impl, link->output, link->input);

View file

@ -1331,6 +1331,7 @@ struct pw_impl_node *pw_context_create_node(struct pw_context *context,
impl->pending_id = SPA_ID_INVALID; impl->pending_id = SPA_ID_INVALID;
spa_list_init(&this->follower_list); spa_list_init(&this->follower_list);
spa_list_init(&this->peer_list);
spa_hook_list_init(&this->listener_list); spa_hook_list_init(&this->listener_list);

View file

@ -738,6 +738,8 @@ struct pw_impl_node {
struct spa_list sort_link; /**< link used to sort nodes */ struct spa_list sort_link; /**< link used to sort nodes */
struct spa_list peer_list; /* list of peers */
struct spa_node *node; /**< SPA node implementation */ struct spa_node *node; /**< SPA node implementation */
struct spa_hook listener; struct spa_hook listener;
@ -909,6 +911,14 @@ struct pw_control_link {
unsigned int valid:1; unsigned int valid:1;
}; };
struct pw_node_peer {
int ref;
int active_count;
struct spa_list link; /**< link in peer list */
struct pw_impl_node *output; /**< the output node */
struct pw_node_target target; /**< target of the input node */
};
#define pw_impl_link_emit(o,m,v,...) spa_hook_list_call(&o->listener_list, struct pw_impl_link_events, m, v, ##__VA_ARGS__) #define pw_impl_link_emit(o,m,v,...) spa_hook_list_call(&o->listener_list, struct pw_impl_link_events, m, v, ##__VA_ARGS__)
#define pw_impl_link_emit_destroy(l) pw_impl_link_emit(l, destroy, 0) #define pw_impl_link_emit_destroy(l) pw_impl_link_emit(l, destroy, 0)
#define pw_impl_link_emit_free(l) pw_impl_link_emit(l, free, 0) #define pw_impl_link_emit_free(l) pw_impl_link_emit(l, free, 0)
@ -940,10 +950,11 @@ struct pw_impl_link {
struct pw_control_link control; struct pw_control_link control;
struct pw_control_link notify; struct pw_control_link notify;
struct pw_node_peer *peer;
struct { struct {
struct pw_impl_port_mix out_mix; /**< port added to the output mixer */ struct pw_impl_port_mix out_mix; /**< port added to the output mixer */
struct pw_impl_port_mix in_mix; /**< port added to the input mixer */ struct pw_impl_port_mix in_mix; /**< port added to the input mixer */
struct pw_node_target target; /**< target to trigger the input node */
} rt; } rt;
void *user_data; void *user_data;