diff --git a/src/pipewire-jack.c b/src/pipewire-jack.c index 0d1c071e1..c59396d66 100644 --- a/src/pipewire-jack.c +++ b/src/pipewire-jack.c @@ -287,6 +287,9 @@ struct client { unsigned int started:1; unsigned int active:1; unsigned int destroyed:1; + + jack_position_t jack_position; + jack_transport_state_t jack_state; }; static void init_port_pool(struct client *c, enum spa_direction direction) @@ -497,15 +500,6 @@ jack_get_version_string(void) return "0.0.0.0"; } -static void on_sync_reply(void *data, uint32_t id, int seq) -{ - struct client *client = data; - if (id != 0) - return; - client->last_sync = seq; - pw_thread_loop_signal(client->context.loop, false); -} - static void on_state_changed(void *data, enum pw_remote_state old, enum pw_remote_state state, const char *error) { @@ -535,6 +529,15 @@ static const struct pw_remote_events remote_events = { .state_changed = on_state_changed, }; +static void on_sync_reply(void *data, uint32_t id, int seq) +{ + struct client *client = data; + if (id != 0) + return; + client->last_sync = seq; + pw_thread_loop_signal(client->context.loop, false); +} + static const struct pw_core_proxy_events core_events = { PW_VERSION_CORE_EVENTS, .done = on_sync_reply, @@ -708,10 +711,9 @@ static inline void debug_position(struct client *c, jack_position_t *p) static inline void jack_to_position(jack_position_t *s, struct pw_node_activation *a) { - struct spa_io_segment *d = &a->pending.segment; + struct spa_io_segment *d = &a->segment; if (s->valid & JackPositionBBT) { - SEQ_WRITE(a->pending.seq); d->bar.flags = SPA_IO_SEGMENT_BAR_FLAG_VALID; if (s->valid & JackBBTFrameOffset) d->bar.offset = s->bbt_offset; @@ -722,12 +724,12 @@ static inline void jack_to_position(jack_position_t *s, struct pw_node_activatio d->bar.bpm = s->beats_per_minute; d->bar.beat = (s->bar - 1) * s->beats_per_bar + (s->beat - 1) + (s->tick / s->ticks_per_beat); - SEQ_WRITE(a->pending.seq); } } -static inline jack_transport_state_t position_to_jack(struct spa_io_position *s, jack_position_t *d) +static inline jack_transport_state_t position_to_jack(struct pw_node_activation *a, jack_position_t *d) { + struct spa_io_position *s = &a->position; jack_transport_state_t state; struct spa_io_segment *seg = &s->segments[0]; uint64_t running; @@ -764,7 +766,7 @@ static inline jack_transport_state_t position_to_jack(struct spa_io_position *s, d->frame = seg->position; d->valid = 0; - if (seg->bar.owner && SPA_FLAG_CHECK(seg->bar.flags, SPA_IO_SEGMENT_BAR_FLAG_VALID)) { + if (a->segment_owner[0] && SPA_FLAG_CHECK(seg->bar.flags, SPA_IO_SEGMENT_BAR_FLAG_VALID)) { double abs_beat; long beats; @@ -794,13 +796,145 @@ static inline jack_transport_state_t position_to_jack(struct spa_io_position *s, return state; } +static inline int wait_sync(struct client *c) +{ + uint64_t cmd, nsec; + int fd = c->socket_source->fd; + struct spa_io_position *pos = c->position; + struct pw_node_activation *activation = c->activation; + + /* this is blocking if nothing ready */ + if (read(fd, &cmd, sizeof(cmd)) != sizeof(cmd)) + pw_log_warn(NAME" %p: read failed %m", c); + if (cmd > 1) + pw_log_warn(NAME" %p: missed %"PRIu64" wakeups", c, cmd - 1); + + if (pos == NULL) { + pw_log_error(NAME" %p: missing position", c); + return -1; + } + + nsec = pos->clock.nsec; + activation->status = PW_NODE_ACTIVATION_AWAKE; + activation->awake_time = nsec; + + return 0; +} + +static inline uint32_t cycle_wait(struct client *c) +{ + uint32_t buffer_size, sample_rate; + struct spa_io_position *pos = c->position; + struct pw_node_activation *activation = c->activation; + struct pw_node_activation *driver = c->driver_activation; + + if (wait_sync(c) < 0) + return 0; + + buffer_size = pos->clock.duration; + if (buffer_size != c->buffer_size) { + pw_log_info(NAME" %p: buffersize %d", c, buffer_size); + c->buffer_size = buffer_size; + if (c->bufsize_callback) + c->bufsize_callback(c->buffer_size, c->bufsize_arg); + } + + sample_rate = pos->clock.rate.denom; + if (sample_rate != c->sample_rate) { + pw_log_info(NAME" %p: sample_rate %d", c, sample_rate); + c->sample_rate = sample_rate; + if (c->srate_callback) + c->srate_callback(c->sample_rate, c->srate_arg); + } + + c->jack_state = position_to_jack(driver, &c->jack_position); + + if (driver) { + if (activation->pending_sync) { + if (c->sync_callback == NULL || + c->sync_callback(c->jack_state, &c->jack_position, c->sync_arg)) + activation->pending_sync = false; + } + if (c->xrun_count != driver->xrun_count && + c->xrun_count != 0 && c->xrun_callback) + c->xrun_callback(c->xrun_arg); + c->xrun_count = driver->xrun_count; + } + pw_log_trace(NAME" %p: wait %"PRIu64" %d %d %d %"PRIi64" %f", c, + activation->awake_time, c->buffer_size, c->sample_rate, + c->jack_position.frame, pos->clock.delay, pos->clock.rate_diff); + + return buffer_size; +} + +static inline void signal_sync(struct client *c) +{ + struct timespec ts; + uint64_t cmd, nsec; + struct link *l; + struct pw_node_activation *activation = c->activation; + + process_tee(c); + + clock_gettime(CLOCK_MONOTONIC, &ts); + nsec = SPA_TIMESPEC_TO_NSEC(&ts); + activation->status = PW_NODE_ACTIVATION_FINISHED; + activation->finish_time = nsec; + + cmd = 1; + pw_array_for_each(l, &c->links) { + struct pw_node_activation_state *state; + + if (l->activation == NULL) + continue; + + state = &l->activation->state[0]; + + pw_log_trace(NAME" %p: link %p %p %d/%d", c, l, state, + state->pending, state->required); + + if (pw_node_activation_state_dec(state, 1)) { + l->activation->status = PW_NODE_ACTIVATION_TRIGGERED; + l->activation->signal_time = nsec; + + pw_log_trace(NAME" %p: signal %p %p", c, l, state); + + if (write(l->signalfd, &cmd, sizeof(cmd)) != sizeof(cmd)) + pw_log_warn(NAME" %p: write failed %m", c); + } + } +} + +static inline void cycle_signal(struct client *c, int status) +{ + struct pw_node_activation *driver = c->driver_activation; + struct pw_node_activation *activation = c->activation; + + if (status == 0) { + if (c->timebase_callback && driver && driver->segment_owner[0] == c->node_id) { + if (activation->pending_new_pos || + c->jack_state == JackTransportRolling || + c->jack_state == JackTransportLooping) { + c->timebase_callback(c->jack_state, + c->buffer_size, + &c->jack_position, + activation->pending_new_pos, + c->timebase_arg); + + activation->pending_new_pos = false; + + debug_position(c, &c->jack_position); + jack_to_position(&c->jack_position, activation); + } + } + } + signal_sync(c); +} + static void on_rtsocket_condition(void *data, int fd, uint32_t mask) { struct client *c = data; - struct timespec ts; - jack_position_t jack_position; - jack_transport_state_t jack_state; if (mask & (SPA_IO_ERR | SPA_IO_HUP)) { pw_log_warn(NAME" %p: got error", c); @@ -809,109 +943,14 @@ on_rtsocket_condition(void *data, int fd, uint32_t mask) } if (mask & SPA_IO_IN) { - uint64_t cmd, nsec; - uint32_t buffer_size, sample_rate; - struct link *l; - struct spa_io_position *pos = c->position; - struct pw_node_activation *activation = c->activation; - struct pw_node_activation *driver = c->driver_activation; + uint32_t buffer_size; + int status; - if (read(fd, &cmd, sizeof(cmd)) != sizeof(cmd)) - pw_log_warn(NAME" %p: read failed %m", c); - if (cmd > 1) - pw_log_warn(NAME" %p: missed %"PRIu64" wakeups", c, cmd - 1); + buffer_size = cycle_wait(c); - if (pos == NULL) { - pw_log_error(NAME" %p: missing position", c); - return; - } + status = c->process_callback ? c->process_callback(buffer_size, c->process_arg) : 0; - nsec = pos->clock.nsec; - activation->status = PW_NODE_ACTIVATION_AWAKE; - activation->awake_time = nsec; - - buffer_size = pos->clock.duration; - if (buffer_size != c->buffer_size) { - pw_log_info(NAME" %p: buffersize %d", c, buffer_size); - c->buffer_size = buffer_size; - if (c->bufsize_callback) - c->bufsize_callback(c->buffer_size, c->bufsize_arg); - } - - sample_rate = pos->clock.rate.denom; - if (sample_rate != c->sample_rate) { - pw_log_info(NAME" %p: sample_rate %d", c, sample_rate); - c->sample_rate = sample_rate; - if (c->srate_callback) - c->srate_callback(c->sample_rate, c->srate_arg); - } - - jack_state = position_to_jack(pos, &jack_position); - - if (driver) { - if (activation->pending_sync) { - if (c->sync_callback == NULL || - c->sync_callback(jack_state, &jack_position, c->sync_arg)) - activation->pending_sync = false; - } - if (c->xrun_count != driver->xrun_count && - c->xrun_count != 0 && c->xrun_callback) - c->xrun_callback(c->xrun_arg); - c->xrun_count = driver->xrun_count; - } - - pw_log_trace(NAME" %p: do process %"PRIu64" %d %d %d %"PRIi64" %f", c, - nsec, c->buffer_size, c->sample_rate, - jack_position.frame, pos->clock.delay, pos->clock.rate_diff); - - if (c->process_callback) - c->process_callback(c->buffer_size, c->process_arg); - - if (c->timebase_callback && driver && driver->pending.segment.bar.owner == c->node_id) { - if (activation->pending_new_pos || - jack_state == JackTransportRolling || - jack_state == JackTransportLooping) { - c->timebase_callback(jack_state, - buffer_size, - &jack_position, - activation->pending_new_pos, - c->timebase_arg); - - activation->pending_new_pos = false; - - debug_position(c, &jack_position); - jack_to_position(&jack_position, driver); - } - } - process_tee(c); - - clock_gettime(CLOCK_MONOTONIC, &ts); - nsec = SPA_TIMESPEC_TO_NSEC(&ts); - activation->status = PW_NODE_ACTIVATION_FINISHED; - activation->finish_time = nsec; - - cmd = 1; - pw_array_for_each(l, &c->links) { - struct pw_node_activation_state *state; - - if (l->activation == NULL) - continue; - - state = &l->activation->state[0]; - - pw_log_trace(NAME" %p: link %p %p %d/%d", c, l, state, - state->pending, state->required); - - if (pw_node_activation_state_dec(state, 1)) { - l->activation->status = PW_NODE_ACTIVATION_TRIGGERED; - l->activation->signal_time = nsec; - - pw_log_trace(NAME" %p: signal %p %p", c, l, state); - - if (write(l->signalfd, &cmd, sizeof(cmd)) != sizeof(cmd)) - pw_log_warn(NAME" %p: write failed %m", c); - } - } + cycle_signal(c, status); } } @@ -2151,21 +2190,22 @@ int jack_is_realtime (jack_client_t *client) SPA_EXPORT jack_nframes_t jack_thread_wait (jack_client_t *client, int status) { - pw_log_warn(NAME" %p: not implemented %d", client, status); + pw_log_error(NAME" %p: jack_thread_wait: deprecated, use jack_cycle_wait/jack_cycle_signal", client); return 0; } SPA_EXPORT jack_nframes_t jack_cycle_wait (jack_client_t* client) { - pw_log_warn(NAME" %p: not implemented", client); - return 0; + struct client *c = (struct client *) client; + return cycle_wait(c); } SPA_EXPORT void jack_cycle_signal (jack_client_t* client, int status) { - pw_log_warn(NAME" %p: not implemented %d", client, status); + struct client *c = (struct client *) client; + cycle_signal(c, status); } SPA_EXPORT @@ -3366,7 +3406,7 @@ int jack_release_timebase (jack_client_t *client) if (a == NULL) return -EIO; - if (!ATOMIC_CAS(a->pending.segment.bar.owner, c->node_id, 0)) + if (!ATOMIC_CAS(a->segment_owner[0], c->node_id, 0)) return -EINVAL; c->timebase_callback = NULL; @@ -3423,15 +3463,15 @@ int jack_set_timebase_callback (jack_client_t *client, return -EIO; /* was ok */ - if (ATOMIC_LOAD(a->pending.segment.bar.owner) == c->node_id) + if (ATOMIC_LOAD(a->segment_owner[0]) == c->node_id) return 0; /* try to become master */ if (conditional) { - if (!ATOMIC_CAS(a->pending.segment.bar.owner, 0, c->node_id)) + if (!ATOMIC_CAS(a->segment_owner[0], 0, c->node_id)) return -EBUSY; } else { - ATOMIC_STORE(a->pending.segment.bar.owner, c->node_id); + ATOMIC_STORE(a->segment_owner[0], c->node_id); } c->timebase_callback = timebase_callback; @@ -3464,7 +3504,7 @@ jack_transport_state_t jack_transport_query (const jack_client_t *client, jack_transport_state_t jack_state = JackTransportStopped; if (a != NULL) - jack_state = position_to_jack(&a->position, pos); + jack_state = position_to_jack(a, pos); else if (pos != NULL) memset(pos, 0, sizeof(jack_position_t)); @@ -3502,25 +3542,20 @@ int jack_transport_reposition (jack_client_t *client, { struct client *c = (struct client *) client; struct pw_node_activation *a = c->driver_activation; - uint32_t seq1, seq2; - if (!a) + struct pw_node_activation *na = c->activation; + if (!a || !na) return -EIO; if (pos->valid & ~(JackPositionBBT|JackPositionTimecode)) return -EINVAL; pw_log_debug("frame:%u", pos->frame); - - do { - seq1 = SEQ_WRITE(a->pending.seq); - a->pending.change_mask |= PW_NODE_ACTIVATION_UPDATE_REPOSITION; - a->pending.reposition.flags = 0; - a->pending.reposition.start = 0; - a->pending.reposition.duration = 0; - a->pending.reposition.position = pos->frame; - a->pending.reposition.rate = 1.0; - seq2 = SEQ_WRITE(a->pending.seq); - } while (!SEQ_WRITE_SUCCESS(seq1, seq2)); + na->reposition.flags = 0; + na->reposition.start = 0; + na->reposition.duration = 0; + na->reposition.position = pos->frame; + na->reposition.rate = 1.0; + ATOMIC_STORE(a->reposition_owner, c->node_id); return 0; } @@ -3528,17 +3563,9 @@ int jack_transport_reposition (jack_client_t *client, static void update_command(struct client *c, uint32_t command) { struct pw_node_activation *a = c->driver_activation; - uint32_t seq1, seq2; - if (!a) return; - - do { - seq1 = SEQ_WRITE(a->pending.seq); - a->pending.change_mask |= PW_NODE_ACTIVATION_UPDATE_COMMAND; - a->pending.command = command; - seq2 = SEQ_WRITE(a->pending.seq); - } while (!SEQ_WRITE_SUCCESS(seq1, seq2)); + ATOMIC_STORE(a->command, command); } SPA_EXPORT