client-node: keep target fd up to date

Keep the target and source fd in sync when we get the new fd from the
server in remote-node.

This makes it possible to refactor some things and only schedule nodes
by triggering the target.
This commit is contained in:
Wim Taymans 2023-11-28 16:20:39 +01:00
parent be17edbcfd
commit b9e5dde4ac
2 changed files with 21 additions and 29 deletions

View file

@ -216,8 +216,8 @@ static int client_node_transport(void *_data,
proxy, readfd, writefd, data->remote_id, data->activation->ptr);
data->rtwritefd = writefd;
spa_system_close(data->data_system, node->source.fd);
node->source.fd = readfd;
spa_system_close(node->data_system, node->source.fd);
node->rt.target.fd = node->source.fd = readfd;
data->have_transport = true;

View file

@ -1150,11 +1150,22 @@ static inline uint64_t get_time_ns(struct spa_system *system)
return SPA_TIMESPEC_TO_NSEC(&ts);
}
static inline void node_trigger(struct pw_impl_node *this)
/* called from data-loop decrement the dependency counter of the target and when
* there are no more dependencies, trigger the node. */
static inline void trigger_target(struct pw_node_target *t, uint64_t nsec)
{
pw_log_trace_fp("node %p %s", this, this->name);
if (SPA_UNLIKELY(spa_system_eventfd_write(this->data_system, this->source.fd, 1) < 0))
pw_log_warn("node %p: write failed %m", this);
struct pw_node_activation *a = t->activation;
struct pw_node_activation_state *state = &a->state[0];
pw_log_trace_fp("%p: (%s-%u) state:%p pending:%d/%d", t->node,
t->name, t->id, state, state->pending, state->required);
if (pw_node_activation_state_dec(state)) {
a->status = PW_NODE_ACTIVATION_TRIGGERED;
a->signal_time = nsec;
if (SPA_UNLIKELY(spa_system_eventfd_write(t->system, t->fd, 1) < 0))
pw_log_warn("%p: write failed %m", t->node);
}
}
/* called from data-loop when all the targets of a node need to be triggered */
@ -1164,20 +1175,9 @@ static inline int trigger_targets(struct pw_impl_node *this, int status, uint64_
pw_log_trace_fp("%p: %s trigger targets %"PRIu64, this, this->name, nsec);
spa_list_for_each(t, &this->rt.target_list, link) {
struct pw_node_activation *a = t->activation;
struct pw_node_activation_state *state = &a->state[0];
spa_list_for_each(t, &this->rt.target_list, link)
trigger_target(t, nsec);
pw_log_trace_fp("%p: (%s-%u) state:%p pending:%d/%d", t->node,
t->name, t->id, state, state->pending, state->required);
if (pw_node_activation_state_dec(state)) {
a->status = PW_NODE_ACTIVATION_TRIGGERED;
a->signal_time = nsec;
if (SPA_UNLIKELY(spa_system_eventfd_write(t->system, t->fd, 1) < 0))
pw_log_warn("node %p: write failed %m", this);
}
}
return 0;
}
@ -1263,7 +1263,6 @@ static inline int process_node(void *data)
a->signal_time = this->driver_start;
calculate_stats(this, a);
pw_impl_node_rt_emit_complete(this);
// pw_context_driver_emit_complete(this->context, this);
}
if (SPA_UNLIKELY(status & SPA_STATUS_DRAINED))
@ -1274,15 +1273,8 @@ static inline int process_node(void *data)
int pw_impl_node_trigger(struct pw_impl_node *node)
{
struct pw_node_activation *a = node->rt.target.activation;
struct pw_node_activation_state *state = &a->state[0];
if (pw_node_activation_state_dec(state)) {
uint64_t nsec = get_time_ns(node->data_system);
a->status = PW_NODE_ACTIVATION_TRIGGERED;
a->signal_time = nsec;
node_trigger(node);
}
uint64_t nsec = get_time_ns(node->data_system);
trigger_target(&node->rt.target, nsec);
return 0;
}