stream: update to latest API

This commit is contained in:
Wim Taymans 2018-07-19 16:33:00 +02:00
parent 061f2c82b5
commit 9e0bce4cb7
3 changed files with 36 additions and 7 deletions

View file

@ -51,11 +51,11 @@ gst_pipewire_clock_get_internal_time (GstClock * clock)
pw_stream_get_time (pclock->stream, &t);
if (t.rate.denom)
result = gst_util_uint64_scale_int (t.ticks, GST_SECOND * t.rate.num, t.rate.denom);
result = gst_util_uint64_scale_int (t.ticks, GST_SECOND * t.rate.denom, t.rate.num);
else
result = GST_CLOCK_TIME_NONE;
GST_DEBUG ("%"PRId64", %d %"PRId64, t.ticks, t.rate.denom, result);
GST_DEBUG ("%"PRId64", %d/%d %"PRId64, t.ticks, t.rate.num, t.rate.denom, result);
return result;
}

View file

@ -68,6 +68,8 @@ struct buffer {
struct queue {
uint32_t ids[MAX_BUFFERS];
struct spa_ringbuffer ring;
uint64_t incount;
uint64_t outcount;
};
struct stream {
@ -114,7 +116,6 @@ struct stream {
struct buffer buffers[MAX_BUFFERS];
int n_buffers;
int64_t last_ticks;
int32_t last_rate;
int64_t last_monotonic;
@ -270,8 +271,10 @@ static inline int push_queue(struct stream *stream, struct queue *queue, struct
if (SPA_FLAG_CHECK(buffer->flags, BUFFER_FLAG_QUEUED))
return -EINVAL;
filled = spa_ringbuffer_get_write_index(&queue->ring, &index);
SPA_FLAG_SET(buffer->flags, BUFFER_FLAG_QUEUED);
queue->incount += buffer->buffer.size;
filled = spa_ringbuffer_get_write_index(&queue->ring, &index);
queue->ids[index & MASK_BUFFERS] = buffer->id;
spa_ringbuffer_write_update(&queue->ring, index + 1);
@ -293,6 +296,7 @@ static inline struct buffer *pop_queue(struct stream *stream, struct queue *queu
spa_ringbuffer_read_update(&queue->ring, index + 1);
buffer = &stream->buffers[id];
queue->outcount += buffer->buffer.size;
SPA_FLAG_UNSET(buffer->flags, BUFFER_FLAG_QUEUED);
pw_log_trace("stream %p: dequeued buffer %d %d", stream, id, avail);
@ -1359,6 +1363,11 @@ int pw_stream_set_active(struct pw_stream *stream, bool active)
return 0;
}
static inline int64_t get_queue_size(struct queue *queue)
{
return (int64_t)(queue->incount - queue->outcount);
}
int pw_stream_get_time(struct pw_stream *stream, struct pw_time *time)
{
struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this);
@ -1370,8 +1379,15 @@ int pw_stream_get_time(struct pw_stream *stream, struct pw_time *time)
elapsed = (time->now - impl->last_monotonic) / 1000;
time->ticks = impl->last_ticks + (elapsed * impl->last_rate) / SPA_USEC_PER_SEC;
time->rate.num = 1;
time->rate.denom = impl->last_rate;
time->rate.num = impl->last_rate;
time->rate.denom = 1;
time->delay = 0;
if (impl->direction == SPA_DIRECTION_INPUT)
time->queued = get_queue_size(&impl->dequeue);
else
time->queued = get_queue_size(&impl->queue);
pw_log_trace("%ld %d/%d %ld", time->ticks, time->rate.num, time->rate.denom, time->queued);
return 0;
}

View file

@ -166,6 +166,11 @@ enum pw_stream_state {
struct pw_buffer {
struct spa_buffer *buffer; /* the spa buffer */
void *user_data; /* user data attached to the buffer */
uint64_t size; /* For input streams, this field is set by pw_stream
with the duration of the buffer in ticks.
For output streams, this field is set by the user.
This field is added for all queued buffers and
returned in the time info. */
};
/** Events for a stream */
@ -308,9 +313,17 @@ int pw_stream_set_active(struct pw_stream *stream, bool active);
/** A time structure \memberof pw_stream */
struct pw_time {
int64_t now; /**< the monotonic time */
int64_t ticks; /**< the ticks at \a now */
struct spa_fraction rate; /**< the rate of \a ticks */
uint64_t ticks; /**< the ticks at \a now. This is the current time that
the remote end is reading/writing. */
uint64_t delay; /**< delay to device, add to ticks for INPUT streams and
subtract from ticks for OUTPUT streams to get the
time of the device. */
uint64_t queued; /**< data queued in the stream, this is the sum
of the size fields in the pw_buffer that are
currently queued */
};
/** Query the time on the stream \memberof pw_stream */
int pw_stream_get_time(struct pw_stream *stream, struct pw_time *time);