gst: fix clock handling again

Request periodic clock updates.
Interpollate clock times in the gstreamer clock
Make sure we don't read the clock after shutdown.
This commit is contained in:
Wim Taymans 2018-08-15 15:51:35 +02:00
parent 98f54c4135
commit fb76b65dfb
3 changed files with 29 additions and 37 deletions

View file

@ -48,15 +48,19 @@ gst_pipewire_clock_get_internal_time (GstClock * clock)
GstPipeWireClock *pclock = (GstPipeWireClock *) clock; GstPipeWireClock *pclock = (GstPipeWireClock *) clock;
GstClockTime result; GstClockTime result;
struct pw_time t; struct pw_time t;
struct timespec ts;
if (pclock->stream == NULL || if (pclock->stream == NULL ||
pw_stream_get_time (pclock->stream, &t) < 0 || pw_stream_get_time (pclock->stream, &t) < 0 ||
t.rate.num == 0) t.rate.denom == 0)
return pclock->last_time; return pclock->last_time;
result = gst_util_uint64_scale_int (t.ticks, GST_SECOND * t.rate.denom, t.rate.num); result = gst_util_uint64_scale_int (t.ticks, GST_SECOND * t.rate.num, t.rate.denom);
clock_gettime(CLOCK_MONOTONIC, &ts);
result += SPA_TIMESPEC_TO_TIME(&ts) - t.now;
GST_DEBUG ("%"PRId64", %d/%d %"PRId64, t.ticks, t.rate.num, t.rate.denom, result); GST_DEBUG ("%"PRId64", %"PRId64" %d/%d %"PRId64,
t.ticks, GST_SECOND, t.rate.num, t.rate.denom, result);
return result; return result;
} }

View file

@ -1065,12 +1065,6 @@ gst_pipewire_src_close (GstPipeWireSrc * pwsrc)
pw_thread_loop_stop (pwsrc->main_loop); pw_thread_loop_stop (pwsrc->main_loop);
pw_stream_destroy (pwsrc->stream);
pwsrc->stream = NULL;
pw_remote_destroy (pwsrc->remote);
pwsrc->remote = NULL;
pwsrc->last_time = gst_clock_get_time (pwsrc->clock); pwsrc->last_time = gst_clock_get_time (pwsrc->clock);
gst_element_post_message (GST_ELEMENT (pwsrc), gst_element_post_message (GST_ELEMENT (pwsrc),
@ -1080,6 +1074,13 @@ gst_pipewire_src_close (GstPipeWireSrc * pwsrc)
GST_PIPEWIRE_CLOCK (pwsrc->clock)->stream = NULL; GST_PIPEWIRE_CLOCK (pwsrc->clock)->stream = NULL;
g_clear_object (&pwsrc->clock); g_clear_object (&pwsrc->clock);
GST_OBJECT_UNLOCK (pwsrc); GST_OBJECT_UNLOCK (pwsrc);
pw_stream_destroy (pwsrc->stream);
pwsrc->stream = NULL;
pw_remote_destroy (pwsrc->remote);
pwsrc->remote = NULL;
} }
static GstStateChangeReturn static GstStateChangeReturn

View file

