node: add support for transport

Move some things around. Move the duration of the current cycle
to the clock. Also add the estimated next timeout to the clock.
Add a generic media specific counter to the clock.

Clean up the position_bar info. We can do with only a double beat
value and make the signature in floats.

Flesh out the io_position info. This has now the information needed
to convert a raw clock time into a stream time. It basically has
the same kind of features as GStreamer segments such as looping,
variable rate playback etc.. It also contains the state of the
timeline (paused/playing) and it can be used to update the position
and state from clients.

There is also extended information in the position field that
clients can update when they can.

Plugins basically only update the clock info they get (and use
the position info to check if they are slaved or not).

Before each cycle, check if there is a pending position update and
apply it.
This commit is contained in:
Wim Taymans 2019-08-27 14:41:47 +02:00
parent f36daaedea
commit b356c83d32
10 changed files with 168 additions and 60 deletions

@ -1 +1 @@
Subproject commit 42f144e0978c2c785cea9320e7b5e08f161567fb
Subproject commit f8c3126b525eb01d062d1744b0c8cd3d36126407

View file

@ -70,17 +70,31 @@ struct spa_io_range {
uint32_t max_size; /**< maximum size of data */
};
/** A time source. Nodes that can report clocking information will
* receive this. The application sets the id. */
/**
* Absolute time reporting.
*
* Nodes that can report clocking information will receive this io block.
* The application sets the id. This is usually set as part of the
* position information but can also be set separately.
*
* The clock counts the elapsed time according to the clock provider
* since the provider was last started.
*/
struct spa_io_clock {
uint32_t id; /**< unique clock id, set by application */
uint32_t flags; /**< clock flags */
uint64_t nsec; /**< time in nanoseconds */
struct spa_fraction rate; /**< rate for position/delay */
uint64_t count; /**< a media specific counter. Can be used to detect
* gaps in the media. It usually represents the amount
* of processed media units (packets, frames,
* samples, ...) */
struct spa_fraction rate; /**< rate for position/duration/delay */
uint64_t position; /**< current position */
uint64_t duration; /**< duration of current cycle */
int64_t delay; /**< delay between position and hardware,
* positive for capture, negative for playback */
double rate_diff; /**< rate difference between clock and monotonic time */
uint64_t next_nsec; /**< extimated next wakup time in nanoseconds */
};
/** latency reporting */
@ -97,21 +111,17 @@ struct spa_io_sequence {
/** bar and beat position */
struct spa_io_position_bar {
uint32_t size; /**< size of this structure */
uint32_t offset; /**< offset of last bar in samples against current cycle */
struct spa_fraction signature; /**< time signature */
uint32_t offset; /**< offset in samples of this beat */
float signature_num; /**< time signature numerator */
float signature_denom; /**< time signature denominator */
double bpm; /**< beats per minute */
double bar; /**< current bar in quarter notes */
double last_bar; /**< position of last bar in quarter notes */
double cycle_start; /**< cycle start in quarter notes */
double cycle_end; /**< cycle end in quarter notes */
double beat; /**< current beat in position */
uint32_t padding[16];
};
/** video frame position */
struct spa_io_position_video {
uint32_t size; /**< size of this structure */
uint32_t offset; /**< offset of frame against current cycle */
uint32_t offset; /**< offset of frame against current position */
struct spa_fraction framerate;
#define SPA_IO_POSITION_VIDEO_FLAG_DROP_FRAME (1<<0)
#define SPA_IO_POSITION_VIDEO_FLAG_PULL_DOWN (1<<1)
@ -125,18 +135,52 @@ struct spa_io_position_video {
uint32_t padding[16];
};
/** position reporting */
enum spa_io_position_state {
SPA_IO_POSITION_STATE_STOPPED,
SPA_IO_POSITION_STATE_STARTING,
SPA_IO_POSITION_STATE_RUNNING,
SPA_IO_POSITION_STATE_LOOPING,
};
/**
* The position information adds extra meaning to the raw clock times.
*
* It is set on all nodes and the clock id will contain the clock of the
* master node in the graph.
*
* The position is valid when the current clock position is between
* start_position and end_position. The position is then calculated as:
*
* (clock_start - clock.position) * rate + position;
*
* Support for looping is done by specifying an end_position. When the
* clock reaches the end_position, end_position is copied to start_position
* and start_position is incremented with the same duration as the previous
* loop.
*/
struct spa_io_position {
struct spa_io_clock clock; /**< clock position of driver, always valid and
* read only */
uint32_t version; /**< current graph version */
uint32_t size; /**< size of current cycle expressed in clock.rate */
struct spa_fraction rate; /**< overal rate of the graph */
#define SPA_IO_POSITION_FLAG_BAR (1<<0)
#define SPA_IO_POSITION_FLAG_VIDEO (1<<1)
uint64_t flags; /**< flags indicate what fields are valid */
struct spa_io_position_bar bar; /**< when mask & SPA_IO_POSITION_FLAG_BAR*/
struct spa_io_position_video video; /**< when mask & SPA_IO_POSITION_FLAG_VIDEO */
uint32_t flags; /**< extra flags */
uint32_t state; /**< one of enum spa_io_position_state */
uint64_t clock_start; /**< position against clock position when this
* info is active. Can be in the future for
* pending changes. It does not have to be in
* exact multiples of the clock duration. */
uint64_t clock_duration; /**< duration when this info becomes invalid. If
* RUNNING, the state will transition to STOPPED.
* If the state was LOOPING, the clock_start will
* be updated with the duration of this info */
uint64_t position; /**< The position when the clock position == clock_start */
double rate; /**< overal rate of the graph, can be negative for
* backwards time reporting. */
#define SPA_IO_POSITION_VALID_BAR (1<<0)
#define SPA_IO_POSITION_VALID_VIDEO (1<<1)
uint32_t valid; /**< indicates what fields are valid */
struct spa_io_position_bar bar; /**< when mask & SPA_IO_POSITION_VALID_BAR */
struct spa_io_position_video video; /**< when mask & SPA_IO_POSITION_VALID_VIDEO */
};
/** rate matching */

View file

@ -656,11 +656,14 @@ static int update_time(struct state *state, uint64_t nsec, snd_pcm_sframes_t del
state->rate_match->rate = SPA_CLAMP(1.0/corr, 0.95, 1.05);
}
if (!slave && state->clock) {
state->clock->nsec = state->next_time;
state->clock->nsec = nsec;
state->clock->rate = SPA_FRACTION(1, state->rate);
state->clock->position += state->size;
state->clock->delay = state->size * corr;
state->clock->position += state->duration;
state->clock->duration = state->duration;
state->clock->count = state->clock->position;
state->clock->delay = state->duration * corr;
state->clock->rate_diff = corr;
state->clock->next_nsec = state->next_time;
}
spa_log_trace_fp(state->log, "slave:%d %"PRIu64" %f %ld %f %f %d", slave, nsec,
@ -680,9 +683,9 @@ int spa_alsa_write(struct state *state, snd_pcm_uframes_t silence)
snd_pcm_uframes_t written, frames, offset, off, to_write, total_written;
int res;
if (state->position && state->size != state->position->size) {
state->size = state->position->size;
state->threshold = (state->size * state->rate + state->rate_denom-1) / state->rate_denom;
if (state->position && state->duration != state->position->clock.duration) {
state->duration = state->position->clock.duration;
state->threshold = (state->duration * state->rate + state->rate_denom-1) / state->rate_denom;
}
if (state->slaved && state->alsa_started) {
@ -877,21 +880,21 @@ int spa_alsa_read(struct state *state, snd_pcm_uframes_t silence)
int res;
if (state->position && !state->slaved) {
uint64_t position, size;
uint64_t position, duration;
size = state->position->size;
if (state->size != size) {
state->size = size;
state->threshold = (size * state->rate + state->rate_denom-1) / state->rate_denom;
duration = state->position->clock.duration;
if (state->duration != duration) {
state->duration = duration;
state->threshold = (duration * state->rate + state->rate_denom-1) / state->rate_denom;
}
position = state->position->clock.position;
if (state->last_position && state->last_position + state->last_size != position) {
if (state->last_position && state->last_position + state->last_duration != position) {
state->alsa_sync = true;
spa_log_warn(state->log, "discont, resync %"PRIu64" %"PRIu64" %d",
state->last_position, position, state->last_size);
state->last_position, position, state->last_duration);
}
state->last_position = position;
state->last_size = size;
state->last_duration = duration;
}
if (state->slaved && state->alsa_started) {
@ -1031,8 +1034,8 @@ static void alsa_on_timeout_event(struct spa_source *source)
spa_log_warn(state->log, "error reading timerfd: %m");
if (state->position) {
state->size = state->position->size;
state->threshold = (state->size * state->rate + state->rate_denom-1) / state->rate_denom;
state->duration = state->position->clock.duration;
state->threshold = (state->duration * state->rate + state->rate_denom-1) / state->rate_denom;
}
spa_system_clock_gettime(state->data_system, CLOCK_MONOTONIC, &state->now);
@ -1099,15 +1102,15 @@ int spa_alsa_start(struct state *state)
state->slaved = is_slaved(state);
if (state->position) {
state->size = state->position->size;
state->duration = state->position->clock.duration;
state->rate_denom = state->position->clock.rate.denom;
}
else {
state->size = state->props.min_latency;
state->duration = state->props.min_latency;
state->rate_denom = state->rate;
}
state->threshold = (state->size * state->rate + state->rate_denom-1) / state->rate_denom;
state->threshold = (state->duration * state->rate + state->rate_denom-1) / state->rate_denom;
state->last_threshold = state->threshold;
init_loop(state);

View file

@ -128,8 +128,8 @@ struct state {
uint32_t threshold;
uint32_t last_threshold;
uint32_t size;
uint32_t last_size;
uint32_t duration;
uint32_t last_duration;
uint64_t last_position;
unsigned int alsa_started:1;
unsigned int alsa_sync:1;

View file

@ -750,7 +750,7 @@ static int impl_node_process(void *object)
maxsize = db->datas[0].maxsize;
if (this->io_position) {
max = this->io_position->size;
max = this->io_position->clock.duration;
} else {
max = 1024;
}

View file

@ -322,32 +322,30 @@ static void client_process(void *data)
if (this->clock) {
struct spa_io_clock *c = this->clock;
c->nsec = this->client->current_usecs * SPA_NSEC_PER_USEC;
c->count = this->client->current_frames;
c->rate = SPA_FRACTION(1, this->client->frame_rate);
c->position = this->client->current_frames;
c->duration = this->client->buffer_size;
c->delay = 0;
c->rate_diff = 1.0;
c->next_nsec = this->client->next_usecs * SPA_NSEC_PER_USEC;
}
if (this->position) {
jack_position_t *jp = &this->client->pos;
struct spa_io_position *p = this->position;
p->version = 0;
p->size = this->client->buffer_size;
p->rate = SPA_FRACTION(1, this->client->frame_rate);
p->flags = 0;
p->rate = 1.0;
p->valid = 0;
if (jp->valid & JackPositionBBT) {
p->flags |= SPA_IO_POSITION_FLAG_BAR;
p->bar.size = sizeof(struct spa_io_position_bar);
p->valid |= SPA_IO_POSITION_VALID_BAR;
if (jp->valid & JackBBTFrameOffset)
p->bar.offset = jp->bbt_offset;
else
p->bar.offset = 0;
p->bar.signature = SPA_FRACTION(jp->beats_per_bar, jp->beat_type);
p->bar.signature_num = jp->beats_per_bar;
p->bar.signature_denom = jp->beat_type;
p->bar.bpm = jp->beats_per_minute;
p->bar.bar = jp->bar;
p->bar.last_bar = jp->bar;
p->bar.cycle_start = jp->bar;
p->bar.cycle_end = jp->bar;
p->bar.beat = jp->bar * jp->beats_per_bar + jp->beat;
}
}
spa_node_call_ready(&this->callbacks, SPA_STATUS_NEED_BUFFER);

View file

@ -1187,6 +1187,11 @@ static int mmap_read(struct impl *this)
this->clock->nsec = pts;
this->clock->rate = port->rate;
this->clock->position = buf.sequence;
this->clock->duration = 1;
this->clock->count = buf.sequence;
this->clock->delay = 0;
this->clock->rate_diff = 1.0;
this->clock->next_nsec = pts + 1000000000LL / port->rate.denom;
}
b = &port->buffers[buf.index];

View file

@ -1093,8 +1093,8 @@ static int collect_nodes(struct pw_node *driver)
}
quantum = SPA_MAX(quantum, MIN_QUANTUM);
if (driver->rt.position && quantum != driver->rt.position->size)
driver->rt.position->size = quantum;
if (driver->rt.position && quantum != driver->rt.position->clock.duration)
driver->rt.position->clock.duration = quantum;
return 0;
}
@ -1161,8 +1161,8 @@ int pw_core_recalc_graph(struct pw_core *core)
spa_list_for_each(n, &core->driver_list, driver_link) {
if (!n->master)
continue;
pw_log_info(NAME" %p: master %p quantum:%d '%s'", core, n,
n->rt.position ? n->rt.position->size : 0, n->name);
pw_log_info(NAME" %p: master %p quantum:%"PRIu64" '%s'", core, n,
n->rt.position ? n->rt.position->clock.duration : 0, n->name);
spa_list_for_each(s, &n->slave_list, slave_link)
pw_log_info(NAME" %p: slave %p: active:%d '%s'",
core, s, s->active, s->name);

View file

@ -962,7 +962,8 @@ struct pw_node *pw_node_new(struct pw_core *core,
this->rt.driver_target.signal = process_node;
this->rt.activation->position.clock.rate = SPA_FRACTION(1, 48000);
this->rt.activation->position.size = DEFAULT_QUANTUM;
this->rt.activation->position.clock.duration = DEFAULT_QUANTUM;
this->rt.activation->position.rate = 1.0;
check_properties(this);
@ -1160,6 +1161,52 @@ static const struct spa_node_events node_events = {
.event = node_event,
};
static void update_position(struct pw_node *node)
{
struct pw_node_activation *a = node->rt.activation;
struct spa_io_position position;
uint32_t seq1, seq2, change_mask;
enum spa_io_position_state state;
seq1 = SEQ_READ(&a->pending.seq);
change_mask = a->pending.change_mask;
state = a->pending.state;
position = a->pending.position;
seq2 = SEQ_READ(&a->pending.seq);
if (SEQ_READ_SUCCESS(seq1, seq2))
a->pending.change_mask = 0;
else
change_mask = 0;
if (change_mask & UPDATE_POSITION) {
pw_log_debug("update position:%lu", position.position);
a->position.position = position.position;
a->position.clock_start = position.clock_start;
a->position.clock_duration = position.clock_duration;
a->position.rate = position.rate;
}
if (change_mask & UPDATE_STATE) {
switch (state) {
case SPA_IO_POSITION_STATE_STOPPED:
a->position.state = state;
break;
case SPA_IO_POSITION_STATE_STARTING:
a->position.state = SPA_IO_POSITION_STATE_RUNNING;
break;
case SPA_IO_POSITION_STATE_RUNNING:
case SPA_IO_POSITION_STATE_LOOPING:
a->position.state = state;
break;
}
}
if (a->position.clock_start == 0)
a->position.clock_start = a->position.clock.position;
if (a->position.state == SPA_IO_POSITION_STATE_STOPPED)
a->position.clock_start += a->position.clock.duration;
}
static int node_ready(void *data, int status)
{
struct pw_node *node = data;
@ -1183,6 +1230,8 @@ static int node_ready(void *data, int status)
t->activation->status = NOT_TRIGGERED;
}
a->prev_signal_time = a->signal_time;
update_position(node);
}
if (node->driver && !node->master)
return 0;

View file

@ -368,6 +368,15 @@ struct pw_node_activation {
uint64_t xrun_time; /* time of last xrun in microseconds */
uint64_t xrun_delay; /* delay of last xrun in microseconds */
uint64_t max_delay; /* max of all xruns in microseconds */
struct {
uint32_t seq;
#define UPDATE_STATE (1<<0)
#define UPDATE_POSITION (1<<1)
uint32_t change_mask;
enum spa_io_position_state state;
struct spa_io_position position;
} pending;
};
#define SEQ_WRITE(s) __atomic_add_fetch((s), 1, __ATOMIC_SEQ_CST)