Rework how targets are managed

Manage them like we do on the client and reuse logic. Make a node
function to safely add and remove a target.

Activate the targets from the process loop when we can be sure that we
can resume them. This avoids incrementing the pending state when we are
not going to be able to resume the nodes (like when the cycle is ongoing
and we have already been scheduled) and avoids glitches and xruns.

When a node is added to the poll loop, it can activate its own targets.
This is mostly for driver so that they have something to schedule and
can then activate the other targets.

Try to resume the target when it is removed and we are supposed to be
scheduled.

Also add targets to the target_list when the node is remote to make sure
the profiler can see the targets as well.

Keep the node in the INACTIVE state as long as the eventfd of the node
is not added to the loop. Skip nodes in the INACTIVE state from going to
the NOT_TRIGGERED status, which avoids scheduling the node.

Make sure we remove any local targets we have in a node when we export
it, we will receive new targets from the server.

This should eliminate any glitches when adding and removing nodes from
the graph.

See #4026, #2468
This commit is contained in:
Wim Taymans 2024-05-28 18:19:51 +02:00
parent ce985def73
commit 46f71b1cd2
6 changed files with 296 additions and 345 deletions

View file

@ -279,6 +279,7 @@ struct link {
struct pw_memmap *mem;
struct pw_node_activation *activation;
int signalfd;
bool active;
};
struct context {
@ -1888,9 +1889,58 @@ static inline uint32_t cycle_wait(struct client *c)
return nframes;
}
static void trigger_link(struct link *l, uint64_t nsec)
{
struct client *c = l->client;
struct pw_node_activation *a = l->activation;
struct pw_node_activation_state *state = &a->state[0];
uint64_t cmd = 1;
pw_log_trace_fp("%p: link %p-%d %p %d/%d", c, l, l->node_id, state,
state->pending, state->required);
if (pw_node_activation_state_dec(state)) {
if (SPA_ATOMIC_CAS(a->status,
PW_NODE_ACTIVATION_NOT_TRIGGERED,
PW_NODE_ACTIVATION_TRIGGERED)) {
a->signal_time = nsec;
pw_log_trace_fp("%p: signal %p %p", c, l, state);
if (SPA_UNLIKELY(write(l->signalfd, &cmd, sizeof(cmd)) != sizeof(cmd)))
pw_log_warn("%p: write failed %m", c);
}
}
}
static inline void activate_link(struct client *c, struct link *l)
{
if (SPA_UNLIKELY(!l->active)) {
if (!c->async) {
struct pw_node_activation_state *state = &l->activation->state[0];
SPA_ATOMIC_INC(state->required);
SPA_ATOMIC_INC(state->pending);
}
l->active = true;
}
}
static inline void deactivate_link(struct client *c, struct link *l, uint64_t trigger)
{
if (SPA_UNLIKELY(l->active)) {
if (!c->async) {
struct pw_node_activation_state *state = &l->activation->state[0];
if (trigger != 0)
trigger_link(l, trigger);
SPA_ATOMIC_DEC(state->required);
}
l->active = false;
}
}
static inline void signal_sync(struct client *c)
{
uint64_t cmd, nsec;
uint64_t nsec;
struct link *l;
struct pw_node_activation *activation = c->activation;
int old_status;
@ -1904,30 +1954,12 @@ static inline void signal_sync(struct client *c)
if (c->async || old_status != PW_NODE_ACTIVATION_AWAKE)
return;
cmd = 1;
spa_list_for_each(l, &c->rt.target_links, target_link) {
struct pw_node_activation_state *state;
if (SPA_UNLIKELY(l->activation == NULL))
continue;
state = &l->activation->state[0];
pw_log_trace_fp("%p: link %p %p %d/%d", c, l, state,
state->pending, state->required);
if (pw_node_activation_state_dec(state)) {
if (SPA_ATOMIC_CAS(l->activation->status,
PW_NODE_ACTIVATION_NOT_TRIGGERED,
PW_NODE_ACTIVATION_TRIGGERED)) {
l->activation->signal_time = nsec;
pw_log_trace_fp("%p: signal %p %p", c, l, state);
if (SPA_UNLIKELY(write(l->signalfd, &cmd, sizeof(cmd)) != sizeof(cmd)))
pw_log_warn("%p: write failed %m", c);
}
}
activate_link(c, l);
trigger_link(l, nsec);
}
}
@ -2203,6 +2235,7 @@ static int do_prepare_client(struct spa_loop *loop, bool async, uint32_t seq,
if (c->rt.prepared)
return 0;
SPA_ATOMIC_STORE(c->activation->status, PW_NODE_ACTIVATION_FINISHED);
pw_loop_update_io(c->l,
c->socket_source,
SPA_IO_IN | SPA_IO_ERR | SPA_IO_HUP);
@ -2217,10 +2250,20 @@ static int do_unprepare_client(struct spa_loop *loop, bool async, uint32_t seq,
const void *data, size_t size, void *user_data)
{
struct client *c = user_data;
int old_state;
uint64_t trigger = 0;
struct link *l;
if (!c->rt.prepared)
return 0;
old_state = SPA_ATOMIC_XCHG(c->activation->status, PW_NODE_ACTIVATION_INACTIVE);
if (old_state != PW_NODE_ACTIVATION_FINISHED)
trigger = get_time_ns(c->l->system);
spa_list_for_each(l, &c->rt.target_links, target_link)
deactivate_link(c, l, trigger);
pw_loop_update_io(c->l,
c->socket_source, SPA_IO_ERR | SPA_IO_HUP);
@ -2240,7 +2283,6 @@ static int client_node_command(void *data, const struct spa_command *command)
if (c->started) {
pw_data_loop_invoke(c->loop,
do_unprepare_client, SPA_ID_INVALID, NULL, 0, false, c);
c->started = false;
}
break;
@ -2944,8 +2986,18 @@ do_remove_link(struct spa_loop *loop,
bool async, uint32_t seq, const void *data, size_t size, void *user_data)
{
struct link *link = user_data;
struct client *c = link->client;
pw_log_trace("link %p activate", link);
spa_list_remove(&link->target_link);
if (c->rt.prepared) {
int old_state = SPA_ATOMIC_LOAD(c->activation->status);
uint64_t trigger = 0;
if (old_state != PW_NODE_ACTIVATION_FINISHED)
trigger = get_time_ns(c->l->system);
deactivate_link(c, link, trigger);
}
free_link(link);
return 0;
}