From 84405dae2ad11d97deb33c1cfe986bb805a0ef17 Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Wed, 28 Aug 2019 13:56:23 +0200 Subject: [PATCH] node: add fields to support sync Sync is enabled when clients need time to move to a new location. It's a bit like GStreamer preroll after a seek. Clients that need time, increment the sync_total. Whenever a seek is done, the server waits in the Starting state until the sync_pending is 0 (or timeout later). Improve atomic operations --- pipewire-jack | 2 +- spa/include/spa/node/io.h | 4 +-- src/pipewire/node.c | 58 ++++++++++++++++++++++++--------------- src/pipewire/private.h | 32 +++++++++++++++++---- src/pipewire/stream.c | 8 +++--- 5 files changed, 70 insertions(+), 34 deletions(-) diff --git a/pipewire-jack b/pipewire-jack index a52ad27a1..0c2c2c72d 160000 --- a/pipewire-jack +++ b/pipewire-jack @@ -1 +1 @@ -Subproject commit a52ad27a169ffd6abb4090e1d77385bd879a1d43 +Subproject commit 0c2c2c72d04335a7013159ca8c7e284f4676e5df diff --git a/spa/include/spa/node/io.h b/spa/include/spa/node/io.h index 116c30ea6..a630f0d84 100644 --- a/spa/include/spa/node/io.h +++ b/spa/include/spa/node/io.h @@ -111,7 +111,7 @@ struct spa_io_sequence { /** bar and beat segment */ struct spa_io_segment_bar { - uint32_t offset; /**< offset in samples of this beat */ + uint64_t offset; /**< offset in segment of this beat */ float signature_num; /**< time signature numerator */ float signature_denom; /**< time signature denominator */ double bpm; /**< beats per minute */ @@ -121,7 +121,7 @@ struct spa_io_segment_bar { /** video frame segment */ struct spa_io_segment_video { - uint32_t offset; /**< offset of frame against current segment */ + uint64_t offset; /** offset in segment */ struct spa_fraction framerate; #define SPA_IO_SEGMENT_VIDEO_FLAG_DROP_FRAME (1<<0) #define SPA_IO_SEGMENT_VIDEO_FLAG_PULL_DOWN (1<<1) diff --git a/src/pipewire/node.c b/src/pipewire/node.c index 46968feba..37846f843 100644 --- a/src/pipewire/node.c +++ b/src/pipewire/node.c @@ -874,6 +874,29 @@ static void node_on_fd_events(struct spa_source *source) } } +static void reset_segment(struct spa_io_segment *seg) +{ + seg->flags = 0; + seg->valid = SPA_IO_SEGMENT_VALID_POSITION; + seg->start = 0; + seg->duration = 0; + seg->position = 0; + seg->rate = 1.0; +} + +static void reset_position(struct spa_io_position *pos) +{ + uint32_t i; + + pos->clock.rate = SPA_FRACTION(1, 48000); + pos->clock.duration = DEFAULT_QUANTUM; + pos->offset = INT64_MIN; + + pos->n_segments = 1; + for (i = 0; i < SPA_IO_POSITION_MAX_SEGMENTS; i++) + reset_segment(&pos->segments[i]); +} + SPA_EXPORT struct pw_node *pw_node_new(struct pw_core *core, struct pw_properties *properties, @@ -961,16 +984,7 @@ struct pw_node *pw_node_new(struct pw_core *core, this->rt.target.data = this; this->rt.driver_target.signal = process_node; - this->rt.activation->position.clock.rate = SPA_FRACTION(1, 48000); - this->rt.activation->position.clock.duration = DEFAULT_QUANTUM; - this->rt.activation->position.offset = INT64_MIN; - this->rt.activation->position.n_segments = 1; - this->rt.activation->position.segments[0].flags = 0; - this->rt.activation->position.segments[0].valid = SPA_IO_SEGMENT_VALID_POSITION; - this->rt.activation->position.segments[0].start = 0; - this->rt.activation->position.segments[0].duration = 0; - this->rt.activation->position.segments[0].position = 0; - this->rt.activation->position.segments[0].rate = 1.0; + reset_position(&this->rt.activation->position); check_properties(this); @@ -1178,11 +1192,11 @@ static void update_position(struct pw_node *node) if (a->position.offset == INT64_MIN) a->position.offset = a->position.clock.position; - seq1 = SEQ_READ(&a->pending.seq); + seq1 = SEQ_READ(a->pending.seq); change_mask = a->pending.change_mask; state = a->pending.state; segment = a->pending.segment; - seq2 = SEQ_READ(&a->pending.seq); + seq2 = SEQ_READ(a->pending.seq); /* if we can't read a stable update, just ignore it and process it * in the next cycle. */ @@ -1213,20 +1227,20 @@ static void update_position(struct pw_node *node) seg->rate = segment.rate; if (seg->start == 0) seg->start = a->position.clock.position - a->position.offset; + + a->position.state = SPA_IO_POSITION_STATE_STARTING; + ATOMIC_STORE(a->sync_pending, a->sync_total); + ATOMIC_INC(a->sync_version); } if (change_mask & PW_NODE_ACTIVATION_UPDATE_STATE) { - switch (state) { - case SPA_IO_POSITION_STATE_STOPPED: - case SPA_IO_POSITION_STATE_RUNNING: - a->position.state = state; - break; - case SPA_IO_POSITION_STATE_STARTING: - a->position.state = SPA_IO_POSITION_STATE_RUNNING; - break; - } + a->position.state = state; } - if (a->position.state == SPA_IO_POSITION_STATE_STOPPED) + if (a->position.state == SPA_IO_POSITION_STATE_STARTING) { + if (ATOMIC_LOAD(a->sync_pending) == 0) + a->position.state = SPA_IO_POSITION_STATE_RUNNING; + } + if (a->position.state != SPA_IO_POSITION_STATE_RUNNING) a->position.offset += a->position.clock.duration; } diff --git a/src/pipewire/private.h b/src/pipewire/private.h index 0583437bd..d14f08738 100644 --- a/src/pipewire/private.h +++ b/src/pipewire/private.h @@ -364,7 +364,17 @@ struct pw_node_activation { uint32_t segment_master[32]; /* unique id (client id usually) of client * that will update extra segment info, There * can be one master for each segment - * bitfield */ + * bitfield. 0 means no master for the + * given segment info */ + + uint64_t sync_timeout; + uint32_t sync_version; /* version updates whenever new sync is + * required */ + uint32_t sync_total; /* number of clients that have want to + * explicitly signal when they are ready to + * process the pending segment */ + uint32_t sync_pending; /* pending number of clients preparing for + * the current segment */ uint32_t version; struct pw_node_activation_state state[2]; /* one current state and one next state, @@ -392,11 +402,23 @@ struct pw_node_activation { } pending; }; -#define SEQ_WRITE(s) __atomic_add_fetch((s), 1, __ATOMIC_SEQ_CST) -#define SEQ_WRITE_SUCCESS(s1,s2) ((s1) + 1 == (s2) && (s2 & 1) == 0) +#define ATOMIC_CAS(v,ov,nv) \ +({ \ + __typeof__(v) __ov = (ov); \ + __atomic_compare_exchange_n(&(v), &__ov, (nv), \ + 0, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST); \ +}) -#define SEQ_READ(s) __atomic_load_n((s), __ATOMIC_SEQ_CST) -#define SEQ_READ_SUCCESS(s1,s2) ((s1) == (s2) && (s2 & 1) == 0) +#define ATOMIC_DEC(s) __atomic_sub_fetch(&(s), 1, __ATOMIC_SEQ_CST) +#define ATOMIC_INC(s) __atomic_add_fetch(&(s), 1, __ATOMIC_SEQ_CST) +#define ATOMIC_LOAD(s) __atomic_load_n(&(s), __ATOMIC_SEQ_CST) +#define ATOMIC_STORE(s,v) __atomic_store_n(&(s), (v), __ATOMIC_SEQ_CST) + +#define SEQ_WRITE(s) ATOMIC_INC(s) +#define SEQ_WRITE_SUCCESS(s1,s2) ((s1) + 1 == (s2) && ((s2) & 1) == 0) + +#define SEQ_READ(s) ATOMIC_LOAD(s) +#define SEQ_READ_SUCCESS(s1,s2) ((s1) == (s2) && ((s2) & 1) == 0) #define pw_node_emit(o,m,v,...) spa_hook_list_call(&o->listener_list, struct pw_node_events, m, v, ##__VA_ARGS__) #define pw_node_emit_destroy(n) pw_node_emit(n, destroy, 0) diff --git a/src/pipewire/stream.c b/src/pipewire/stream.c index f9f8f2c0e..a190a1c43 100644 --- a/src/pipewire/stream.c +++ b/src/pipewire/stream.c @@ -729,13 +729,13 @@ static inline void copy_position(struct stream *impl, int64_t queued) { struct spa_io_position *p = impl->position; if (p != NULL) { - SEQ_WRITE(&impl->seq); + SEQ_WRITE(impl->seq); impl->time.now = p->clock.nsec; impl->time.rate = p->clock.rate; impl->time.ticks = p->clock.position; impl->time.delay = p->clock.delay; impl->time.queued = queued; - SEQ_WRITE(&impl->seq); + SEQ_WRITE(impl->seq); } } @@ -1638,9 +1638,9 @@ int pw_stream_get_time(struct pw_stream *stream, struct pw_time *time) uintptr_t seq1, seq2; do { - seq1 = SEQ_READ(&impl->seq); + seq1 = SEQ_READ(impl->seq); *time = impl->time; - seq2 = SEQ_READ(&impl->seq); + seq2 = SEQ_READ(impl->seq); } while (!SEQ_READ_SUCCESS(seq1, seq2)); if (impl->direction == SPA_DIRECTION_INPUT)