From 0501ef165a46099eaeaeffe159c768c13a41d3cd Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Thu, 13 Jul 2023 12:25:22 +0200 Subject: [PATCH] spa: add atomic.h and port macros to it --- pipewire-alsa/alsa-plugins/pcm_pipewire.c | 20 ++++--------- pipewire-jack/src/pipewire-jack.c | 12 ++++---- pipewire-v4l2/src/pipewire-v4l2.c | 11 ++++--- spa/include/spa/graph/graph.h | 5 ++-- spa/include/spa/utils/atomic.h | 35 +++++++++++++++++++++++ src/pipewire/context.c | 5 ++-- src/pipewire/impl-node.c | 12 ++++---- src/pipewire/private.h | 22 ++------------ src/pipewire/stream.c | 20 ++++++------- 9 files changed, 76 insertions(+), 66 deletions(-) create mode 100644 spa/include/spa/utils/atomic.h diff --git a/pipewire-alsa/alsa-plugins/pcm_pipewire.c b/pipewire-alsa/alsa-plugins/pcm_pipewire.c index 19ae6ebcc..6aecbb70d 100644 --- a/pipewire-alsa/alsa-plugins/pcm_pipewire.c +++ b/pipewire-alsa/alsa-plugins/pcm_pipewire.c @@ -19,20 +19,12 @@ #include #include #include +#include #include #include #include -#define ATOMIC_INC(s) __atomic_add_fetch(&(s), 1, __ATOMIC_SEQ_CST) -#define ATOMIC_LOAD(s) __atomic_load_n(&(s), __ATOMIC_SEQ_CST) - -#define SEQ_WRITE(s) ATOMIC_INC(s) -#define SEQ_WRITE_SUCCESS(s1,s2) ((s1) + 1 == (s2) && ((s2) & 1) == 0) - -#define SEQ_READ(s) ATOMIC_LOAD(s) -#define SEQ_READ_SUCCESS(s1,s2) ((s1) == (s2) && ((s2) & 1) == 0) - PW_LOG_TOPIC_STATIC(alsa_log_topic, "alsa.pcm"); #define PW_LOG_TOPIC_DEFAULT alsa_log_topic @@ -223,7 +215,7 @@ static int snd_pcm_pipewire_delay(snd_pcm_ioplug_t *io, snd_pcm_sframes_t *delay int64_t diff; do { - seq1 = SEQ_READ(pw->seq); + seq1 = SPA_SEQ_READ(pw->seq); delay = pw->delay + pw->transfered; now = pw->now; @@ -232,8 +224,8 @@ static int snd_pcm_pipewire_delay(snd_pcm_ioplug_t *io, snd_pcm_sframes_t *delay else avail = snd_pcm_ioplug_avail(io, pw->hw_ptr, io->appl_ptr); - seq2 = SEQ_READ(pw->seq); - } while (!SEQ_READ_SUCCESS(seq1, seq2)); + seq2 = SPA_SEQ_READ(pw->seq); + } while (!SPA_SEQ_READ_SUCCESS(seq1, seq2)); if (now != 0 && (io->state == SND_PCM_STATE_RUNNING || io->state == SND_PCM_STATE_DRAINING)) { @@ -437,7 +429,7 @@ static void on_stream_process(void *data) want = b->requested ? b->requested : hw_avail; - SEQ_WRITE(pw->seq); + SPA_SEQ_WRITE(pw->seq); if (pw->now != pwt.now) { pw->transfered = pw->buffered; @@ -455,7 +447,7 @@ static void on_stream_process(void *data) pw->buffered = (want == 0 || pw->transfered < want) ? 0 : (pw->transfered % want); pw->now = pwt.now; - SEQ_WRITE(pw->seq); + SPA_SEQ_WRITE(pw->seq); pw_log_trace("%p: avail-before:%lu avail:%lu want:%lu xfer:%lu hw:%lu appl:%lu", pw, before, hw_avail, want, xfer, pw->hw_ptr, io->appl_ptr); diff --git a/pipewire-jack/src/pipewire-jack.c b/pipewire-jack/src/pipewire-jack.c index f5aa3da06..52d1828c2 100644 --- a/pipewire-jack/src/pipewire-jack.c +++ b/pipewire-jack/src/pipewire-jack.c @@ -1898,18 +1898,18 @@ static int install_timeowner(struct client *c) pw_log_debug("%p: activation %p", c, a); /* was ok */ - owner = ATOMIC_LOAD(a->segment_owner[0]); + owner = SPA_ATOMIC_LOAD(a->segment_owner[0]); if (owner == c->node_id) return 0; /* try to become owner */ if (c->timeowner_conditional) { - if (!ATOMIC_CAS(a->segment_owner[0], 0, c->node_id)) { + if (!SPA_ATOMIC_CAS(a->segment_owner[0], 0, c->node_id)) { pw_log_debug("%p: owner:%u id:%u", c, owner, c->node_id); return -EBUSY; } } else { - ATOMIC_STORE(a->segment_owner[0], c->node_id); + SPA_ATOMIC_STORE(a->segment_owner[0], c->node_id); } pw_log_debug("%p: timebase installed for id:%u", c, c->node_id); @@ -6195,7 +6195,7 @@ int jack_release_timebase (jack_client_t *client) if ((a = c->driver_activation) == NULL) return -EIO; - if (!ATOMIC_CAS(a->segment_owner[0], c->node_id, 0)) + if (!SPA_ATOMIC_CAS(a->segment_owner[0], c->node_id, 0)) return -EINVAL; c->timebase_callback = NULL; @@ -6367,7 +6367,7 @@ int jack_transport_reposition (jack_client_t *client, na->reposition.duration = 0; na->reposition.position = pos->frame; na->reposition.rate = 1.0; - ATOMIC_STORE(a->reposition_owner, c->node_id); + SPA_ATOMIC_STORE(a->reposition_owner, c->node_id); return 0; } @@ -6377,7 +6377,7 @@ static void update_command(struct client *c, uint32_t command) struct pw_node_activation *a = c->rt.driver_activation; if (!a) return; - ATOMIC_STORE(a->command, command); + SPA_ATOMIC_STORE(a->command, command); } SPA_EXPORT diff --git a/pipewire-v4l2/src/pipewire-v4l2.c b/pipewire-v4l2/src/pipewire-v4l2.c index 972f12428..14c74a0f8 100644 --- a/pipewire-v4l2/src/pipewire-v4l2.c +++ b/pipewire-v4l2/src/pipewire-v4l2.c @@ -19,6 +19,7 @@ #include "pipewire-v4l2.h" +#include #include #include #include @@ -248,8 +249,6 @@ static void update_params(struct file *file) } } } -#define ATOMIC_DEC(s) __atomic_sub_fetch(&(s), 1, __ATOMIC_SEQ_CST) -#define ATOMIC_INC(s) __atomic_add_fetch(&(s), 1, __ATOMIC_SEQ_CST) static struct file *make_file(void) { @@ -301,7 +300,7 @@ static void free_file(struct file *file) static void unref_file(struct file *file) { pw_log_debug("file:%d ref:%d", file->fd, file->ref); - if (ATOMIC_DEC(file->ref) <= 0) + if (SPA_ATOMIC_DEC(file->ref) <= 0) free_file(file); } @@ -314,7 +313,7 @@ static int add_fd_map(int fd, struct file *file, uint32_t flags) map->fd = fd; map->flags = flags; map->file = file; - ATOMIC_INC(file->ref); + SPA_ATOMIC_INC(file->ref); pw_log_debug("fd:%d -> file:%d ref:%d", fd, file->fd, file->ref); } pthread_mutex_unlock(&globals.lock); @@ -349,7 +348,7 @@ static struct fd_map *find_fd_map_unlocked(int fd) struct fd_map *map; pw_array_for_each(map, &globals.fd_maps) { if (map->fd == fd) { - ATOMIC_INC(map->file->ref); + SPA_ATOMIC_INC(map->file->ref); pw_log_debug("fd:%d find:%d ref:%d", map->fd, fd, map->file->ref); return map; } @@ -384,7 +383,7 @@ static struct file *find_file_by_dev(uint32_t dev) if (tmp->file->dev_id == dev) { if (tmp->file->closed) tmp->file->fd = tmp->fd; - ATOMIC_INC(tmp->file->ref); + SPA_ATOMIC_INC(tmp->file->ref); map = tmp; pw_log_debug("dev:%d find:%d ref:%d", tmp->file->dev_id, dev, tmp->file->ref); diff --git a/spa/include/spa/graph/graph.h b/spa/include/spa/graph/graph.h index 4b967d92c..bfcb9b3d1 100644 --- a/spa/include/spa/graph/graph.h +++ b/spa/include/spa/graph/graph.h @@ -18,6 +18,7 @@ extern "C" { * \{ */ +#include #include #include #include @@ -53,7 +54,7 @@ struct spa_graph_link { #define spa_graph_link_signal(l) ((l)->signal((l)->signal_data)) -#define spa_graph_state_dec(s,c) (__atomic_sub_fetch(&(s)->pending, c, __ATOMIC_SEQ_CST) == 0) +#define spa_graph_state_dec(s) (SPA_ATOMIC_DEC(s->pending) == 0) static inline int spa_graph_link_trigger(struct spa_graph_link *link) { @@ -62,7 +63,7 @@ static inline int spa_graph_link_trigger(struct spa_graph_link *link) spa_debug("link %p: state %p: pending %d/%d", link, state, state->pending, state->required); - if (spa_graph_state_dec(state, 1)) + if (spa_graph_state_dec(state)) spa_graph_link_signal(link); return state->status; diff --git a/spa/include/spa/utils/atomic.h b/spa/include/spa/utils/atomic.h new file mode 100644 index 000000000..549420b89 --- /dev/null +++ b/spa/include/spa/utils/atomic.h @@ -0,0 +1,35 @@ +/* Atomic operations */ +/* SPDX-FileCopyrightText: Copyright © 2023 Wim Taymans */ +/* SPDX-License-Identifier: MIT */ + +#ifndef SPA_ATOMIC_H +#define SPA_ATOMIC_H + +#ifdef __cplusplus +extern "C" { +#endif + +#define SPA_ATOMIC_CAS(v,ov,nv) \ +({ \ + __typeof__(v) __ov = (ov); \ + __atomic_compare_exchange_n(&(v), &__ov, (nv), \ + 0, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST); \ +}) + +#define SPA_ATOMIC_DEC(s) __atomic_sub_fetch(&(s), 1, __ATOMIC_SEQ_CST) +#define SPA_ATOMIC_INC(s) __atomic_add_fetch(&(s), 1, __ATOMIC_SEQ_CST) +#define SPA_ATOMIC_LOAD(s) __atomic_load_n(&(s), __ATOMIC_SEQ_CST) +#define SPA_ATOMIC_STORE(s,v) __atomic_store_n(&(s), (v), __ATOMIC_SEQ_CST) +#define SPA_ATOMIC_XCHG(s,v) __atomic_exchange_n(&(s), (v), __ATOMIC_SEQ_CST) + +#define SPA_SEQ_WRITE(s) SPA_ATOMIC_INC(s) +#define SPA_SEQ_WRITE_SUCCESS(s1,s2) ((s1) + 1 == (s2) && ((s2) & 1) == 0) + +#define SPA_SEQ_READ(s) SPA_ATOMIC_LOAD(s) +#define SPA_SEQ_READ_SUCCESS(s1,s2) ((s1) == (s2) && ((s2) & 1) == 0) + +#ifdef __cplusplus +} /* extern "C" */ +#endif + +#endif /* SPA_ATOMIC_H */ diff --git a/src/pipewire/context.c b/src/pipewire/context.c index e4aff6ea2..d7822c176 100644 --- a/src/pipewire/context.c +++ b/src/pipewire/context.c @@ -17,6 +17,7 @@ #include #include #include +#include #include #include #include @@ -1534,10 +1535,10 @@ again: pw_log_debug("%p: apply duration:%"PRIu64" rate:%u/%u", context, n->target_quantum, n->target_rate.num, n->target_rate.denom); - SEQ_WRITE(n->rt.position->clock.target_seq); + SPA_SEQ_WRITE(n->rt.position->clock.target_seq); n->rt.position->clock.target_duration = n->target_quantum; n->rt.position->clock.target_rate = n->target_rate; - SEQ_WRITE(n->rt.position->clock.target_seq); + SPA_SEQ_WRITE(n->rt.position->clock.target_seq); if (n->info.state < PW_NODE_STATE_RUNNING) { n->rt.position->clock.duration = n->target_quantum; diff --git a/src/pipewire/impl-node.c b/src/pipewire/impl-node.c index 89bea92af..a3a9ae905 100644 --- a/src/pipewire/impl-node.c +++ b/src/pipewire/impl-node.c @@ -846,8 +846,8 @@ do_move_nodes(struct spa_loop *loop, static void remove_segment_owner(struct pw_impl_node *driver, uint32_t node_id) { struct pw_node_activation *a = driver->rt.target.activation; - ATOMIC_CAS(a->segment_owner[0], node_id, 0); - ATOMIC_CAS(a->segment_owner[1], node_id, 0); + SPA_ATOMIC_CAS(a->segment_owner[0], node_id, 0); + SPA_ATOMIC_CAS(a->segment_owner[1], node_id, 0); } SPA_EXPORT @@ -1661,8 +1661,8 @@ static inline int check_updates(struct pw_impl_node *node, uint32_t *reposition_ if (SPA_UNLIKELY(a->position.offset == INT64_MIN)) a->position.offset = a->position.clock.position; - command = ATOMIC_XCHG(a->command, PW_NODE_ACTIVATION_COMMAND_NONE); - *reposition_owner = ATOMIC_XCHG(a->reposition_owner, 0); + command = SPA_ATOMIC_XCHG(a->command, PW_NODE_ACTIVATION_COMMAND_NONE); + *reposition_owner = SPA_ATOMIC_XCHG(a->reposition_owner, 0); if (SPA_UNLIKELY(command != PW_NODE_ACTIVATION_COMMAND_NONE)) { pw_log_debug("%p: update command:%u", node, command); @@ -1790,8 +1790,8 @@ static int node_ready(void *data, int status) } sync_type = check_updates(node, &reposition_owner); - owner[0] = ATOMIC_LOAD(a->segment_owner[0]); - owner[1] = ATOMIC_LOAD(a->segment_owner[1]); + owner[0] = SPA_ATOMIC_LOAD(a->segment_owner[0]); + owner[1] = SPA_ATOMIC_LOAD(a->segment_owner[1]); again: all_ready = sync_type == SYNC_CHECK; update_sync = !all_ready; diff --git a/src/pipewire/private.h b/src/pipewire/private.h index 8b260085b..d8977173e 100644 --- a/src/pipewire/private.h +++ b/src/pipewire/private.h @@ -17,6 +17,7 @@ extern "C" { #include #include #include +#include #include #include #include @@ -549,7 +550,7 @@ static inline void pw_node_activation_state_reset(struct pw_node_activation_stat state->pending = state->required; } -#define pw_node_activation_state_dec(s) (ATOMIC_DEC(s->pending) == 0) +#define pw_node_activation_state_dec(s) (SPA_ATOMIC_DEC(s->pending) == 0) struct pw_node_target { struct spa_list link; @@ -631,25 +632,6 @@ struct pw_node_activation { * to update wins */ }; -#define ATOMIC_CAS(v,ov,nv) \ -({ \ - __typeof__(v) __ov = (ov); \ - __atomic_compare_exchange_n(&(v), &__ov, (nv), \ - 0, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST); \ -}) - -#define ATOMIC_DEC(s) __atomic_sub_fetch(&(s), 1, __ATOMIC_SEQ_CST) -#define ATOMIC_INC(s) __atomic_add_fetch(&(s), 1, __ATOMIC_SEQ_CST) -#define ATOMIC_LOAD(s) __atomic_load_n(&(s), __ATOMIC_SEQ_CST) -#define ATOMIC_STORE(s,v) __atomic_store_n(&(s), (v), __ATOMIC_SEQ_CST) -#define ATOMIC_XCHG(s,v) __atomic_exchange_n(&(s), (v), __ATOMIC_SEQ_CST) - -#define SEQ_WRITE(s) ATOMIC_INC(s) -#define SEQ_WRITE_SUCCESS(s1,s2) ((s1) + 1 == (s2) && ((s2) & 1) == 0) - -#define SEQ_READ(s) ATOMIC_LOAD(s) -#define SEQ_READ_SUCCESS(s1,s2) ((s1) == (s2) && ((s2) & 1) == 0) - #define pw_impl_node_emit(o,m,v,...) spa_hook_list_call(&o->listener_list, struct pw_impl_node_events, m, v, ##__VA_ARGS__) #define pw_impl_node_emit_destroy(n) pw_impl_node_emit(n, destroy, 0) #define pw_impl_node_emit_free(n) pw_impl_node_emit(n, free, 0) diff --git a/src/pipewire/stream.c b/src/pipewire/stream.c index 668d1b15f..a40418400 100644 --- a/src/pipewire/stream.c +++ b/src/pipewire/stream.c @@ -621,7 +621,7 @@ static inline void copy_position(struct stream *impl, int64_t queued) { struct spa_io_position *p = impl->rt.position; - SEQ_WRITE(impl->seq); + SPA_SEQ_WRITE(impl->seq); if (SPA_LIKELY(p != NULL)) { impl->time.now = p->clock.nsec; impl->time.rate = p->clock.rate; @@ -636,7 +636,7 @@ static inline void copy_position(struct stream *impl, int64_t queued) } if (SPA_LIKELY(impl->rate_match != NULL)) impl->rate_queued = impl->rate_match->delay; - SEQ_WRITE(impl->seq); + SPA_SEQ_WRITE(impl->seq); } static int impl_send_command(void *object, const struct spa_command *command) @@ -859,7 +859,7 @@ static void clear_buffers(struct pw_stream *stream) while ((b = queue_pop(impl, &impl->dequeued))) { if (b->busy) - ATOMIC_DEC(b->busy->count); + SPA_ATOMIC_DEC(b->busy->count); } } else clear_queue(impl, &impl->dequeued); @@ -1039,7 +1039,7 @@ static int impl_node_process_input(void *object) pw_log_trace_fp("%p: push %d %p", stream, b->id, io); if (queue_push(impl, &impl->dequeued, b) == 0) { if (b->busy) - ATOMIC_INC(b->busy->count); + SPA_ATOMIC_INC(b->busy->count); } } if (!queue_is_empty(impl, &impl->dequeued)) { @@ -2340,12 +2340,12 @@ int pw_stream_get_time_n(struct pw_stream *stream, struct pw_time *time, size_t uint32_t buffered, quantum, index; do { - seq1 = SEQ_READ(impl->seq); + seq1 = SPA_SEQ_READ(impl->seq); memcpy(time, &impl->time, SPA_MIN(size, sizeof(struct pw_time))); buffered = impl->rate_queued; quantum = impl->quantum; - seq2 = SEQ_READ(impl->seq); - } while (!SEQ_READ_SUCCESS(seq1, seq2)); + seq2 = SPA_SEQ_READ(impl->seq); + } while (!SPA_SEQ_READ_SUCCESS(seq1, seq2)); if (impl->direction == SPA_DIRECTION_INPUT) time->queued = (int64_t)(time->queued - impl->dequeued.outcount); @@ -2397,8 +2397,8 @@ struct pw_buffer *pw_stream_dequeue_buffer(struct pw_stream *stream) pw_log_trace_fp("%p: dequeue buffer %d size:%"PRIu64, stream, b->id, b->this.size); if (b->busy && impl->direction == SPA_DIRECTION_OUTPUT) { - if (ATOMIC_INC(b->busy->count) > 1) { - ATOMIC_DEC(b->busy->count); + if (SPA_ATOMIC_INC(b->busy->count) > 1) { + SPA_ATOMIC_DEC(b->busy->count); queue_push(impl, &impl->dequeued, b); pw_log_trace_fp("%p: buffer busy", stream); errno = EBUSY; @@ -2416,7 +2416,7 @@ int pw_stream_queue_buffer(struct pw_stream *stream, struct pw_buffer *buffer) int res; if (b->busy) - ATOMIC_DEC(b->busy->count); + SPA_ATOMIC_DEC(b->busy->count); pw_log_trace_fp("%p: queue buffer %d", stream, b->id); if ((res = queue_push(impl, &impl->queued, b)) < 0)