diff --git a/pinos/gst/gstpinossocketsink.c b/pinos/gst/gstpinossocketsink.c index bb8367aa5..73e036994 100644 --- a/pinos/gst/gstpinossocketsink.c +++ b/pinos/gst/gstpinossocketsink.c @@ -416,8 +416,6 @@ gst_pinos_socket_sink_open (GstPinosSocketSink * this) { GError *error = NULL; - this->cache = gst_burst_cache_new (sizeof (MyReader)); - this->context = g_main_context_new (); this->loop = g_main_loop_new (this->context, TRUE); GST_DEBUG ("context %p, loop %p", this->context, this->loop); @@ -453,7 +451,6 @@ gst_pinos_socket_sink_close (GstPinosSocketSink * this) g_clear_pointer (&this->loop, g_main_loop_unref); g_clear_pointer (&this->context, g_main_context_unref); g_hash_table_remove_all (this->hash); - g_clear_pointer (&this->cache, g_object_unref); return TRUE; } @@ -799,6 +796,7 @@ gst_pinos_socket_sink_finalize (GObject * object) GstPinosSocketSink *this = GST_PINOS_SOCKET_SINK (object); g_clear_pointer (&this->hash, g_hash_table_unref); + g_clear_pointer (&this->cache, g_object_unref); G_OBJECT_CLASS (parent_class)->finalize (object); } @@ -879,6 +877,7 @@ static void gst_pinos_socket_sink_init (GstPinosSocketSink * this) { this->hash = g_hash_table_new (g_direct_hash, g_direct_equal); + this->cache = gst_burst_cache_new (sizeof (MyReader)); this->allocator = gst_tmpfile_allocator_new (); this->fdmanager = pinos_fd_manager_get (PINOS_FD_MANAGER_DEFAULT); } diff --git a/pinos/gst/gstpinossrc.c b/pinos/gst/gstpinossrc.c index d0487f87f..bfe53d121 100644 --- a/pinos/gst/gstpinossrc.c +++ b/pinos/gst/gstpinossrc.c @@ -379,7 +379,7 @@ on_new_buffer (GObject *gobject, if (buf == NULL) buf = gst_buffer_new (); - GST_INFO ("pts %" G_GUINT64_FORMAT ", dts_offset %"G_GUINT64_FORMAT "\n", hdr.pts, hdr.dts_offset); + GST_INFO ("pts %" G_GUINT64_FORMAT ", dts_offset %"G_GUINT64_FORMAT, hdr.pts, hdr.dts_offset); if (GST_CLOCK_TIME_IS_VALID (hdr.pts)) { GST_BUFFER_PTS (buf) = hdr.pts; @@ -495,6 +495,16 @@ parse_stream_properties (GstPinosSrc *pinossrc, PinosProperties *props) { const gchar *var; + var = pinos_properties_get (props, "pinos.latency.is-live"); + pinossrc->is_live = var ? (atoi (var) == 1) : FALSE; + gst_base_src_set_live (GST_BASE_SRC (pinossrc), pinossrc->is_live); + + var = pinos_properties_get (props, "pinos.latency.min"); + pinossrc->min_latency = var ? (GstClockTime) atoi (var) : 0; + + var = pinos_properties_get (props, "pinos.latency.max"); + pinossrc->max_latency = var ? (GstClockTime) atoi (var) : GST_CLOCK_TIME_NONE; + var = pinos_properties_get (props, "pinos.clock.type"); if (var != NULL) { GST_DEBUG_OBJECT (pinossrc, "got clock type %s", var); @@ -517,15 +527,6 @@ parse_stream_properties (GstPinosSrc *pinossrc, PinosProperties *props) pinossrc->clock, TRUE)); } } - var = pinos_properties_get (props, "pinos.latency.is-live"); - pinossrc->is_live = var ? (atoi (var) == 1) : FALSE; - gst_base_src_set_live (GST_BASE_SRC (pinossrc), pinossrc->is_live); - - var = pinos_properties_get (props, "pinos.latency.max"); - pinossrc->max_latency = var ? (GstClockTime) atoi (var) : GST_CLOCK_TIME_NONE; - - var = pinos_properties_get (props, "pinos.latency.min"); - pinossrc->min_latency = var ? (GstClockTime) atoi (var) : 0; } @@ -829,7 +830,7 @@ static GstFlowReturn gst_pinos_src_create (GstPushSrc * psrc, GstBuffer ** buffer) { GstPinosSrc *pinossrc; - GstClockTime base_time; + GstClockTime pts, dts, base_time; pinossrc = GST_PINOS_SRC (psrc); @@ -856,20 +857,25 @@ gst_pinos_src_create (GstPushSrc * psrc, GstBuffer ** buffer) pinos_main_loop_wait (pinossrc->loop); } - base_time = GST_ELEMENT_CAST (psrc)->base_time; - - if (GST_BUFFER_PTS_IS_VALID (*buffer) && GST_BUFFER_PTS (*buffer) >= base_time) - GST_BUFFER_PTS (*buffer) -= base_time; - else - GST_BUFFER_PTS (*buffer) = 0; - - if (GST_BUFFER_DTS_IS_VALID (*buffer) && GST_BUFFER_DTS (*buffer) >= base_time) - GST_BUFFER_DTS (*buffer) -= base_time; - else - GST_BUFFER_DTS (*buffer) = 0; - pinos_main_loop_unlock (pinossrc->loop); + base_time = GST_ELEMENT_CAST (psrc)->base_time; + pts = GST_BUFFER_PTS (*buffer); + dts = GST_BUFFER_DTS (*buffer); + + if (GST_CLOCK_TIME_IS_VALID (pts)) + pts = (pts >= base_time ? pts - base_time : 0); + if (GST_CLOCK_TIME_IS_VALID (dts)) + dts = (dts >= base_time ? dts - base_time : 0); + + GST_INFO ("pts %" G_GUINT64_FORMAT ", dts %"G_GUINT64_FORMAT + ", base-time %"GST_TIME_FORMAT" -> %"GST_TIME_FORMAT", %"GST_TIME_FORMAT, + GST_BUFFER_PTS (*buffer), GST_BUFFER_DTS (*buffer), GST_TIME_ARGS (base_time), + GST_TIME_ARGS (pts), GST_TIME_ARGS (dts)); + + GST_BUFFER_PTS (*buffer) = pts; + GST_BUFFER_DTS (*buffer) = dts; + return GST_FLOW_OK; not_negotiated: diff --git a/pinos/modules/gst/gst-source.c b/pinos/modules/gst/gst-source.c index 935db173c..1d359f6d8 100644 --- a/pinos/modules/gst/gst-source.c +++ b/pinos/modules/gst/gst-source.c @@ -111,6 +111,8 @@ bus_handler (GstBus *bus, return TRUE; } + + static gboolean setup_pipeline (PinosGstSource *source, GError **error) { @@ -118,6 +120,7 @@ setup_pipeline (PinosGstSource *source, GError **error) GstBus *bus; priv->pipeline = gst_pipeline_new (NULL); + gst_pipeline_set_latency (GST_PIPELINE_CAST (priv->pipeline), 0); g_debug ("gst-source %p: setup pipeline", source); @@ -153,13 +156,9 @@ start_pipeline (PinosGstSource *source, GError **error) g_debug ("gst-source %p: starting pipeline", source); - ret = gst_element_set_state (priv->pipeline, GST_STATE_PAUSED); + ret = gst_element_set_state (priv->pipeline, GST_STATE_READY); if (ret == GST_STATE_CHANGE_FAILURE) - goto paused_failed; - - ret = gst_element_get_state (priv->pipeline, NULL, NULL, -1); - if (ret == GST_STATE_CHANGE_FAILURE) - goto paused_failed; + goto ready_failed; query = gst_query_new_caps (NULL); gst_element_query (priv->element, query); @@ -170,9 +169,9 @@ start_pipeline (PinosGstSource *source, GError **error) return TRUE; /* ERRORS */ -paused_failed: +ready_failed: { - GST_ERROR_OBJECT (source, "failed state change"); + GST_ERROR_OBJECT (source, "failed state change to READY"); gst_element_set_state (priv->pipeline, GST_STATE_NULL); if (error) *error = g_error_new (G_IO_ERROR, @@ -231,7 +230,11 @@ set_state (PinosSource *source, gchar *address; gint port; GstClockTime base_time; + gboolean live; + GstClockTime min_latency, max_latency; + gst_element_set_state (priv->pipeline, GST_STATE_PAUSED); + gst_element_get_state (priv->pipeline, NULL, NULL, -1); gst_element_set_state (priv->pipeline, GST_STATE_PLAYING); gst_element_get_state (priv->pipeline, NULL, NULL, -1); @@ -255,21 +258,22 @@ set_state (PinosSource *source, query = gst_query_new_latency (); if (gst_element_query (GST_ELEMENT_CAST (priv->pipeline), query)) { - gboolean live; - GstClockTime min_latency, max_latency; - gst_query_parse_latency (query, &live, &min_latency, &max_latency); - - GST_DEBUG_OBJECT (priv->pipeline, - "got min latency %" GST_TIME_FORMAT ", max latency %" - GST_TIME_FORMAT ", live %d", GST_TIME_ARGS (min_latency), - GST_TIME_ARGS (max_latency), live); - - pinos_properties_setf (priv->props, "pinos.latency.is-live", "%d", live); - pinos_properties_setf (priv->props, "pinos.latency.min", "%"G_GUINT64_FORMAT, min_latency); - pinos_properties_setf (priv->props, "pinos.latency.max", "%"G_GUINT64_FORMAT, max_latency); + } else { + live = FALSE; + min_latency = 0; + max_latency = -1; } gst_query_unref (query); + + GST_DEBUG_OBJECT (priv->pipeline, + "got min latency %" GST_TIME_FORMAT ", max latency %" + GST_TIME_FORMAT ", live %d", GST_TIME_ARGS (min_latency), + GST_TIME_ARGS (max_latency), live); + + pinos_properties_setf (priv->props, "pinos.latency.is-live", "%d", live); + pinos_properties_setf (priv->props, "pinos.latency.min", "%"G_GUINT64_FORMAT, min_latency); + pinos_properties_setf (priv->props, "pinos.latency.max", "%"G_GUINT64_FORMAT, max_latency); break; } case PINOS_SOURCE_STATE_ERROR: @@ -355,6 +359,7 @@ on_socket_notify (GObject *gobject, PinosProperties *props; g_object_get (gobject, "socket", &socket, NULL); + GST_DEBUG ("got socket %p", socket); if (socket == NULL) { GSocket *prev_socket = g_object_steal_data (gobject, "last-socket"); @@ -363,7 +368,6 @@ on_socket_notify (GObject *gobject, g_object_unref (prev_socket); } } else { - pinos_source_report_busy (PINOS_SOURCE (source)); g_signal_emit_by_name (priv->sink, "add", socket); g_object_set_data_full (gobject, "last-socket", socket, g_object_unref); } @@ -400,6 +404,7 @@ on_socket_notify (GObject *gobject, } /* this is what we use as the final format for the output */ g_object_set (gobject, "format", format, NULL); + pinos_source_report_busy (PINOS_SOURCE (source)); } if (format) { pinos_source_update_possible_formats (PINOS_SOURCE (source), format);