diff --git a/pipewire-jack/src/pipewire-jack.c b/pipewire-jack/src/pipewire-jack.c index 018a32fd1..68763384e 100644 --- a/pipewire-jack/src/pipewire-jack.c +++ b/pipewire-jack/src/pipewire-jack.c @@ -278,6 +278,7 @@ struct link { struct pw_memmap *mem; struct pw_node_activation *activation; int signalfd; + void (*trigger) (struct link *l, uint64_t nsec); }; struct context { @@ -1884,7 +1885,7 @@ static inline uint32_t cycle_wait(struct client *c) return nframes; } -static void trigger_link(struct link *l, uint64_t nsec) +static void trigger_link_v1(struct link *l, uint64_t nsec) { struct client *c = l->client; struct pw_node_activation *a = l->activation; @@ -1896,8 +1897,8 @@ static void trigger_link(struct link *l, uint64_t nsec) if (pw_node_activation_state_dec(state)) { if (SPA_ATOMIC_CAS(a->status, - PW_NODE_ACTIVATION_NOT_TRIGGERED, - PW_NODE_ACTIVATION_TRIGGERED)) { + PW_NODE_ACTIVATION_NOT_TRIGGERED, + PW_NODE_ACTIVATION_TRIGGERED)) { a->signal_time = nsec; pw_log_trace_fp("%p: signal %p %p", c, l, state); @@ -1908,8 +1909,31 @@ static void trigger_link(struct link *l, uint64_t nsec) } } +static void trigger_link_v0(struct link *l, uint64_t nsec) +{ + struct client *c = l->client; + struct pw_node_activation *a = l->activation; + struct pw_node_activation_state *state = &a->state[0]; + uint64_t cmd = 1; + + pw_log_trace_fp("%p: link %p-%d %p %d/%d", c, l, l->node_id, state, + state->pending, state->required); + + if (pw_node_activation_state_dec(state)) { + SPA_ATOMIC_STORE(a->status, PW_NODE_ACTIVATION_TRIGGERED); + a->signal_time = nsec; + + pw_log_trace_fp("%p: signal %p %p", c, l, state); + + if (SPA_UNLIKELY(write(l->signalfd, &cmd, sizeof(cmd)) != sizeof(cmd))) + pw_log_warn("%p: write failed %m", c); + } +} + static inline void deactivate_link(struct client *c, struct link *l, uint64_t trigger) { + if (!c->async && trigger != 0) + l->trigger(l, trigger); } static inline void signal_sync(struct client *c) @@ -1928,12 +1952,8 @@ static inline void signal_sync(struct client *c) if (c->async || old_status != PW_NODE_ACTIVATION_AWAKE) return; - spa_list_for_each(l, &c->rt.target_links, target_link) { - if (SPA_UNLIKELY(l->activation == NULL)) - continue; - - trigger_link(l, nsec); - } + spa_list_for_each(l, &c->rt.target_links, target_link) + l->trigger(l, nsec); } static inline void cycle_signal(struct client *c, int status) @@ -2049,6 +2069,8 @@ static int client_node_transport(void *data, pw_log_debug("%p: create client transport with fds %d %d for node %u", c, readfd, writefd, c->node_id); + c->activation->client_version = PW_VERSION_NODE_ACTIVATION; + close(writefd); c->socket_source = pw_loop_add_io(c->l, readfd, @@ -2271,7 +2293,7 @@ static int do_unprepare_client(struct spa_loop *loop, bool async, uint32_t seq, spa_list_for_each(l, &c->rt.target_links, target_link) { if (!c->async && trigger != 0) - trigger_link(l, trigger); + l->trigger(l, trigger); } pw_loop_update_io(c->l, @@ -3060,6 +3082,7 @@ static int client_node_set_activation(void *data, link->mem = mm; link->activation = ptr; link->signalfd = signalfd; + link->trigger = link->activation->server_version < 1 ? trigger_link_v0 : trigger_link_v1; spa_list_append(&c->links, &link->link); pw_data_loop_invoke(c->loop, @@ -6958,6 +6981,14 @@ int jack_transport_reposition (jack_client_t *client, return 0; } +static void update_command(struct client *c, uint32_t command) +{ + struct pw_node_activation *a = c->rt.driver_activation; + if (!a) + return; + SPA_ATOMIC_STORE(a->command, command); +} + static int transport_update(struct client* c, int active) { pw_log_info("%p: transport %d", c, active); @@ -6984,7 +7015,10 @@ void jack_transport_start (jack_client_t *client) { struct client *c = (struct client *) client; return_if_fail(c != NULL); - transport_update(c, true); + if (c->activation->server_version < 1) + update_command(c, PW_NODE_ACTIVATION_COMMAND_START); + else + transport_update(c, true); } SPA_EXPORT @@ -6992,7 +7026,10 @@ void jack_transport_stop (jack_client_t *client) { struct client *c = (struct client *) client; return_if_fail(c != NULL); - transport_update(c, false); + if (c->activation->server_version < 1) + update_command(c, PW_NODE_ACTIVATION_COMMAND_STOP); + else + transport_update(c, false); } SPA_EXPORT diff --git a/src/modules/module-client-node/client-node.c b/src/modules/module-client-node/client-node.c index 1b55e05d9..4fd1a379f 100644 --- a/src/modules/module-client-node/client-node.c +++ b/src/modules/module-client-node/client-node.c @@ -1799,12 +1799,10 @@ struct pw_impl_client_node *pw_impl_client_node_new(struct pw_resource *resource this->node->remote = true; this->flags = 0; - if (resource->version < 5) { - pw_log_warn("using server side driver for old client version %d", resource->version); - } - if (resource->version < 6) { - pw_log_warn("using server side status for old client version %d", resource->version); - this->node->server_status = true; + if (resource->version < PW_VERSION_CLIENT_NODE) { + pw_log_warn("detected old client version %d", resource->version); + if (resource->version < 6) + this->node->rt.target.activation->client_version = 0; } pw_resource_add_listener(this->resource, diff --git a/src/modules/module-client-node/remote-node.c b/src/modules/module-client-node/remote-node.c index a76307caa..8b5c7daf3 100644 --- a/src/modules/module-client-node/remote-node.c +++ b/src/modules/module-client-node/remote-node.c @@ -210,6 +210,8 @@ static int client_node_transport(void *_data, spa_system_close(node->rt.target.system, node->source.fd); node->rt.target.fd = node->source.fd = readfd; + node->rt.target.activation->client_version = PW_VERSION_NODE_ACTIVATION; + data->have_transport = true; if (node->active) @@ -839,6 +841,8 @@ client_node_set_activation(void *_data, link->target.activation = ptr; link->target.system = data->data_system; link->target.fd = signalfd; + link->target.trigger = link->target.activation->server_version < 1 ? + trigger_target_v0 : trigger_target_v1; spa_list_append(&data->links, &link->link); pw_impl_node_add_target(node, &link->target); diff --git a/src/pipewire/impl-node.c b/src/pipewire/impl-node.c index aea32b388..f5fb170b9 100644 --- a/src/pipewire/impl-node.c +++ b/src/pipewire/impl-node.c @@ -94,7 +94,7 @@ static inline void deactivate_target(struct pw_impl_node *node, struct pw_node_t * required state. This way we either resume with the old value * or we don't when the driver has not yet copied */ if (trigger != 0) - trigger_target(t, trigger); + t->trigger(t, trigger); if (!node->exported) SPA_ATOMIC_DEC(state->required); } @@ -113,7 +113,7 @@ static inline void trigger_targets(struct pw_impl_node *node, int status, uint64 node, node->name, node->info.id, nsec); spa_list_for_each(ta, &node->rt.target_list, link) - trigger_target(ta, nsec); + ta->trigger(ta, nsec); } /** \endcond */ @@ -148,7 +148,7 @@ do_node_prepare(struct spa_loop *loop, bool async, uint32_t seq, spa_loop_add_source(loop, &this->source); } - if (!this->remote || this->server_status) + if (!this->remote || this->rt.target.activation->client_version < 1) SPA_ATOMIC_STORE(this->rt.target.activation->status, PW_NODE_ACTIVATION_FINISHED); spa_list_for_each(t, &this->rt.target_list, link) @@ -179,7 +179,7 @@ do_node_unprepare(struct spa_loop *loop, bool async, uint32_t seq, if (!this->rt.prepared) return 0; - if (!this->remote || this->server_status) { + if (!this->remote || this->rt.target.activation->client_version < 1) { /* We mark ourself as finished now, this will avoid going further into the process loop * in case our fd was ready (removing ourselfs from the loop should avoid that as well). * If we were supposed to be scheduled make sure we continue the graph for the peers we @@ -1312,9 +1312,6 @@ static inline void debug_xrun_graph(struct pw_impl_node *driver, uint64_t nsec) struct pw_node_activation *a = t->activation; struct pw_node_activation_state *state = &a->state[0]; - if (t->id == driver->info.id) - continue; - if (a->status == PW_NODE_ACTIVATION_TRIGGERED || a->status == PW_NODE_ACTIVATION_AWAKE) { pw_log(level, "(%s-%u) xrun state:%p pending:%d/%d s:%"PRIu64" a:%"PRIu64" f:%"PRIu64 @@ -1443,7 +1440,7 @@ static inline int process_node(void *data, uint64_t nsec) /* we don't need to trigger targets when the node was driving the * graph because that means we finished the graph. */ if (SPA_LIKELY(!this->driving)) { - if (!this->async && old_status == PW_NODE_ACTIVATION_AWAKE) + if ((!this->async || a->server_version < 1) && old_status == PW_NODE_ACTIVATION_AWAKE) trigger_targets(this, status, nsec); } else { /* calculate CPU time when finished */ @@ -1461,7 +1458,8 @@ static inline int process_node(void *data, uint64_t nsec) int pw_impl_node_trigger(struct pw_impl_node *node) { uint64_t nsec = get_time_ns(node->rt.target.system); - trigger_target(&node->rt.target, nsec); + struct pw_node_target *t = &node->rt.target; + t->trigger(t, nsec); return 0; } @@ -1619,11 +1617,14 @@ struct pw_impl_node *pw_context_create_node(struct pw_context *context, this->rt.target.node = this; this->rt.target.system = this->data_loop->system; this->rt.target.fd = this->source.fd; + this->rt.target.trigger = trigger_target_v1; reset_position(this, &this->rt.target.activation->position); this->rt.target.activation->sync_timeout = DEFAULT_SYNC_TIMEOUT; this->rt.target.activation->sync_left = 0; this->rt.target.activation->status = PW_NODE_ACTIVATION_INACTIVE; + this->rt.target.activation->server_version = PW_VERSION_NODE_ACTIVATION; + this->rt.target.activation->client_version = PW_VERSION_NODE_ACTIVATION; this->rt.rate_limit.interval = 2 * SPA_NSEC_PER_SEC; this->rt.rate_limit.burst = 1; diff --git a/src/pipewire/private.h b/src/pipewire/private.h index 7517d4e73..8c4903907 100644 --- a/src/pipewire/private.h +++ b/src/pipewire/private.h @@ -539,6 +539,7 @@ struct pw_node_target { struct pw_node_activation *activation; struct spa_system *system; int fd; + void (*trigger)(struct pw_node_target *t, uint64_t nsec); unsigned int active:1; unsigned int added:1; }; @@ -551,8 +552,17 @@ static inline void copy_target(struct pw_node_target *dst, const struct pw_node_ dst->activation = src->activation; dst->system = src->system; dst->fd = src->fd; + dst->trigger = src->trigger; } +/* versions: + * 0 baseline + * 1 the activation status needs to be CAS + * async nodes, driver resumes async nodes + * transport with sync.group properties instead of client command + */ +#define PW_VERSION_NODE_ACTIVATION 1 + /* nodes start as INACTIVE, when they are ready to be scheduled, they add their * fd to the loop and change status to FINISHED. When the node shuts down, the * status is set back to INACTIVE. @@ -596,7 +606,11 @@ struct pw_node_activation { uint32_t segment_owner[16]; /* id of owners for each segment info struct. * nodes that want to update segment info need to * CAS their node id in this array. */ - uint32_t padding[14]; + uint32_t padding[11]; + uint32_t client_version; + uint32_t server_version; + + uint32_t active_driver_id; uint32_t driver_id; /* the current node driver id */ #define PW_NODE_ACTIVATION_FLAG_NONE 0 #define PW_NODE_ACTIVATION_FLAG_PROFILER (1<<0) /* the profiler is running */ @@ -632,22 +646,9 @@ static inline uint64_t get_time_ns(struct spa_system *system) return SPA_TIMESPEC_TO_NSEC(&ts); } -static inline void wake_target(struct pw_node_target *t, uint64_t nsec) -{ - struct pw_node_activation *a = t->activation; - - if (SPA_ATOMIC_CAS(a->status, - PW_NODE_ACTIVATION_NOT_TRIGGERED, - PW_NODE_ACTIVATION_TRIGGERED)) { - a->signal_time = nsec; - if (SPA_UNLIKELY(spa_system_eventfd_write(t->system, t->fd, 1) < 0)) - pw_log_warn("%p: write failed %m", t->node); - } -} - /* called from data-loop decrement the dependency counter of the target and when * there are no more dependencies, trigger the node. */ -static inline void trigger_target(struct pw_node_target *t, uint64_t nsec) +static inline void trigger_target_v1(struct pw_node_target *t, uint64_t nsec) { struct pw_node_activation *a = t->activation; struct pw_node_activation_state *state = &a->state[0]; @@ -655,8 +656,31 @@ static inline void trigger_target(struct pw_node_target *t, uint64_t nsec) pw_log_trace_fp("%p: (%s-%u) state:%p pending:%d/%d", t->node, t->name, t->id, state, state->pending, state->required); - if (pw_node_activation_state_dec(state)) - wake_target(t, nsec); + if (pw_node_activation_state_dec(state)) { + if (SPA_ATOMIC_CAS(a->status, + PW_NODE_ACTIVATION_NOT_TRIGGERED, + PW_NODE_ACTIVATION_TRIGGERED)) { + a->signal_time = nsec; + if (SPA_UNLIKELY(spa_system_eventfd_write(t->system, t->fd, 1) < 0)) + pw_log_warn("%p: write failed %m", t->node); + } + } +} + +static inline void trigger_target_v0(struct pw_node_target *t, uint64_t nsec) +{ + struct pw_node_activation *a = t->activation; + struct pw_node_activation_state *state = &a->state[0]; + + pw_log_trace_fp("%p: (%s-%u) state:%p pending:%d/%d", t->node, + t->name, t->id, state, state->pending, state->required); + + if (pw_node_activation_state_dec(state)) { + SPA_ATOMIC_STORE(a->status, PW_NODE_ACTIVATION_TRIGGERED); + a->signal_time = nsec; + if (SPA_UNLIKELY(spa_system_eventfd_write(t->system, t->fd, 1) < 0)) + pw_log_warn("%p: write failed %m", t->node); + } } struct pw_node_peer { @@ -748,7 +772,6 @@ struct pw_impl_node { unsigned int sync:1; /**< the sync-groups are active */ unsigned int transport:1; /**< the transport is active */ unsigned int async:1; /**< async processing, one cycle latency */ - unsigned int server_status:1; /**< old client, do status on server */ uint32_t port_user_data_size; /**< extra size for port user data */