mirror of
https://gitlab.freedesktop.org/pipewire/pipewire.git
synced 2025-11-09 13:30:06 -05:00
impl-node: add some comments
This commit is contained in:
parent
f996249fff
commit
26e9a4ce13
1 changed files with 70 additions and 30 deletions
|
|
@ -67,6 +67,19 @@ struct resource_data {
|
||||||
|
|
||||||
/** \endcond */
|
/** \endcond */
|
||||||
|
|
||||||
|
/* Called from the node data loop when a node needs to be scheduled by
|
||||||
|
* the given driver. 3 things needs to happen:
|
||||||
|
*
|
||||||
|
* - the node is added to the driver target list and the required state
|
||||||
|
* is incremented. This makes sure the node is woken up when the driver
|
||||||
|
* starts a new cycle.
|
||||||
|
* - the node needs to trigger the driver when it completes. This means
|
||||||
|
* the driver is added to the target list.
|
||||||
|
* - the node targets (including the driver we added above) have their
|
||||||
|
* required state incremented.
|
||||||
|
*
|
||||||
|
* This code is called from the data-loop to ensure synchronization
|
||||||
|
*/
|
||||||
static void add_node(struct pw_impl_node *this, struct pw_impl_node *driver)
|
static void add_node(struct pw_impl_node *this, struct pw_impl_node *driver)
|
||||||
{
|
{
|
||||||
struct pw_node_activation_state *dstate, *nstate;
|
struct pw_node_activation_state *dstate, *nstate;
|
||||||
|
|
@ -78,13 +91,7 @@ static void add_node(struct pw_impl_node *this, struct pw_impl_node *driver)
|
||||||
pw_log_trace("%p: add to driver %p %p %p", this, driver,
|
pw_log_trace("%p: add to driver %p %p %p", this, driver,
|
||||||
driver->rt.activation, this->rt.activation);
|
driver->rt.activation, this->rt.activation);
|
||||||
|
|
||||||
/* signal the driver */
|
/* let the driver trigger us as part of the processing cycle */
|
||||||
this->rt.driver_target.activation = driver->rt.activation;
|
|
||||||
this->rt.driver_target.node = driver;
|
|
||||||
this->rt.driver_target.system = driver->data_system;
|
|
||||||
this->rt.driver_target.fd = driver->source.fd;
|
|
||||||
spa_list_append(&this->rt.target_list, &this->rt.driver_target.link);
|
|
||||||
|
|
||||||
spa_list_append(&driver->rt.target_list, &this->rt.target.link);
|
spa_list_append(&driver->rt.target_list, &this->rt.target.link);
|
||||||
nstate = &this->rt.activation->state[0];
|
nstate = &this->rt.activation->state[0];
|
||||||
if (!this->rt.target.active) {
|
if (!this->rt.target.active) {
|
||||||
|
|
@ -92,6 +99,15 @@ static void add_node(struct pw_impl_node *this, struct pw_impl_node *driver)
|
||||||
this->rt.target.active = true;
|
this->rt.target.active = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* trigger the driver when we complete */
|
||||||
|
this->rt.driver_target.activation = driver->rt.activation;
|
||||||
|
this->rt.driver_target.node = driver;
|
||||||
|
this->rt.driver_target.system = driver->data_system;
|
||||||
|
this->rt.driver_target.fd = driver->source.fd;
|
||||||
|
spa_list_append(&this->rt.target_list, &this->rt.driver_target.link);
|
||||||
|
|
||||||
|
/* now increment the required states of all this node targets, including
|
||||||
|
* the driver we added above */
|
||||||
spa_list_for_each(t, &this->rt.target_list, link) {
|
spa_list_for_each(t, &this->rt.target_list, link) {
|
||||||
dstate = &t->activation->state[0];
|
dstate = &t->activation->state[0];
|
||||||
if (!t->active) {
|
if (!t->active) {
|
||||||
|
|
@ -104,6 +120,7 @@ static void add_node(struct pw_impl_node *this, struct pw_impl_node *driver)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* called from the data loop and undoes the changes done in add_node. */
|
||||||
static void remove_node(struct pw_impl_node *this)
|
static void remove_node(struct pw_impl_node *this)
|
||||||
{
|
{
|
||||||
struct pw_node_activation_state *dstate, *nstate;
|
struct pw_node_activation_state *dstate, *nstate;
|
||||||
|
|
@ -156,6 +173,7 @@ do_node_add(struct spa_loop *loop, bool async, uint32_t seq,
|
||||||
pw_log_warn("%p: read failed %m", this);
|
pw_log_warn("%p: read failed %m", this);
|
||||||
|
|
||||||
this->added = true;
|
this->added = true;
|
||||||
|
/* remote nodes have their source added in client-node instead */
|
||||||
if (!this->remote)
|
if (!this->remote)
|
||||||
spa_loop_add_source(loop, &this->source);
|
spa_loop_add_source(loop, &this->source);
|
||||||
add_node(this, driver);
|
add_node(this, driver);
|
||||||
|
|
@ -1097,18 +1115,19 @@ static inline uint64_t get_time_ns(struct spa_system *system)
|
||||||
return SPA_TIMESPEC_TO_NSEC(&ts);
|
return SPA_TIMESPEC_TO_NSEC(&ts);
|
||||||
}
|
}
|
||||||
|
|
||||||
static inline void node_signal(struct pw_impl_node *this)
|
static inline void node_trigger(struct pw_impl_node *this)
|
||||||
{
|
{
|
||||||
pw_log_trace_fp("node %p %s", this, this->name);
|
pw_log_trace_fp("node %p %s", this, this->name);
|
||||||
if (SPA_UNLIKELY(spa_system_eventfd_write(this->data_system, this->source.fd, 1) < 0))
|
if (SPA_UNLIKELY(spa_system_eventfd_write(this->data_system, this->source.fd, 1) < 0))
|
||||||
pw_log_warn("node %p: write failed %m", this);
|
pw_log_warn("node %p: write failed %m", this);
|
||||||
}
|
}
|
||||||
|
|
||||||
static inline int resume_node(struct pw_impl_node *this, int status, uint64_t nsec)
|
/* called from data-loop when all the targets of a node need to be triggered */
|
||||||
|
static inline int trigger_targets(struct pw_impl_node *this, int status, uint64_t nsec)
|
||||||
{
|
{
|
||||||
struct pw_node_target *t;
|
struct pw_node_target *t;
|
||||||
|
|
||||||
pw_log_trace_fp("%p: %s trigger peers %"PRIu64, this, this->name, nsec);
|
pw_log_trace_fp("%p: %s trigger targets %"PRIu64, this, this->name, nsec);
|
||||||
|
|
||||||
spa_list_for_each(t, &this->rt.target_list, link) {
|
spa_list_for_each(t, &this->rt.target_list, link) {
|
||||||
struct pw_node_activation *a = t->activation;
|
struct pw_node_activation *a = t->activation;
|
||||||
|
|
@ -1127,20 +1146,11 @@ static inline int resume_node(struct pw_impl_node *this, int status, uint64_t ns
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static inline void calculate_stats(struct pw_impl_node *this, struct pw_node_activation *a)
|
/* The main processing entry point of a node. This is called from the data-loop and usually
|
||||||
{
|
* as a result of signaling the eventfd of the node.
|
||||||
uint64_t signal_time = a->signal_time;
|
*
|
||||||
uint64_t prev_signal_time = a->prev_signal_time;
|
* This code runs on the client and the server, depending on where the node is.
|
||||||
if (SPA_LIKELY(signal_time > prev_signal_time)) {
|
*/
|
||||||
uint64_t process_time = a->finish_time - a->signal_time;
|
|
||||||
uint64_t period_time = signal_time - prev_signal_time;
|
|
||||||
float load = (float) process_time / (float) period_time;
|
|
||||||
a->cpu_load[0] = (a->cpu_load[0] + load) / 2.0f;
|
|
||||||
a->cpu_load[1] = (a->cpu_load[1] * 7.0f + load) / 8.0f;
|
|
||||||
a->cpu_load[2] = (a->cpu_load[2] * 31.0f + load) / 32.0f;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
static inline int process_node(void *data)
|
static inline int process_node(void *data)
|
||||||
{
|
{
|
||||||
struct pw_impl_node *this = data;
|
struct pw_impl_node *this = data;
|
||||||
|
|
@ -1161,11 +1171,14 @@ static inline int process_node(void *data)
|
||||||
a->pending_sync = false;
|
a->pending_sync = false;
|
||||||
|
|
||||||
if (SPA_LIKELY(this->added)) {
|
if (SPA_LIKELY(this->added)) {
|
||||||
|
/* process input mixers */
|
||||||
spa_list_for_each(p, &this->rt.input_mix, rt.node_link)
|
spa_list_for_each(p, &this->rt.input_mix, rt.node_link)
|
||||||
spa_node_process_fast(p->mix);
|
spa_node_process_fast(p->mix);
|
||||||
|
|
||||||
|
/* process the actual node */
|
||||||
status = spa_node_process_fast(this->node);
|
status = spa_node_process_fast(this->node);
|
||||||
|
|
||||||
|
/* process output tee */
|
||||||
if (status & SPA_STATUS_HAVE_DATA) {
|
if (status & SPA_STATUS_HAVE_DATA) {
|
||||||
spa_list_for_each(p, &this->rt.output_mix, rt.node_link)
|
spa_list_for_each(p, &this->rt.output_mix, rt.node_link)
|
||||||
spa_node_process_fast(p->mix);
|
spa_node_process_fast(p->mix);
|
||||||
|
|
@ -1173,7 +1186,7 @@ static inline int process_node(void *data)
|
||||||
} else {
|
} else {
|
||||||
/* This can happen when we deactivated the node but some links are
|
/* This can happen when we deactivated the node but some links are
|
||||||
* still not shut down. We simply don't schedule the node and make
|
* still not shut down. We simply don't schedule the node and make
|
||||||
* sure we trigger the peers in resume_node below. */
|
* sure we trigger the peers in trigger_targets below. */
|
||||||
pw_log_debug("%p: scheduling non-active node %s", this, this->name);
|
pw_log_debug("%p: scheduling non-active node %s", this, this->name);
|
||||||
status = SPA_STATUS_HAVE_DATA;
|
status = SPA_STATUS_HAVE_DATA;
|
||||||
}
|
}
|
||||||
|
|
@ -1185,8 +1198,12 @@ static inline int process_node(void *data)
|
||||||
a->status = PW_NODE_ACTIVATION_FINISHED;
|
a->status = PW_NODE_ACTIVATION_FINISHED;
|
||||||
a->finish_time = nsec;
|
a->finish_time = nsec;
|
||||||
|
|
||||||
|
/* we don't need to trigger targets when the node was driving the
|
||||||
|
* graph because that means we finished the graph. Also don't schedule
|
||||||
|
* peers when the node returns OK, because that means the resume will
|
||||||
|
* happen asynchronously later (unimplemented though). */
|
||||||
if (SPA_LIKELY(!this->driving && status != SPA_STATUS_OK))
|
if (SPA_LIKELY(!this->driving && status != SPA_STATUS_OK))
|
||||||
resume_node(this, status, nsec);
|
trigger_targets(this, status, nsec);
|
||||||
|
|
||||||
if (SPA_UNLIKELY(status & SPA_STATUS_DRAINED))
|
if (SPA_UNLIKELY(status & SPA_STATUS_DRAINED))
|
||||||
pw_context_driver_emit_drained(this->context, this);
|
pw_context_driver_emit_drained(this->context, this);
|
||||||
|
|
@ -1203,7 +1220,7 @@ int pw_impl_node_trigger(struct pw_impl_node *node)
|
||||||
uint64_t nsec = get_time_ns(node->data_system);
|
uint64_t nsec = get_time_ns(node->data_system);
|
||||||
a->status = PW_NODE_ACTIVATION_TRIGGERED;
|
a->status = PW_NODE_ACTIVATION_TRIGGERED;
|
||||||
a->signal_time = nsec;
|
a->signal_time = nsec;
|
||||||
node_signal(node);
|
node_trigger(node);
|
||||||
}
|
}
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
@ -1298,6 +1315,7 @@ struct pw_impl_node *pw_context_create_node(struct pw_context *context,
|
||||||
|
|
||||||
this->properties = properties;
|
this->properties = properties;
|
||||||
|
|
||||||
|
/* the eventfd used to signal the node */
|
||||||
if ((res = spa_system_eventfd_create(this->data_system,
|
if ((res = spa_system_eventfd_create(this->data_system,
|
||||||
SPA_FD_CLOEXEC | SPA_FD_NONBLOCK)) < 0)
|
SPA_FD_CLOEXEC | SPA_FD_NONBLOCK)) < 0)
|
||||||
goto error_clean;
|
goto error_clean;
|
||||||
|
|
@ -1584,6 +1602,20 @@ static const struct spa_node_events node_events = {
|
||||||
.event = node_event,
|
.event = node_event,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
static inline void calculate_stats(struct pw_impl_node *this, struct pw_node_activation *a)
|
||||||
|
{
|
||||||
|
uint64_t signal_time = a->signal_time;
|
||||||
|
uint64_t prev_signal_time = a->prev_signal_time;
|
||||||
|
if (SPA_LIKELY(signal_time > prev_signal_time)) {
|
||||||
|
uint64_t process_time = a->finish_time - a->signal_time;
|
||||||
|
uint64_t period_time = signal_time - prev_signal_time;
|
||||||
|
float load = (float) process_time / (float) period_time;
|
||||||
|
a->cpu_load[0] = (a->cpu_load[0] + load) / 2.0f;
|
||||||
|
a->cpu_load[1] = (a->cpu_load[1] * 7.0f + load) / 8.0f;
|
||||||
|
a->cpu_load[2] = (a->cpu_load[2] * 31.0f + load) / 32.0f;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#define SYNC_CHECK 0
|
#define SYNC_CHECK 0
|
||||||
#define SYNC_START 1
|
#define SYNC_START 1
|
||||||
#define SYNC_STOP 2
|
#define SYNC_STOP 2
|
||||||
|
|
@ -1672,6 +1704,9 @@ static inline void update_position(struct pw_impl_node *node, int all_ready)
|
||||||
a->position.offset += a->position.clock.duration;
|
a->position.offset += a->position.clock.duration;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* Called from the data-loop and it is the starting point for driver nodes.
|
||||||
|
* Most of the logic here is to check for reposition updates and transport changes.
|
||||||
|
*/
|
||||||
static int node_ready(void *data, int status)
|
static int node_ready(void *data, int status)
|
||||||
{
|
{
|
||||||
struct pw_impl_node *node = data, *reposition_node = NULL;
|
struct pw_impl_node *node = data, *reposition_node = NULL;
|
||||||
|
|
@ -1712,7 +1747,7 @@ static int node_ready(void *data, int status)
|
||||||
state->pending, state->required);
|
state->pending, state->required);
|
||||||
dump_states(node);
|
dump_states(node);
|
||||||
}
|
}
|
||||||
node_signal(node);
|
node_trigger(node);
|
||||||
} else {
|
} else {
|
||||||
uint64_t signal_time = a->signal_time;
|
uint64_t signal_time = a->signal_time;
|
||||||
/* old nodes set the TRIGGERED status on node_ready, patch this
|
/* old nodes set the TRIGGERED status on node_ready, patch this
|
||||||
|
|
@ -1788,6 +1823,8 @@ again:
|
||||||
}
|
}
|
||||||
|
|
||||||
a->status = PW_NODE_ACTIVATION_TRIGGERED;
|
a->status = PW_NODE_ACTIVATION_TRIGGERED;
|
||||||
|
/* remote nodes set the signal_time before writing the ready
|
||||||
|
* eventfd */
|
||||||
if (!node->remote)
|
if (!node->remote)
|
||||||
a->signal_time = nsec;
|
a->signal_time = nsec;
|
||||||
impl->prev_signal_time = a->prev_signal_time;
|
impl->prev_signal_time = a->prev_signal_time;
|
||||||
|
|
@ -1807,6 +1844,8 @@ again:
|
||||||
|
|
||||||
pw_context_driver_emit_start(node->context, node);
|
pw_context_driver_emit_start(node->context, node);
|
||||||
}
|
}
|
||||||
|
/* this should not happen, driver nodes that are not currently driving
|
||||||
|
* should not emit the ready callback */
|
||||||
if (SPA_UNLIKELY(node->driver && !node->driving))
|
if (SPA_UNLIKELY(node->driver && !node->driving))
|
||||||
return 0;
|
return 0;
|
||||||
|
|
||||||
|
|
@ -1818,11 +1857,12 @@ again:
|
||||||
}
|
}
|
||||||
if (!node->remote && (status & SPA_STATUS_HAVE_DATA)) {
|
if (!node->remote && (status & SPA_STATUS_HAVE_DATA)) {
|
||||||
/* remote nodes have done the output mix already before
|
/* remote nodes have done the output mix already before
|
||||||
* they triggered the ready event */
|
* they wrote the ready eventfd */
|
||||||
spa_list_for_each(p, &node->rt.output_mix, rt.node_link)
|
spa_list_for_each(p, &node->rt.output_mix, rt.node_link)
|
||||||
spa_node_process_fast(p->mix);
|
spa_node_process_fast(p->mix);
|
||||||
}
|
}
|
||||||
return resume_node(node, status, nsec);
|
/* now signal all the nodes we drive */
|
||||||
|
return trigger_targets(node, status, nsec);
|
||||||
}
|
}
|
||||||
|
|
||||||
static int node_reuse_buffer(void *data, uint32_t port_id, uint32_t buffer_id)
|
static int node_reuse_buffer(void *data, uint32_t port_id, uint32_t buffer_id)
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue