node: improve position and transport

Reorganize some things, let the clients update the segment info
in their own activation, then let the server merge it. This avoids
clients stepping on eachother. When looping through the clients,
copy the segment info when we encounter its owner.

Remove the list of segment owners to the activation. This is better
than in the activation because we can then just keep one list of
owners.

Remove the NONBLOCK flag from the eventfd so that we can do blocking
reads as well.

Just keep a reposition owner in the driver activation. This points
to the node that has the reposition info. This avoid complicated
synchronization to keep multiple nodes from stepping on eachother.
Now they can just prepare the reposition info in their activation and
set themselves as the reposition owner. The last one who succeeds
wins.
This commit is contained in:
Wim Taymans 2019-09-02 12:05:05 +02:00
parent ca34a75173
commit faaf84286b
5 changed files with 97 additions and 86 deletions

@ -1 +1 @@
Subproject commit 926b500ceb030348a97adf33f391075cafaf6616
Subproject commit fc0b15d17b9ce7c9d5241fa4f92d989c46129f76

View file

@ -111,10 +111,9 @@ struct spa_io_sequence {
/** bar and beat segment */
struct spa_io_segment_bar {
uint32_t owner; /**< owner id */
#define SPA_IO_SEGMENT_BAR_FLAG_VALID (1<<0)
uint32_t flags; /**< extra flags */
uint64_t offset; /**< offset in segment of this beat */
uint32_t offset; /**< offset in segment of this beat */
float signature_num; /**< time signature numerator */
float signature_denom; /**< time signature denominator */
double bpm; /**< beats per minute */
@ -124,13 +123,12 @@ struct spa_io_segment_bar {
/** video frame segment */
struct spa_io_segment_video {
uint32_t owner; /**< owner id */
#define SPA_IO_SEGMENT_VIDEO_FLAG_VALID (1<<0)
#define SPA_IO_SEGMENT_VIDEO_FLAG_DROP_FRAME (1<<1)
#define SPA_IO_SEGMENT_VIDEO_FLAG_PULL_DOWN (1<<2)
#define SPA_IO_SEGMENT_VIDEO_FLAG_INTERLACED (1<<3)
uint32_t flags; /**< flags */
uint64_t offset; /**< offset in segment */
uint32_t offset; /**< offset in segment */
struct spa_fraction framerate;
uint32_t hours;
uint32_t minutes;
@ -161,6 +159,13 @@ struct spa_io_segment_video {
* and filling up the corresponding structures.
*/
struct spa_io_segment {
uint32_t version;
#define SPA_IO_SEGMENT_FLAG_LOOPING (1<<0) /**< after the duration, the segment repeats */
#define SPA_IO_SEGMENT_FLAG_NO_POSITION (1<<1) /**< position is invalid. The position can be invalid
* after a seek, for example, when the exact mapping
* of the extra segment info (bar, video, ...) to
* position has not been determined yet */
uint32_t flags; /**< extra flags */
uint64_t start; /**< value of running time when this
* info is active. Can be in the future for
* pending changes. It does not have to be in
@ -172,12 +177,6 @@ struct spa_io_segment {
* set, the segment repeats. */
double rate; /**< overal rate of the segment, can be negative for
* backwards time reporting. */
#define SPA_IO_SEGMENT_FLAG_LOOPING (1<<0) /**< after the duration, the segment repeats */
#define SPA_IO_SEGMENT_FLAG_NO_POSITION (1<<1) /**< position is invalid. The position can be invalid
* after a seek, for example, when the exact mapping
* of the extra segment info (bar, video, ...) to
* position has not been determined yet */
uint32_t flags; /**< extra flags */
uint64_t position; /**< The position when the running time == start.
* can be invalid when the owner of the extra segment
* infomation has not yet made the mapping. */

View file

@ -1254,8 +1254,8 @@ static void node_initialized(void *data)
struct spa_system *data_system = impl->node.data_system;
size_t size;
impl->fds[0] = spa_system_eventfd_create(data_system, SPA_FD_CLOEXEC | SPA_FD_NONBLOCK);
impl->fds[1] = spa_system_eventfd_create(data_system, SPA_FD_CLOEXEC | SPA_FD_NONBLOCK);
impl->fds[0] = spa_system_eventfd_create(data_system, SPA_FD_CLOEXEC);
impl->fds[1] = spa_system_eventfd_create(data_system, SPA_FD_CLOEXEC);
impl->other_fds[0] = impl->fds[1];
impl->other_fds[1] = impl->fds[0];
node->data_source.fd = impl->fds[0];

View file

@ -640,8 +640,8 @@ do_move_nodes(struct spa_loop *loop,
static void remove_segment_master(struct pw_node *driver, uint32_t node_id)
{
struct pw_node_activation *a = driver->rt.activation;
ATOMIC_CAS(a->pending.segment.bar.owner, node_id, 0);
ATOMIC_CAS(a->pending.segment.video.owner, node_id, 0);
ATOMIC_CAS(a->segment_owner[0], node_id, 0);
ATOMIC_CAS(a->segment_owner[1], node_id, 0);
}
SPA_EXPORT
@ -1205,44 +1205,19 @@ static const struct spa_node_events node_events = {
#define SYNC_START 1
#define SYNC_STOP 2
static int check_updates(struct pw_node *node)
static int check_updates(struct pw_node *node, uint32_t *reposition_owner)
{
int res = SYNC_CHECK;
struct pw_node_activation *a = node->rt.activation;
struct spa_io_segment segment, reposition, *seg;
uint32_t seq1, seq2, change_mask, command;
uint32_t command;
if (a->position.offset == INT64_MIN)
a->position.offset = a->position.clock.position;
seq1 = SEQ_READ(a->pending.seq);
change_mask = a->pending.change_mask;
command = a->pending.command;
reposition = a->pending.reposition;
segment = a->pending.segment;
seq2 = SEQ_READ(a->pending.seq);
command = ATOMIC_XCHG(a->command, PW_NODE_ACTIVATION_COMMAND_NONE);
*reposition_owner = ATOMIC_XCHG(a->reposition_owner, 0);
/* if we can't read a stable update, just ignore it and process it
* in the next cycle. */
if (SEQ_READ_SUCCESS(seq1, seq2))
a->pending.change_mask = 0;
else
change_mask = 0;
seg = &a->position.segments[0];
/* update extra segment info if there is an owner */
if (segment.bar.owner)
seg->bar = segment.bar;
else
seg->bar.owner = 0;
if (segment.video.owner)
seg->video = segment.video;
else
seg->video.owner = 0;
if (change_mask & PW_NODE_ACTIVATION_UPDATE_COMMAND) {
if (command != PW_NODE_ACTIVATION_COMMAND_NONE) {
pw_log_debug(NAME" %p: update command:%u", node, command);
switch (command) {
case PW_NODE_ACTIVATION_COMMAND_STOP:
@ -1258,29 +1233,42 @@ static int check_updates(struct pw_node *node)
break;
}
}
if (change_mask & PW_NODE_ACTIVATION_UPDATE_REPOSITION) {
pw_log_debug(NAME" %p: update position:%lu", node, reposition.position);
seg->flags = reposition.flags;
seg->start = reposition.start;
seg->duration = reposition.duration;
seg->position = reposition.position;
seg->rate = reposition.rate;
if (seg->start == 0)
seg->start = a->position.clock.position - a->position.offset;
switch (a->position.state) {
case SPA_IO_POSITION_STATE_RUNNING:
a->position.state = SPA_IO_POSITION_STATE_STARTING;
a->sync_left = a->sync_timeout /
((a->position.clock.duration * SPA_NSEC_PER_SEC) /
a->position.clock.rate.denom);
break;
}
if (*reposition_owner)
res = SYNC_START;
}
return res;
}
static void do_reposition(struct pw_node *driver, struct pw_node *node)
{
struct pw_node_activation *a = driver->rt.activation;
struct spa_io_segment *dst, *src;
src = &node->rt.activation->reposition;
dst = &a->position.segments[0];
pw_log_debug(NAME" %p: update position:%lu", node, src->position);
memcpy(dst, src, sizeof(struct spa_io_segment));
dst->flags = src->flags;
dst->start = src->start;
dst->duration = src->duration;
dst->rate = src->rate;
dst->position = src->position;
if (dst->start == 0)
dst->start = a->position.clock.position - a->position.offset;
switch (a->position.state) {
case SPA_IO_POSITION_STATE_RUNNING:
a->position.state = SPA_IO_POSITION_STATE_STARTING;
a->sync_left = a->sync_timeout /
((a->position.clock.duration * SPA_NSEC_PER_SEC) /
a->position.clock.rate.denom);
break;
}
}
static void update_position(struct pw_node *node, int all_ready)
{
struct pw_node_activation *a = node->rt.activation;
@ -1300,7 +1288,7 @@ static void update_position(struct pw_node *node, int all_ready)
static int node_ready(void *data, int status)
{
struct pw_node *node = data;
struct pw_node *node = data, *reposition_node = NULL;
struct pw_node *driver = node->driver_node;
struct pw_node_target *t;
struct pw_port *p;
@ -1311,6 +1299,7 @@ static int node_ready(void *data, int status)
if (node == driver) {
struct pw_node_activation *a = node->rt.activation;
int sync_type, all_ready, update_sync, target_sync;
uint32_t owner[2], reposition_owner;
if (a->state[0].pending != 0) {
pw_log_warn(NAME" %p: graph not finished", node);
@ -1318,23 +1307,42 @@ static int node_ready(void *data, int status)
node->rt.target.signal(node->rt.target.data);
}
sync_type = check_updates(node);
sync_type = check_updates(node, &reposition_owner);
owner[0] = ATOMIC_LOAD(a->segment_owner[0]);
owner[1] = ATOMIC_LOAD(a->segment_owner[1]);
all_ready = sync_type == SYNC_CHECK;
update_sync = !all_ready;
target_sync = sync_type == SYNC_START ? true : false;
spa_list_for_each(t, &driver->rt.target_list, link) {
pw_node_activation_state_reset(&t->activation->state[0]);
t->activation->status = PW_NODE_ACTIVATION_NOT_TRIGGERED;
struct pw_node_activation *ta = t->activation;
uint32_t id = t->node->info.id;
ta->status = PW_NODE_ACTIVATION_NOT_TRIGGERED;
pw_node_activation_state_reset(&ta->state[0]);
/* this is the node with reposition info */
if (id == reposition_owner)
reposition_node = t->node;
/* update extra segment info if it is the owner */
if (id == owner[0])
a->position.segments[0].bar = ta->segment.bar;
if (id == owner[1])
a->position.segments[0].video = ta->segment.video;
if (update_sync) {
t->activation->pending_sync = target_sync;
t->activation->pending_new_pos = target_sync;
ta->pending_sync = target_sync;
ta->pending_new_pos = target_sync;
} else {
all_ready &= t->activation->pending_sync == false;
all_ready &= ta->pending_sync == false;
}
}
a->prev_signal_time = a->signal_time;
if (reposition_node)
do_reposition(node, reposition_node);
update_position(node, all_ready);
}
if (node->driver && !node->master)

View file

@ -366,8 +366,19 @@ struct pw_node_activation {
uint64_t finish_time;
uint64_t prev_signal_time;
/* for drivers */
struct spa_io_position position;
/* updates */
struct spa_io_segment reposition; /* reposition info, used when driver reposition_owner
* has this node id */
struct spa_io_segment segment; /* update for the extra segment info fields.
* used when driver segment_owner has this node id */
/* for drivers, shared with all nodes */
uint32_t segment_owner[32]; /* id of owners for each segment info struct.
* nodes that want to update segment info need to
* CAS their node id in this array. */
struct spa_io_position position; /* contains current position and segment info.
* extra info is updated by nodes that have set
* themselves as owner in the segment structs */
uint64_t sync_timeout; /* sync timeout in nanoseconds
* position goes to RUNNING without waiting any
@ -381,20 +392,12 @@ struct pw_node_activation {
uint64_t xrun_delay; /* delay of last xrun in microseconds */
uint64_t max_delay; /* max of all xruns in microseconds */
struct {
uint32_t seq;
#define PW_NODE_ACTIVATION_UPDATE_COMMAND (1<<0)
#define PW_NODE_ACTIVATION_UPDATE_REPOSITION (1<<2)
#define PW_NODE_ACTIVATION_UPDATE_FLUSH (1<<3) /* flush out current segments and immediately
* start the new one */
uint32_t change_mask;
#define PW_NODE_ACTIVATION_COMMAND_START 0
#define PW_NODE_ACTIVATION_COMMAND_STOP 1
uint32_t command; /* when change_mask & PW_NODE_ACTIVATION_UPDATE_COMMAND */
struct spa_io_segment reposition; /* reposition information */
struct spa_io_segment segment; /* update for the extra segment info
* fields. */
} pending;
#define PW_NODE_ACTIVATION_COMMAND_NONE 0
#define PW_NODE_ACTIVATION_COMMAND_START 1
#define PW_NODE_ACTIVATION_COMMAND_STOP 2
uint32_t command; /* next command */
uint32_t reposition_owner; /* owner id with new reposition info, last one
* to update wins */
};
#define ATOMIC_CAS(v,ov,nv) \
@ -408,6 +411,7 @@ struct pw_node_activation {
#define ATOMIC_INC(s) __atomic_add_fetch(&(s), 1, __ATOMIC_SEQ_CST)
#define ATOMIC_LOAD(s) __atomic_load_n(&(s), __ATOMIC_SEQ_CST)
#define ATOMIC_STORE(s,v) __atomic_store_n(&(s), (v), __ATOMIC_SEQ_CST)
#define ATOMIC_XCHG(s,v) __atomic_exchange_n(&(s), (v), __ATOMIC_SEQ_CST)
#define SEQ_WRITE(s) ATOMIC_INC(s)
#define SEQ_WRITE_SUCCESS(s1,s2) ((s1) + 1 == (s2) && ((s2) & 1) == 0)