spa: add atomic.h and port macros to it

This commit is contained in:
Wim Taymans 2023-07-13 12:25:22 +02:00
parent e86a770349
commit 0501ef165a
9 changed files with 76 additions and 66 deletions

View file

@@ -17,6 +17,7 @@
#include <spa/support/plugin.h>
#include <spa/support/plugin-loader.h>
#include <spa/node/utils.h>
#include <spa/utils/atomic.h>
#include <spa/utils/names.h>
#include <spa/utils/string.h>
#include <spa/debug/types.h>
@@ -1534,10 +1535,10 @@ again:
pw_log_debug("%p: apply duration:%"PRIu64" rate:%u/%u", context,
n->target_quantum, n->target_rate.num,
n->target_rate.denom);
SEQ_WRITE(n->rt.position->clock.target_seq);
SPA_SEQ_WRITE(n->rt.position->clock.target_seq);
n->rt.position->clock.target_duration = n->target_quantum;
n->rt.position->clock.target_rate = n->target_rate;
SEQ_WRITE(n->rt.position->clock.target_seq);
SPA_SEQ_WRITE(n->rt.position->clock.target_seq);
if (n->info.state < PW_NODE_STATE_RUNNING) {
n->rt.position->clock.duration = n->target_quantum;

View file

@@ -846,8 +846,8 @@ do_move_nodes(struct spa_loop *loop,
static void remove_segment_owner(struct pw_impl_node *driver, uint32_t node_id)
{
struct pw_node_activation *a = driver->rt.target.activation;
ATOMIC_CAS(a->segment_owner[0], node_id, 0);
ATOMIC_CAS(a->segment_owner[1], node_id, 0);
SPA_ATOMIC_CAS(a->segment_owner[0], node_id, 0);
SPA_ATOMIC_CAS(a->segment_owner[1], node_id, 0);
}
SPA_EXPORT
@@ -1661,8 +1661,8 @@ static inline int check_updates(struct pw_impl_node *node, uint32_t *reposition_
if (SPA_UNLIKELY(a->position.offset == INT64_MIN))
a->position.offset = a->position.clock.position;
command = ATOMIC_XCHG(a->command, PW_NODE_ACTIVATION_COMMAND_NONE);
*reposition_owner = ATOMIC_XCHG(a->reposition_owner, 0);
command = SPA_ATOMIC_XCHG(a->command, PW_NODE_ACTIVATION_COMMAND_NONE);
*reposition_owner = SPA_ATOMIC_XCHG(a->reposition_owner, 0);
if (SPA_UNLIKELY(command != PW_NODE_ACTIVATION_COMMAND_NONE)) {
pw_log_debug("%p: update command:%u", node, command);
@@ -1790,8 +1790,8 @@ static int node_ready(void *data, int status)
}
sync_type = check_updates(node, &reposition_owner);
owner[0] = ATOMIC_LOAD(a->segment_owner[0]);
owner[1] = ATOMIC_LOAD(a->segment_owner[1]);
owner[0] = SPA_ATOMIC_LOAD(a->segment_owner[0]);
owner[1] = SPA_ATOMIC_LOAD(a->segment_owner[1]);
again:
all_ready = sync_type == SYNC_CHECK;
update_sync = !all_ready;

View file

@@ -17,6 +17,7 @@ extern "C" {
#include <spa/support/plugin.h>
#include <spa/pod/builder.h>
#include <spa/param/latency-utils.h>
#include <spa/utils/atomic.h>
#include <spa/utils/ratelimit.h>
#include <spa/utils/result.h>
#include <spa/utils/type-info.h>
@@ -549,7 +550,7 @@ static inline void pw_node_activation_state_reset(struct pw_node_activation_stat
state->pending = state->required;
}
#define pw_node_activation_state_dec(s) (ATOMIC_DEC(s->pending) == 0)
#define pw_node_activation_state_dec(s) (SPA_ATOMIC_DEC(s->pending) == 0)
struct pw_node_target {
struct spa_list link;
@@ -631,25 +632,6 @@ struct pw_node_activation {
* to update wins */
};
#define ATOMIC_CAS(v,ov,nv) \
({ \
__typeof__(v) __ov = (ov); \
__atomic_compare_exchange_n(&(v), &__ov, (nv), \
0, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST); \
})
#define ATOMIC_DEC(s) __atomic_sub_fetch(&(s), 1, __ATOMIC_SEQ_CST)
#define ATOMIC_INC(s) __atomic_add_fetch(&(s), 1, __ATOMIC_SEQ_CST)
#define ATOMIC_LOAD(s) __atomic_load_n(&(s), __ATOMIC_SEQ_CST)
#define ATOMIC_STORE(s,v) __atomic_store_n(&(s), (v), __ATOMIC_SEQ_CST)
#define ATOMIC_XCHG(s,v) __atomic_exchange_n(&(s), (v), __ATOMIC_SEQ_CST)
#define SEQ_WRITE(s) ATOMIC_INC(s)
#define SEQ_WRITE_SUCCESS(s1,s2) ((s1) + 1 == (s2) && ((s2) & 1) == 0)
#define SEQ_READ(s) ATOMIC_LOAD(s)
#define SEQ_READ_SUCCESS(s1,s2) ((s1) == (s2) && ((s2) & 1) == 0)
#define pw_impl_node_emit(o,m,v,...) spa_hook_list_call(&o->listener_list, struct pw_impl_node_events, m, v, ##__VA_ARGS__)
#define pw_impl_node_emit_destroy(n) pw_impl_node_emit(n, destroy, 0)
#define pw_impl_node_emit_free(n) pw_impl_node_emit(n, free, 0)

View file

@@ -621,7 +621,7 @@ static inline void copy_position(struct stream *impl, int64_t queued)
{
struct spa_io_position *p = impl->rt.position;
SEQ_WRITE(impl->seq);
SPA_SEQ_WRITE(impl->seq);
if (SPA_LIKELY(p != NULL)) {
impl->time.now = p->clock.nsec;
impl->time.rate = p->clock.rate;
@@ -636,7 +636,7 @@ static inline void copy_position(struct stream *impl, int64_t queued)
}
if (SPA_LIKELY(impl->rate_match != NULL))
impl->rate_queued = impl->rate_match->delay;
SEQ_WRITE(impl->seq);
SPA_SEQ_WRITE(impl->seq);
}
static int impl_send_command(void *object, const struct spa_command *command)
@@ -859,7 +859,7 @@ static void clear_buffers(struct pw_stream *stream)
while ((b = queue_pop(impl, &impl->dequeued))) {
if (b->busy)
ATOMIC_DEC(b->busy->count);
SPA_ATOMIC_DEC(b->busy->count);
}
} else
clear_queue(impl, &impl->dequeued);
@@ -1039,7 +1039,7 @@ static int impl_node_process_input(void *object)
pw_log_trace_fp("%p: push %d %p", stream, b->id, io);
if (queue_push(impl, &impl->dequeued, b) == 0) {
if (b->busy)
ATOMIC_INC(b->busy->count);
SPA_ATOMIC_INC(b->busy->count);
}
}
if (!queue_is_empty(impl, &impl->dequeued)) {
@@ -2340,12 +2340,12 @@ int pw_stream_get_time_n(struct pw_stream *stream, struct pw_time *time, size_t
uint32_t buffered, quantum, index;
do {
seq1 = SEQ_READ(impl->seq);
seq1 = SPA_SEQ_READ(impl->seq);
memcpy(time, &impl->time, SPA_MIN(size, sizeof(struct pw_time)));
buffered = impl->rate_queued;
quantum = impl->quantum;
seq2 = SEQ_READ(impl->seq);
} while (!SEQ_READ_SUCCESS(seq1, seq2));
seq2 = SPA_SEQ_READ(impl->seq);
} while (!SPA_SEQ_READ_SUCCESS(seq1, seq2));
if (impl->direction == SPA_DIRECTION_INPUT)
time->queued = (int64_t)(time->queued - impl->dequeued.outcount);
@@ -2397,8 +2397,8 @@ struct pw_buffer *pw_stream_dequeue_buffer(struct pw_stream *stream)
pw_log_trace_fp("%p: dequeue buffer %d size:%"PRIu64, stream, b->id, b->this.size);
if (b->busy && impl->direction == SPA_DIRECTION_OUTPUT) {
if (ATOMIC_INC(b->busy->count) > 1) {
ATOMIC_DEC(b->busy->count);
if (SPA_ATOMIC_INC(b->busy->count) > 1) {
SPA_ATOMIC_DEC(b->busy->count);
queue_push(impl, &impl->dequeued, b);
pw_log_trace_fp("%p: buffer busy", stream);
errno = EBUSY;
@@ -2416,7 +2416,7 @@ int pw_stream_queue_buffer(struct pw_stream *stream, struct pw_buffer *buffer)
int res;
if (b->busy)
ATOMIC_DEC(b->busy->count);
SPA_ATOMIC_DEC(b->busy->count);
pw_log_trace_fp("%p: queue buffer %d", stream, b->id);
if ((res = queue_push(impl, &impl->queued, b)) < 0)