pinossrc: fix timestamp and clock handling

Send the base-time to the client so that it can calibrarte the clock.
Wait until we are completely negotiated before completing the state
change to paused. This makes it possible to provide a clock and decide
if we are live or not.
This commit is contained in:
Wim Taymans 2016-04-28 16:42:25 +02:00
parent f3053c963d
commit b86eb22922
4 changed files with 77 additions and 34 deletions

View file

@ -332,6 +332,9 @@ gst_pinos_socket_sink_render_other (GstPinosSocketSink * this, GstBuffer * buffe
p.size = fdmem->size;
pinos_buffer_builder_add_fd_payload (&builder, &p);
GST_LOG ("send %d %"G_GUINT64_FORMAT" %"G_GUINT64_FORMAT" %"G_GUINT64_FORMAT,
p.id, hdr.pts, GST_BUFFER_PTS (buffer), GST_ELEMENT_CAST (this)->base_time);
pinos_buffer_builder_end (&builder, &pbuf);
gst_memory_unref(fdmem);
fdmem = NULL;

View file

@ -263,7 +263,6 @@ gst_pinos_src_init (GstPinosSrc * src)
{
/* we operate in time */
gst_base_src_set_format (GST_BASE_SRC (src), GST_FORMAT_TIME);
gst_base_src_set_live (GST_BASE_SRC (src), TRUE);
GST_OBJECT_FLAG_SET (src, GST_ELEMENT_FLAG_PROVIDE_CLOCK);
@ -380,10 +379,10 @@ on_new_buffer (GObject *gobject,
if (buf == NULL)
buf = gst_buffer_new ();
if (GST_CLOCK_TIME_IS_VALID (hdr.pts)) {
if (hdr.pts > GST_ELEMENT_CAST (pinossrc)->base_time)
GST_BUFFER_PTS (buf) = hdr.pts - GST_ELEMENT_CAST (pinossrc)->base_time;
GST_INFO ("pts %" G_GUINT64_FORMAT ", dts_offset %"G_GUINT64_FORMAT "\n", hdr.pts, hdr.dts_offset);
if (GST_CLOCK_TIME_IS_VALID (hdr.pts)) {
GST_BUFFER_PTS (buf) = hdr.pts;
if (GST_BUFFER_PTS (buf) + hdr.dts_offset > 0)
GST_BUFFER_DTS (buf) = GST_BUFFER_PTS (buf) + hdr.dts_offset;
}
@ -492,27 +491,30 @@ on_stream_notify (GObject *gobject,
}
static void
parse_clock_info (GstPinosSrc *pinossrc)
parse_stream_properties (GstPinosSrc *pinossrc, PinosProperties *props)
{
PinosProperties *props;
const gchar *var;
g_object_get (pinossrc->stream, "properties", &props, NULL);
var = pinos_properties_get (props, "pinos.clock.type");
if (var != NULL) {
GST_DEBUG_OBJECT (pinossrc, "got clock type %s", var);
if (strcmp (var, "gst.net.time.provider") == 0) {
const gchar *address;
gint port;
GstClockTime base_time;
address = pinos_properties_get (props, "pinos.clock.address");
port = atoi (pinos_properties_get (props, "pinos.clock.port"));
base_time = atoll (pinos_properties_get (props, "pinos.clock.base-time"));
GST_DEBUG_OBJECT (pinossrc, "making net clock for %s:%d", address, port);
GST_DEBUG_OBJECT (pinossrc, "making net clock for %s:%d %" G_GUINT64_FORMAT, address, port, base_time);
if (pinossrc->clock)
gst_object_unref (pinossrc->clock);
pinossrc->clock = gst_net_client_clock_new ("pinosclock", address, port, 0);
pinossrc->clock = gst_net_client_clock_new ("pinosclock", address, port, base_time);
gst_element_post_message (GST_ELEMENT_CAST (pinossrc),
gst_message_new_clock_provide (GST_OBJECT_CAST (pinossrc),
pinossrc->clock, TRUE));
}
}
var = pinos_properties_get (props, "pinos.latency.is-live");
@ -524,8 +526,6 @@ parse_clock_info (GstPinosSrc *pinossrc)
var = pinos_properties_get (props, "pinos.latency.min");
pinossrc->min_latency = var ? (GstClockTime) atoi (var) : 0;
pinos_properties_free (props);
}
@ -535,6 +535,7 @@ gst_pinos_src_stream_start (GstPinosSrc *pinossrc, GstCaps * caps)
gchar *str;
GBytes *format;
gboolean res;
PinosProperties *props;
str = gst_caps_to_string (caps);
format = g_bytes_new_take (str, strlen (str) + 1);
@ -553,10 +554,18 @@ gst_pinos_src_stream_start (GstPinosSrc *pinossrc, GstCaps * caps)
pinos_main_loop_wait (pinossrc->loop);
}
parse_clock_info (pinossrc);
g_object_get (pinossrc->stream, "properties", &props, NULL);
pinos_main_loop_unlock (pinossrc->loop);
parse_stream_properties (pinossrc, props);
pinos_properties_free (props);
pinos_main_loop_lock (pinossrc->loop);
pinossrc->started = TRUE;
pinos_main_loop_signal (pinossrc->loop, FALSE);
pinos_main_loop_unlock (pinossrc->loop);
return res;
start_error:
@ -566,6 +575,16 @@ start_error:
}
}
static void
wait_negotiated (GstPinosSrc *this)
{
pinos_main_loop_lock (this->loop);
while (!this->started) {
pinos_main_loop_wait (this->loop);
}
pinos_main_loop_unlock (this->loop);
}
static gboolean
gst_pinos_src_negotiate (GstBaseSrc * basesrc)
{
@ -642,8 +661,6 @@ gst_pinos_src_negotiate (GstBaseSrc * basesrc)
}
pinos_main_loop_unlock (pinossrc->loop);
parse_clock_info (pinossrc);
g_object_get (pinossrc->stream, "possible-formats", &possible, NULL);
if (possible) {
GstCaps *newcaps;
@ -812,6 +829,7 @@ static GstFlowReturn
gst_pinos_src_create (GstPushSrc * psrc, GstBuffer ** buffer)
{
GstPinosSrc *pinossrc;
GstClockTime base_time;
pinossrc = GST_PINOS_SRC (psrc);
@ -838,6 +856,18 @@ gst_pinos_src_create (GstPushSrc * psrc, GstBuffer ** buffer)
pinos_main_loop_wait (pinossrc->loop);
}
base_time = GST_ELEMENT_CAST (psrc)->base_time;
if (GST_BUFFER_PTS_IS_VALID (*buffer) && GST_BUFFER_PTS (*buffer) >= base_time)
GST_BUFFER_PTS (*buffer) -= base_time;
else
GST_BUFFER_PTS (*buffer) = 0;
if (GST_BUFFER_DTS_IS_VALID (*buffer) && GST_BUFFER_DTS (*buffer) >= base_time)
GST_BUFFER_DTS (*buffer) -= base_time;
else
GST_BUFFER_DTS (*buffer) = 0;
pinos_main_loop_unlock (pinossrc->loop);
return GST_FLOW_OK;
@ -1026,6 +1056,11 @@ gst_pinos_src_change_state (GstElement * element, GstStateChange transition)
ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
switch (transition) {
case GST_STATE_CHANGE_READY_TO_PAUSED:
wait_negotiated (this);
if (gst_base_src_is_live (GST_BASE_SRC (element)))
ret = GST_STATE_CHANGE_NO_PREROLL;
break;
case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
break;
case GST_STATE_CHANGE_PAUSED_TO_READY:

View file

@ -57,6 +57,7 @@ struct _GstPinosSrc {
gboolean negotiated;
gboolean flushing;
gboolean started;
gboolean is_live;
GstClockTime min_latency;

View file

@ -130,6 +130,7 @@ setup_pipeline (PinosGstSource *source, GError **error)
priv->sink = gst_element_factory_make ("pinossocketsink", NULL);
g_object_set (priv->sink, "sync", TRUE,
"enable-last-sample", FALSE,
"qos", FALSE,
NULL);
gst_bin_add (GST_BIN (priv->pipeline), priv->sink);
@ -148,9 +149,6 @@ start_pipeline (PinosGstSource *source, GError **error)
PinosGstSourcePrivate *priv = source->priv;
GstCaps *res;
GstQuery *query;
GstClock *clock;
gchar *address;
gint port;
GstStateChangeReturn ret;
g_debug ("gst-source %p: starting pipeline", source);
@ -169,22 +167,6 @@ start_pipeline (PinosGstSource *source, GError **error)
gst_caps_replace (&priv->possible_formats, res);
gst_query_unref (query);
clock = gst_pipeline_get_clock (GST_PIPELINE (priv->pipeline));
if (priv->provider)
g_object_unref (priv->provider);
priv->provider = gst_net_time_provider_new (clock, NULL, 0);
g_object_get (priv->provider, "address", &address, "port", &port, NULL);
pinos_properties_set (priv->props, "pinos.clock.type", "gst.net.time.provider");
pinos_properties_set (priv->props, "pinos.clock.source", GST_OBJECT_NAME (clock));
pinos_properties_set (priv->props, "pinos.clock.address", address);
pinos_properties_setf (priv->props, "pinos.clock.port", "%d", port);
g_free (address);
gst_object_unref (clock);
return TRUE;
/* ERRORS */
@ -245,10 +227,32 @@ set_state (PinosSource *source,
case PINOS_SOURCE_STATE_RUNNING:
{
GstQuery *query;
GstClock *clock;
gchar *address;
gint port;
GstClockTime base_time;
gst_element_set_state (priv->pipeline, GST_STATE_PLAYING);
gst_element_get_state (priv->pipeline, NULL, NULL, -1);
clock = gst_pipeline_get_clock (GST_PIPELINE (priv->pipeline));
base_time = gst_clock_get_time (clock);
if (priv->provider)
g_object_unref (priv->provider);
priv->provider = gst_net_time_provider_new (clock, NULL, 0);
g_object_get (priv->provider, "address", &address, "port", &port, NULL);
pinos_properties_set (priv->props, "pinos.clock.type", "gst.net.time.provider");
pinos_properties_set (priv->props, "pinos.clock.source", GST_OBJECT_NAME (clock));
pinos_properties_set (priv->props, "pinos.clock.address", address);
pinos_properties_setf (priv->props, "pinos.clock.port", "%d", port);
pinos_properties_setf (priv->props, "pinos.clock.base-time", "%"G_GUINT64_FORMAT, base_time);
g_free (address);
gst_object_unref (clock);
query = gst_query_new_latency ();
if (gst_element_query (GST_ELEMENT_CAST (priv->pipeline), query)) {
gboolean live;