node: add fields to support sync

Sync is enabled when clients need time to move to a new location.
It's a bit like GStreamer preroll after a seek. Clients that need
time, increment the sync_total. Whenever a seek is done, the server
waits in the Starting state until the sync_pending is 0 (or timeout
later).

Improve atomic operations
This commit is contained in:
Wim Taymans 2019-08-28 13:56:23 +02:00
parent 0a15e1f804
commit 84405dae2a
5 changed files with 70 additions and 34 deletions

@ -1 +1 @@
Subproject commit a52ad27a169ffd6abb4090e1d77385bd879a1d43
Subproject commit 0c2c2c72d04335a7013159ca8c7e284f4676e5df

View file

@ -111,7 +111,7 @@ struct spa_io_sequence {
/** bar and beat segment */
struct spa_io_segment_bar {
uint32_t offset; /**< offset in samples of this beat */
uint64_t offset; /**< offset in segment of this beat */
float signature_num; /**< time signature numerator */
float signature_denom; /**< time signature denominator */
double bpm; /**< beats per minute */
@ -121,7 +121,7 @@ struct spa_io_segment_bar {
/** video frame segment */
struct spa_io_segment_video {
uint32_t offset; /**< offset of frame against current segment */
uint64_t offset; /** offset in segment */
struct spa_fraction framerate;
#define SPA_IO_SEGMENT_VIDEO_FLAG_DROP_FRAME (1<<0)
#define SPA_IO_SEGMENT_VIDEO_FLAG_PULL_DOWN (1<<1)

View file

@ -874,6 +874,29 @@ static void node_on_fd_events(struct spa_source *source)
}
}
static void reset_segment(struct spa_io_segment *seg)
{
seg->flags = 0;
seg->valid = SPA_IO_SEGMENT_VALID_POSITION;
seg->start = 0;
seg->duration = 0;
seg->position = 0;
seg->rate = 1.0;
}
static void reset_position(struct spa_io_position *pos)
{
uint32_t i;
pos->clock.rate = SPA_FRACTION(1, 48000);
pos->clock.duration = DEFAULT_QUANTUM;
pos->offset = INT64_MIN;
pos->n_segments = 1;
for (i = 0; i < SPA_IO_POSITION_MAX_SEGMENTS; i++)
reset_segment(&pos->segments[i]);
}
SPA_EXPORT
struct pw_node *pw_node_new(struct pw_core *core,
struct pw_properties *properties,
@ -961,16 +984,7 @@ struct pw_node *pw_node_new(struct pw_core *core,
this->rt.target.data = this;
this->rt.driver_target.signal = process_node;
this->rt.activation->position.clock.rate = SPA_FRACTION(1, 48000);
this->rt.activation->position.clock.duration = DEFAULT_QUANTUM;
this->rt.activation->position.offset = INT64_MIN;
this->rt.activation->position.n_segments = 1;
this->rt.activation->position.segments[0].flags = 0;
this->rt.activation->position.segments[0].valid = SPA_IO_SEGMENT_VALID_POSITION;
this->rt.activation->position.segments[0].start = 0;
this->rt.activation->position.segments[0].duration = 0;
this->rt.activation->position.segments[0].position = 0;
this->rt.activation->position.segments[0].rate = 1.0;
reset_position(&this->rt.activation->position);
check_properties(this);
@ -1178,11 +1192,11 @@ static void update_position(struct pw_node *node)
if (a->position.offset == INT64_MIN)
a->position.offset = a->position.clock.position;
seq1 = SEQ_READ(&a->pending.seq);
seq1 = SEQ_READ(a->pending.seq);
change_mask = a->pending.change_mask;
state = a->pending.state;
segment = a->pending.segment;
seq2 = SEQ_READ(&a->pending.seq);
seq2 = SEQ_READ(a->pending.seq);
/* if we can't read a stable update, just ignore it and process it
* in the next cycle. */
@ -1213,20 +1227,20 @@ static void update_position(struct pw_node *node)
seg->rate = segment.rate;
if (seg->start == 0)
seg->start = a->position.clock.position - a->position.offset;
a->position.state = SPA_IO_POSITION_STATE_STARTING;
ATOMIC_STORE(a->sync_pending, a->sync_total);
ATOMIC_INC(a->sync_version);
}
if (change_mask & PW_NODE_ACTIVATION_UPDATE_STATE) {
switch (state) {
case SPA_IO_POSITION_STATE_STOPPED:
case SPA_IO_POSITION_STATE_RUNNING:
a->position.state = state;
break;
case SPA_IO_POSITION_STATE_STARTING:
a->position.state = SPA_IO_POSITION_STATE_RUNNING;
break;
}
a->position.state = state;
}
if (a->position.state == SPA_IO_POSITION_STATE_STOPPED)
if (a->position.state == SPA_IO_POSITION_STATE_STARTING) {
if (ATOMIC_LOAD(a->sync_pending) == 0)
a->position.state = SPA_IO_POSITION_STATE_RUNNING;
}
if (a->position.state != SPA_IO_POSITION_STATE_RUNNING)
a->position.offset += a->position.clock.duration;
}

View file

@ -364,7 +364,17 @@ struct pw_node_activation {
uint32_t segment_master[32]; /* unique id (client id usually) of client
* that will update extra segment info, There
* can be one master for each segment
* bitfield */
* bitfield. 0 means no master for the
* given segment info */
uint64_t sync_timeout;
uint32_t sync_version; /* version updates whenever new sync is
* required */
uint32_t sync_total; /* number of clients that have want to
* explicitly signal when they are ready to
* process the pending segment */
uint32_t sync_pending; /* pending number of clients preparing for
* the current segment */
uint32_t version;
struct pw_node_activation_state state[2]; /* one current state and one next state,
@ -392,11 +402,23 @@ struct pw_node_activation {
} pending;
};
#define SEQ_WRITE(s) __atomic_add_fetch((s), 1, __ATOMIC_SEQ_CST)
#define SEQ_WRITE_SUCCESS(s1,s2) ((s1) + 1 == (s2) && (s2 & 1) == 0)
#define ATOMIC_CAS(v,ov,nv) \
({ \
__typeof__(v) __ov = (ov); \
__atomic_compare_exchange_n(&(v), &__ov, (nv), \
0, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST); \
})
#define SEQ_READ(s) __atomic_load_n((s), __ATOMIC_SEQ_CST)
#define SEQ_READ_SUCCESS(s1,s2) ((s1) == (s2) && (s2 & 1) == 0)
#define ATOMIC_DEC(s) __atomic_sub_fetch(&(s), 1, __ATOMIC_SEQ_CST)
#define ATOMIC_INC(s) __atomic_add_fetch(&(s), 1, __ATOMIC_SEQ_CST)
#define ATOMIC_LOAD(s) __atomic_load_n(&(s), __ATOMIC_SEQ_CST)
#define ATOMIC_STORE(s,v) __atomic_store_n(&(s), (v), __ATOMIC_SEQ_CST)
#define SEQ_WRITE(s) ATOMIC_INC(s)
#define SEQ_WRITE_SUCCESS(s1,s2) ((s1) + 1 == (s2) && ((s2) & 1) == 0)
#define SEQ_READ(s) ATOMIC_LOAD(s)
#define SEQ_READ_SUCCESS(s1,s2) ((s1) == (s2) && ((s2) & 1) == 0)
#define pw_node_emit(o,m,v,...) spa_hook_list_call(&o->listener_list, struct pw_node_events, m, v, ##__VA_ARGS__)
#define pw_node_emit_destroy(n) pw_node_emit(n, destroy, 0)

View file

@ -729,13 +729,13 @@ static inline void copy_position(struct stream *impl, int64_t queued)
{
struct spa_io_position *p = impl->position;
if (p != NULL) {
SEQ_WRITE(&impl->seq);
SEQ_WRITE(impl->seq);
impl->time.now = p->clock.nsec;
impl->time.rate = p->clock.rate;
impl->time.ticks = p->clock.position;
impl->time.delay = p->clock.delay;
impl->time.queued = queued;
SEQ_WRITE(&impl->seq);
SEQ_WRITE(impl->seq);
}
}
@ -1638,9 +1638,9 @@ int pw_stream_get_time(struct pw_stream *stream, struct pw_time *time)
uintptr_t seq1, seq2;
do {
seq1 = SEQ_READ(&impl->seq);
seq1 = SEQ_READ(impl->seq);
*time = impl->time;
seq2 = SEQ_READ(&impl->seq);
seq2 = SEQ_READ(impl->seq);
} while (!SEQ_READ_SUCCESS(seq1, seq2));
if (impl->direction == SPA_DIRECTION_INPUT)