@ -114,9 +114,7 @@ struct stream {
struct buffer buffers[MAX_BUFFERS]; struct buffer buffers[MAX_BUFFERS];
int n_buffers; int n_buffers;
int64_t last_ticks; struct pw_time last_time;
int32_t last_rate;
int64_t last_monotonic;
}; };
/** \endcond */ /** \endcond */
@ -468,10 +466,6 @@ do_remove_sources(struct spa_loop *loop,
pw_loop_destroy_source(stream->remote->core->data_loop, impl->rtsocket_source); pw_loop_destroy_source(stream->remote->core->data_loop, impl->rtsocket_source);
impl->rtsocket_source = NULL; impl->rtsocket_source = NULL;
} }
if (impl->timeout_source) {
pw_loop_destroy_source(stream->remote->core->data_loop, impl->timeout_source);
impl->timeout_source = NULL;
}
if (impl->rtwritefd != -1) { if (impl->rtwritefd != -1) {
close(impl->rtwritefd); close(impl->rtwritefd);
impl->rtwritefd = -1; impl->rtwritefd = -1;
@ -483,6 +477,10 @@ static void unhandle_socket(struct pw_stream *stream)
{ {
struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this); struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this);
if (impl->timeout_source) {
pw_loop_destroy_source(stream->remote->core->main_loop, impl->timeout_source);
impl->timeout_source = NULL;
}
pw_loop_invoke(stream->remote->core->data_loop, pw_loop_invoke(stream->remote->core->data_loop,
do_remove_sources, 1, NULL, 0, true, impl); do_remove_sources, 1, NULL, 0, true, impl);
} }
@ -668,7 +666,6 @@ static void do_node_init(struct pw_stream *stream)
pw_client_node_proxy_set_active(impl->node_proxy, true); pw_client_node_proxy_set_active(impl->node_proxy, true);
} }
#if 0
static void add_request_clock_update(struct pw_stream *stream) static void add_request_clock_update(struct pw_stream *stream)
{ {
struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this); struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this);
@ -686,7 +683,6 @@ static void on_timeout(void *data, uint64_t expirations)
struct pw_stream *stream = data; struct pw_stream *stream = data;
add_request_clock_update(stream); add_request_clock_update(stream);
} }
#endif
static inline void reuse_buffer(struct pw_stream *stream, uint32_t id) static inline void reuse_buffer(struct pw_stream *stream, uint32_t id)
{ {
@ -847,6 +843,7 @@ on_rtsocket_condition(void *data, int fd, enum spa_io mask)
static void handle_socket(struct pw_stream *stream, int rtreadfd, int rtwritefd) static void handle_socket(struct pw_stream *stream, int rtreadfd, int rtwritefd)
{ {
struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this); struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this);
struct timespec interval;
impl->rtwritefd = rtwritefd; impl->rtwritefd = rtwritefd;
impl->rtsocket_source = pw_loop_add_io(stream->remote->core->data_loop, impl->rtsocket_source = pw_loop_add_io(stream->remote->core->data_loop,
@ -854,13 +851,10 @@ static void handle_socket(struct pw_stream *stream, int rtreadfd, int rtwritefd)
SPA_IO_ERR | SPA_IO_HUP, SPA_IO_ERR | SPA_IO_HUP,
true, on_rtsocket_condition, stream); true, on_rtsocket_condition, stream);
#if 0
struct timespec interval;
impl->timeout_source = pw_loop_add_timer(stream->remote->core->main_loop, on_timeout, stream); impl->timeout_source = pw_loop_add_timer(stream->remote->core->main_loop, on_timeout, stream);
interval.tv_sec = 0; interval.tv_sec = 0;
interval.tv_nsec = 100000000; interval.tv_nsec = 100000000;
pw_loop_update_timer(stream->remote->core->main_loop, impl->timeout_source, NULL, &interval, false); pw_loop_update_timer(stream->remote->core->main_loop, impl->timeout_source, NULL, &interval, false);
#endif
return; return;
} }
@ -925,11 +919,13 @@ static void client_node_command(void *data, uint32_t seq, const struct spa_comma
PW_STREAM_PROP_LATENCY_MIN, "%" PRId64, PW_STREAM_PROP_LATENCY_MIN, "%" PRId64,
cu->body.latency.value); cu->body.latency.value);
} }
impl->last_ticks = cu->body.ticks.value; impl->last_time.now = cu->body.monotonic_time.value;
impl->last_rate = cu->body.rate.value; impl->last_time.ticks = cu->body.ticks.value;
impl->last_monotonic = cu->body.monotonic_time.value; impl->last_time.rate.num = 1;
pw_log_debug("clock update %ld %d %ld", impl->last_ticks, impl->last_time.rate.denom = cu->body.rate.value;
impl->last_rate, impl->last_monotonic); impl->last_time.delay = 0;
pw_log_debug("clock update %ld %d %ld", impl->last_time.ticks,
impl->last_time.rate.denom, impl->last_time.now);
} else { } else {
pw_log_warn("unhandled node command %d", SPA_COMMAND_TYPE(command)); pw_log_warn("unhandled node command %d", SPA_COMMAND_TYPE(command));
add_async_complete(stream, seq, -ENOTSUP); add_async_complete(stream, seq, -ENOTSUP);
@ -1366,20 +1362,11 @@ static inline int64_t get_queue_size(struct queue *queue)
int pw_stream_get_time(struct pw_stream *stream, struct pw_time *time) int pw_stream_get_time(struct pw_stream *stream, struct pw_time *time)
{ {
struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this); struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this);
int64_t elapsed;
struct timespec ts;
if (impl->last_rate == 0) if (impl->last_time.rate.denom == 0)
return -EAGAIN; return -EAGAIN;
clock_gettime(CLOCK_MONOTONIC, &ts); *time = impl->last_time;
time->now = SPA_TIMESPEC_TO_TIME(&ts);
elapsed = (time->now - impl->last_monotonic) / 1000;
time->ticks = impl->last_ticks + (elapsed * impl->last_rate) / SPA_USEC_PER_SEC;
time->rate.num = impl->last_rate;
time->rate.denom = 1;
time->delay = 0;
if (impl->direction == SPA_DIRECTION_INPUT) if (impl->direction == SPA_DIRECTION_INPUT)
time->queued = get_queue_size(&impl->dequeue); time->queued = get_queue_size(&impl->dequeue);
else else