mirror of
https://gitlab.freedesktop.org/pipewire/pipewire.git
synced 2025-10-29 05:40:27 -04:00
node: improve sync
Place the requested sync and position update flag in the node activation. This way we can use our existing loop to update the node sync states and check if the node is ready. Implement sync timeout, when the client can't start or seek within the timeout, we start RUNNING anyway and hope the client catches up.
This commit is contained in:
parent
5363d3352c
commit
35c5cf9b52
3 changed files with 88 additions and 35 deletions
|
|
@ -1 +1 @@
|
|||
Subproject commit 0c2c2c72d04335a7013159ca8c7e284f4676e5df
|
||||
Subproject commit b4243991b3599237953611cd27ab5c3a62a426bd
|
||||
|
|
@ -746,7 +746,7 @@ static void dump_states(struct pw_node *driver)
|
|||
if (t->node == NULL)
|
||||
continue;
|
||||
pw_log_warn(NAME" %p (%s): pending:%d/%d s:%"PRIu64" a:%"PRIu64" f:%"PRIu64
|
||||
" waiting:%"PRIu64" process:%"PRIu64" status:%d",
|
||||
" waiting:%"PRIu64" process:%"PRIu64" status:%d sync:%d",
|
||||
t->node, t->node->name,
|
||||
a->state[0].pending, a->state[0].required,
|
||||
a->signal_time,
|
||||
|
|
@ -754,7 +754,7 @@ static void dump_states(struct pw_node *driver)
|
|||
a->finish_time,
|
||||
a->awake_time - a->signal_time,
|
||||
a->finish_time - a->awake_time,
|
||||
t->activation->status);
|
||||
t->activation->status, t->activation->pending_sync);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -817,6 +817,10 @@ static inline int process_node(void *data)
|
|||
|
||||
pw_log_trace_fp(NAME" %p: process %"PRIu64, this, a->awake_time);
|
||||
|
||||
/* not implemented yet, just clear the flags */
|
||||
a->pending_sync = false;
|
||||
a->pending_new_pos = false;
|
||||
|
||||
spa_list_for_each(p, &this->rt.input_mix, rt.node_link)
|
||||
spa_node_process(p->mix);
|
||||
|
||||
|
|
@ -985,6 +989,8 @@ struct pw_node *pw_node_new(struct pw_core *core,
|
|||
this->rt.driver_target.signal = process_node;
|
||||
|
||||
reset_position(&this->rt.activation->position);
|
||||
this->rt.activation->sync_timeout = 5 * SPA_NSEC_PER_SEC;
|
||||
this->rt.activation->sync_left = 0;
|
||||
|
||||
check_properties(this);
|
||||
|
||||
|
|
@ -1182,19 +1188,23 @@ static const struct spa_node_events node_events = {
|
|||
.event = node_event,
|
||||
};
|
||||
|
||||
static void update_position(struct pw_node *node)
|
||||
#define SYNC_CHECK 0
|
||||
#define SYNC_START 1
|
||||
#define SYNC_STOP 2
|
||||
|
||||
static int check_updates(struct pw_node *node)
|
||||
{
|
||||
int res = SYNC_CHECK;
|
||||
struct pw_node_activation *a = node->rt.activation;
|
||||
struct spa_io_segment segment, *seg;
|
||||
uint32_t seq1, seq2, change_mask;
|
||||
enum spa_io_position_state state;
|
||||
uint32_t seq1, seq2, change_mask, command;
|
||||
|
||||
if (a->position.offset == INT64_MIN)
|
||||
a->position.offset = a->position.clock.position;
|
||||
|
||||
seq1 = SEQ_READ(a->pending.seq);
|
||||
change_mask = a->pending.change_mask;
|
||||
state = a->pending.state;
|
||||
command = a->pending.command;
|
||||
segment = a->pending.segment;
|
||||
seq2 = SEQ_READ(a->pending.seq);
|
||||
|
||||
|
|
@ -1208,7 +1218,7 @@ static void update_position(struct pw_node *node)
|
|||
seg = &a->position.segments[0];
|
||||
|
||||
if (change_mask & PW_NODE_ACTIVATION_UPDATE_SEGMENT) {
|
||||
pw_log_debug("update segment");
|
||||
pw_log_trace(NAME" %p: update segment %08x", node, segment.valid);
|
||||
if (segment.valid & SPA_IO_SEGMENT_VALID_BAR) {
|
||||
seg->bar = segment.bar;
|
||||
seg->valid |= SPA_IO_SEGMENT_VALID_BAR;
|
||||
|
|
@ -1218,8 +1228,24 @@ static void update_position(struct pw_node *node)
|
|||
seg->valid |= SPA_IO_SEGMENT_VALID_BAR;
|
||||
}
|
||||
}
|
||||
if (change_mask & PW_NODE_ACTIVATION_UPDATE_COMMAND) {
|
||||
pw_log_debug(NAME" %p: update command:%u", node, command);
|
||||
switch (command) {
|
||||
case PW_NODE_ACTIVATION_COMMAND_STOP:
|
||||
a->position.state = SPA_IO_POSITION_STATE_STOPPED;
|
||||
res = SYNC_STOP;
|
||||
break;
|
||||
case PW_NODE_ACTIVATION_COMMAND_START:
|
||||
a->position.state = SPA_IO_POSITION_STATE_STARTING;
|
||||
a->sync_left = a->sync_timeout /
|
||||
((a->position.clock.duration * SPA_NSEC_PER_SEC) /
|
||||
a->position.clock.rate.denom);
|
||||
res = SYNC_START;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (change_mask & PW_NODE_ACTIVATION_UPDATE_REPOSITION) {
|
||||
pw_log_debug("update position:%lu", segment.position);
|
||||
pw_log_debug(NAME" %p: update position:%lu", node, segment.position);
|
||||
seg->flags = segment.flags;
|
||||
seg->start = segment.start;
|
||||
seg->duration = segment.duration;
|
||||
|
|
@ -1228,16 +1254,30 @@ static void update_position(struct pw_node *node)
|
|||
if (seg->start == 0)
|
||||
seg->start = a->position.clock.position - a->position.offset;
|
||||
|
||||
a->position.state = SPA_IO_POSITION_STATE_STARTING;
|
||||
ATOMIC_STORE(a->sync_pending, a->sync_total);
|
||||
ATOMIC_INC(a->sync_version);
|
||||
}
|
||||
if (change_mask & PW_NODE_ACTIVATION_UPDATE_STATE) {
|
||||
a->position.state = state;
|
||||
switch (a->position.state) {
|
||||
case SPA_IO_POSITION_STATE_RUNNING:
|
||||
a->position.state = SPA_IO_POSITION_STATE_STARTING;
|
||||
a->sync_left = a->sync_timeout /
|
||||
((a->position.clock.duration * SPA_NSEC_PER_SEC) /
|
||||
a->position.clock.rate.denom);
|
||||
break;
|
||||
}
|
||||
res = SYNC_START;
|
||||
}
|
||||
return res;
|
||||
}
|
||||
|
||||
static void update_position(struct pw_node *node, int all_ready)
|
||||
{
|
||||
struct pw_node_activation *a = node->rt.activation;
|
||||
|
||||
if (a->position.state == SPA_IO_POSITION_STATE_STARTING) {
|
||||
if (ATOMIC_LOAD(a->sync_pending) == 0)
|
||||
if (!all_ready && --a->sync_left == 0) {
|
||||
pw_log_warn(NAME" %p: sync timeout, going to RUNNING", node);
|
||||
dump_states(node);
|
||||
all_ready = true;
|
||||
}
|
||||
if (all_ready)
|
||||
a->position.state = SPA_IO_POSITION_STATE_RUNNING;
|
||||
}
|
||||
if (a->position.state != SPA_IO_POSITION_STATE_RUNNING)
|
||||
|
|
@ -1256,19 +1296,32 @@ static int node_ready(void *data, int status)
|
|||
|
||||
if (node == driver) {
|
||||
struct pw_node_activation *a = node->rt.activation;
|
||||
int sync_type, all_ready, update_sync, target_sync;
|
||||
|
||||
if (a->state[0].pending != 0) {
|
||||
pw_log_warn(NAME" %p: graph not finished", node);
|
||||
dump_states(node);
|
||||
node->rt.target.signal(node->rt.target.data);
|
||||
}
|
||||
|
||||
sync_type = check_updates(node);
|
||||
all_ready = sync_type == SYNC_CHECK;
|
||||
update_sync = !all_ready;
|
||||
target_sync = sync_type == SYNC_START ? true : false;
|
||||
|
||||
spa_list_for_each(t, &driver->rt.target_list, link) {
|
||||
pw_node_activation_state_reset(&t->activation->state[0]);
|
||||
t->activation->status = PW_NODE_ACTIVATION_NOT_TRIGGERED;
|
||||
if (update_sync) {
|
||||
t->activation->pending_sync = target_sync;
|
||||
t->activation->pending_new_pos = target_sync;
|
||||
} else {
|
||||
all_ready &= t->activation->pending_sync == false;
|
||||
}
|
||||
}
|
||||
a->prev_signal_time = a->signal_time;
|
||||
|
||||
update_position(node);
|
||||
update_position(node, all_ready);
|
||||
}
|
||||
if (node->driver && !node->master)
|
||||
return 0;
|
||||
|
|
|
|||
|
|
@ -353,48 +353,48 @@ struct pw_node_activation {
|
|||
#define PW_NODE_ACTIVATION_TRIGGERED 1
|
||||
#define PW_NODE_ACTIVATION_AWAKE 2
|
||||
#define PW_NODE_ACTIVATION_FINISHED 3
|
||||
int status;
|
||||
uint32_t status;
|
||||
|
||||
unsigned int version:1;
|
||||
unsigned int pending_sync:1; /* a sync is pending */
|
||||
unsigned int pending_new_pos:1; /* a new position is pending */
|
||||
|
||||
struct pw_node_activation_state state[2]; /* one current state and one next state,
|
||||
* as version flag */
|
||||
uint64_t signal_time;
|
||||
uint64_t awake_time;
|
||||
uint64_t finish_time;
|
||||
uint64_t prev_signal_time;
|
||||
|
||||
/* for drivers */
|
||||
struct spa_io_position position;
|
||||
uint32_t segment_master[32]; /* unique id (client id usually) of client
|
||||
* that will update extra segment info, There
|
||||
* can be one master for each segment
|
||||
uint32_t segment_master[32]; /* node id that will update extra segment info,
|
||||
* There can be one master for each segment
|
||||
* bitfield. 0 means no master for the
|
||||
* given segment info */
|
||||
|
||||
uint64_t sync_timeout;
|
||||
uint32_t sync_version; /* version updates whenever new sync is
|
||||
* required */
|
||||
uint32_t sync_total; /* number of clients that have want to
|
||||
* explicitly signal when they are ready to
|
||||
* process the pending segment */
|
||||
uint32_t sync_pending; /* pending number of clients preparing for
|
||||
* the current segment */
|
||||
uint64_t sync_timeout; /* sync timeout in nanoseconds
|
||||
* position goes to RUNNING without waiting any
|
||||
* longer for sync clients. */
|
||||
uint64_t sync_left; /* number of cycles before timeout */
|
||||
|
||||
uint32_t version;
|
||||
struct pw_node_activation_state state[2]; /* one current state and one next state,
|
||||
* as low bit of version */
|
||||
float cpu_load[3]; /* averaged over short, medium, long time */
|
||||
uint32_t xrun_count; /* number of xruns */
|
||||
uint64_t xrun_time; /* time of last xrun in microseconds */
|
||||
uint64_t xrun_delay; /* delay of last xrun in microseconds */
|
||||
uint64_t max_delay; /* max of all xruns in microseconds */
|
||||
|
||||
|
||||
struct {
|
||||
uint32_t seq;
|
||||
#define PW_NODE_ACTIVATION_UPDATE_STATE (1<<0)
|
||||
#define PW_NODE_ACTIVATION_UPDATE_COMMAND (1<<0)
|
||||
#define PW_NODE_ACTIVATION_UPDATE_SEGMENT (1<<1)
|
||||
#define PW_NODE_ACTIVATION_UPDATE_REPOSITION (1<<2)
|
||||
#define PW_NODE_ACTIVATION_UPDATE_FLUSH (1<<3) /* flush out current segments and immediately
|
||||
* start the new one */
|
||||
uint32_t change_mask;
|
||||
enum spa_io_position_state state; /* when change_mask & PW_NODE_ACTIVATION_UPDATE_STATE */
|
||||
#define PW_NODE_ACTIVATION_COMMAND_START 0
|
||||
#define PW_NODE_ACTIVATION_COMMAND_STOP 1
|
||||
uint32_t command; /* when change_mask & PW_NODE_ACTIVATION_UPDATE_COMMAND */
|
||||
struct spa_io_segment segment; /* update for the extra segment info
|
||||
* fields. When REPOSITION update, the segment
|
||||
* position field will contain the desired
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue