impl-node: partially revert target rework

We can't let a client decrement the required state because if it crashes
or fails to decrement, the graph becomes unschedulable.
Author: Wim Taymans
Date:   2024-06-13 16:01:16 +02:00
Parent: b421331275
Commit: b7af52e3fb

5 changed files with 37 additions and 86 deletions
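
For context on the commit message: every node in the processing graph carries an activation state with a required counter (how many peers must signal the node each cycle) and a pending counter (how many of those signals are still outstanding). What follows is a minimal, self-contained sketch of that counting scheme; the struct and function names are invented for illustration and are not PipeWire's actual definitions.

/* Minimal sketch (invented names, not the real PipeWire structures) of the
 * required/pending counting that decides when a node may run. */
#include <stdio.h>
#include <stdbool.h>

struct activation_state {
    int required;   /* peers that must trigger this node each cycle */
    int pending;    /* triggers still missing in the current cycle */
};

/* The driver starts a cycle by copying required into pending. */
static void cycle_start(struct activation_state *s)
{
    s->pending = s->required;
}

/* Each peer that finishes decrements pending; when it reaches zero
 * the node becomes runnable. */
static bool trigger(struct activation_state *s)
{
    return --s->pending == 0;
}

int main(void)
{
    struct activation_state s = { .required = 2, .pending = 0 };

    cycle_start(&s);                            /* pending = 2 */
    printf("runnable: %d\n", trigger(&s));      /* 2 -> 1, not yet */
    printf("runnable: %d\n", trigger(&s));      /* 1 -> 0, node may run */
    return 0;
}

If a client raises required for a link and then crashes, or simply never issues the matching decrement, pending can never reach zero again and the node, and with it the graph, stops being schedulable. That is the failure mode named in the commit message, and it is why the revert drops the client-side activate_link()/deactivate_link() helpers shown in the first file's hunks below.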


@@ -278,7 +278,6 @@ struct link {
struct pw_memmap *mem;
struct pw_node_activation *activation;
int signalfd;
bool active;
};
struct context {
@@ -1909,29 +1908,8 @@ static void trigger_link(struct link *l, uint64_t nsec)
}
}
static inline void activate_link(struct client *c, struct link *l)
{
if (SPA_UNLIKELY(!l->active)) {
if (!c->async) {
struct pw_node_activation_state *state = &l->activation->state[0];
SPA_ATOMIC_INC(state->required);
SPA_ATOMIC_INC(state->pending);
}
l->active = true;
}
}
static inline void deactivate_link(struct client *c, struct link *l, uint64_t trigger)
{
if (SPA_UNLIKELY(l->active)) {
if (!c->async) {
struct pw_node_activation_state *state = &l->activation->state[0];
if (trigger != 0)
trigger_link(l, trigger);
SPA_ATOMIC_DEC(state->required);
}
l->active = false;
}
}
static inline void signal_sync(struct client *c)
@@ -1954,7 +1932,6 @@ static inline void signal_sync(struct client *c)
if (SPA_UNLIKELY(l->activation == NULL))
continue;
activate_link(c, l);
trigger_link(l, nsec);
}
}
@@ -2261,6 +2238,7 @@ static int do_prepare_client(struct spa_loop *loop, bool async, uint32_t seq,
{
struct client *c = user_data;
pw_log_debug("%p prepared:%d ", c, c->rt.prepared);
if (c->rt.prepared)
return 0;
@@ -2283,6 +2261,7 @@ static int do_unprepare_client(struct spa_loop *loop, bool async, uint32_t seq,
uint64_t trigger = 0;
struct link *l;
pw_log_debug("%p prepared:%d ", c, c->rt.prepared);
if (!c->rt.prepared)
return 0;
@@ -2290,8 +2269,10 @@ static int do_unprepare_client(struct spa_loop *loop, bool async, uint32_t seq,
if (old_state != PW_NODE_ACTIVATION_FINISHED)
trigger = get_time_ns(c->l->system);
spa_list_for_each(l, &c->rt.target_links, target_link)
deactivate_link(c, l, trigger);
spa_list_for_each(l, &c->rt.target_links, target_link) {
if (!c->async && trigger != 0)
trigger_link(l, trigger);
}
pw_loop_update_io(c->l,
c->socket_source, SPA_IO_ERR | SPA_IO_HUP);
@@ -2980,7 +2961,7 @@ do_add_link(struct spa_loop *loop,
{
struct link *link = user_data;
struct client *c = link->client;
pw_log_trace("link %p activate", link);
pw_log_trace("link %p", link);
spa_list_append(&c->rt.target_links, &link->target_link);
return 0;
}
@@ -2992,7 +2973,7 @@ do_remove_link(struct spa_loop *loop,
struct link *link = user_data;
struct client *c = link->client;
pw_log_trace("link %p activate", link);
pw_log_trace("link %p", link);
spa_list_remove(&link->target_link);
if (c->rt.prepared) {


@@ -1802,10 +1802,6 @@ struct pw_impl_client_node *pw_impl_client_node_new(struct pw_resource *resource
if (resource->version < 5) {
pw_log_warn("using server side driver for old client version %d", resource->version);
}
if (resource->version < 6) {
this->node->server_prepare = true;
pw_log_warn("using server side prepare for old client version %d", resource->version);
}
pw_resource_add_listener(this->resource,
&impl->resource_listener,


@@ -25,9 +25,8 @@ extern "C" {
/*
* version 4: new port_set_mix_info event added
* version 5: driver nodes are scheduled on the client
* version 6: nodes activate peer links themselves when ready
*/
#define PW_VERSION_CLIENT_NODE 6
#define PW_VERSION_CLIENT_NODE 5
struct pw_client_node;
#define PW_EXTENSION_MODULE_CLIENT_NODE PIPEWIRE_MODULE_PREFIX "module-client-node"


@@ -74,7 +74,7 @@ static inline void activate_target(struct pw_impl_node *node, struct pw_node_tar
{
struct pw_node_activation_state *state = &t->activation->state[0];
if (!t->active) {
if (!node->async || node->driving) {
if (!node->async && !node->exported) {
SPA_ATOMIC_INC(state->required);
SPA_ATOMIC_INC(state->pending);
}
@@ -88,14 +88,15 @@ static inline void deactivate_target(struct pw_impl_node *node, struct pw_node_t
{
if (t->active) {
struct pw_node_activation_state *state = &t->activation->state[0];
if (!node->async || node->driving) {
if (!node->async) {
/* the driver copies the required to the pending state
* so first try to resume the node and then decrement the
* required state. This way we either resume with the old value
* or we don't when the driver has not yet copied */
if (trigger != 0)
trigger_target(t, trigger);
SPA_ATOMIC_DEC(state->required);
if (!node->exported)
SPA_ATOMIC_DEC(state->required);
}
t->active = false;
pw_log_debug("%p: target state:%p id:%d pending:%d/%d trigger:%"PRIu64,
@@ -115,21 +116,6 @@ static inline void trigger_targets(struct pw_impl_node *node, int status, uint64
trigger_target(ta, nsec);
}
static inline void trigger_activate_targets(struct pw_impl_node *node, int status, uint64_t nsec)
{
struct pw_node_target *ta;
pw_log_trace_fp("%p: (%s-%u) activate and trigger targets %"PRIu64,
node, node->name, node->info.id, nsec);
spa_list_for_each(ta, &node->rt.target_list, link) {
activate_target(node, ta);
trigger_target(ta, nsec);
}
node->rt.trigger_targets = trigger_targets;
}
/** \endcond */
/* Called from the node data loop when a node needs to be scheduled by
@@ -154,27 +140,29 @@ do_node_prepare(struct spa_loop *loop, bool async, uint32_t seq,
if (this->rt.prepared)
return 0;
if (!this->server_prepare) {
if (!this->remote) {
/* clear the eventfd in case it was written to while the node was stopped */
res = spa_system_eventfd_read(this->rt.target.system, this->source.fd, &dummy);
if (SPA_UNLIKELY(res != -EAGAIN && res != 0))
pw_log_warn("%p: read failed %m", this);
spa_loop_add_source(loop, &this->source);
}
spa_list_for_each(t, &this->rt.target_list, link) {
/* we can now trigger ourself */
if (t->id == this->info.id || this->server_prepare)
activate_target(this, t);
}
SPA_ATOMIC_STORE(this->rt.target.activation->status, PW_NODE_ACTIVATION_FINISHED);
this->rt.trigger_targets = trigger_activate_targets;
SPA_ATOMIC_STORE(this->rt.target.activation->status, PW_NODE_ACTIVATION_FINISHED);
}
spa_list_for_each(t, &this->rt.target_list, link)
activate_target(this, t);
this->rt.prepared = true;
return 0;
}
static void add_node_to_graph(struct pw_impl_node *node)
{
pw_loop_invoke(node->data_loop, do_node_prepare, 1, NULL, 0, true, node);
}
/* called from the node data loop and undoes the changes done in do_node_prepare. */
static int
do_node_unprepare(struct spa_loop *loop, bool async, uint32_t seq,
@@ -190,37 +178,26 @@ do_node_unprepare(struct spa_loop *loop, bool async, uint32_t seq,
if (!this->rt.prepared)
return 0;
/* We mark ourself as finished now, this will avoid going further into the process loop
* in case our fd was ready (removing ourselfs from the loop should avoid that as well).
* If we were supposed to be scheduled make sure we continue the graph for the peers we
* were supposed to trigger */
old_state = SPA_ATOMIC_XCHG(this->rt.target.activation->status, PW_NODE_ACTIVATION_INACTIVE);
if (old_state != PW_NODE_ACTIVATION_FINISHED)
trigger = get_time_ns(this->rt.target.system);
if (!this->remote) {
/* We mark ourself as finished now, this will avoid going further into the process loop
* in case our fd was ready (removing ourselfs from the loop should avoid that as well).
* If we were supposed to be scheduled make sure we continue the graph for the peers we
* were supposed to trigger */
old_state = SPA_ATOMIC_XCHG(this->rt.target.activation->status, PW_NODE_ACTIVATION_INACTIVE);
if (old_state != PW_NODE_ACTIVATION_FINISHED)
trigger = get_time_ns(this->rt.target.system);
spa_loop_remove_source(loop, &this->source);
}
spa_list_for_each(t, &this->rt.target_list, link)
deactivate_target(this, t, trigger);
if (!this->server_prepare)
spa_loop_remove_source(loop, &this->source);
this->rt.prepared = false;
return 0;
}
static void add_node_to_graph(struct pw_impl_node *node)
{
if (node->remote && !node->server_prepare)
return;
pw_loop_invoke(node->data_loop, do_node_prepare, 1, NULL, 0, true, node);
}
static void remove_node_from_graph(struct pw_impl_node *node)
{
if (node->remote && !node->server_prepare)
return;
pw_loop_invoke(node->data_loop, do_node_unprepare, 1, NULL, 0, true, node);
}
@@ -806,7 +783,7 @@ do_add_target(struct spa_loop *loop,
spa_list_append(&node->rt.target_list, &t->link);
t->added = true;
if (node->rt.prepared)
node->rt.trigger_targets = trigger_activate_targets;
activate_target(node, t);
}
return 0;
}
@@ -1468,7 +1445,7 @@ static inline int process_node(void *data, uint64_t nsec)
* graph because that means we finished the graph. */
if (SPA_LIKELY(!this->driving)) {
if (!this->async && old_status == PW_NODE_ACTIVATION_AWAKE)
this->rt.trigger_targets(this, status, nsec);
trigger_targets(this, status, nsec);
} else {
/* calculate CPU time when finished */
a->signal_time = this->driver_start;
@@ -2118,7 +2095,7 @@ retry_status:
spa_node_process_fast(p->mix);
}
/* now signal all the nodes we drive */
node->rt.trigger_targets(node, status, nsec);
trigger_targets(node, status, nsec);
return 0;
}


@@ -740,7 +740,6 @@ struct pw_impl_node {
unsigned int sync:1; /**< the sync-groups are active */
unsigned int transport:1; /**< the transport is active */
unsigned int async:1; /**< async processing, one cycle latency */
unsigned int server_prepare:1; /**< prepare links server side for old clients */
uint32_t port_user_data_size; /**< extra size for port user data */
@@ -791,7 +790,6 @@ struct pw_impl_node {
struct spa_ratelimit rate_limit;
bool prepared; /**< the node was added to loop */
void (*trigger_targets) (struct pw_impl_node *node, int status, uint64_t nsec);
} rt;
struct spa_fraction target_rate;
uint64_t target_quantum;