Work on main loop

Make a main-loop object with associated helper functions to handle async
methods.
rtloop -> data_loop
Handle async results a lot better.
Remove REMOVE_MEM command. We don't need it.
Handle stream memory updates better.
This commit is contained in:
Wim Taymans 2016-10-20 16:26:55 +02:00
parent 98dbb6424d
commit 8fac22afdb
26 changed files with 926 additions and 583 deletions

View file

@ -451,7 +451,7 @@ context_state_notify (GObject *gobject,
pinos_context_get_error (context)->message);
break;
}
pinos_main_loop_signal (self->loop, FALSE);
pinos_thread_main_loop_signal (self->loop, FALSE);
}
static gboolean
@ -465,23 +465,23 @@ gst_pinos_device_provider_start (GstDeviceProvider * provider)
c = g_main_context_new ();
if (!(self->loop = pinos_main_loop_new (c, "pinos-device-monitor"))) {
if (!(self->loop = pinos_thread_main_loop_new (c, "pinos-device-monitor"))) {
GST_ERROR_OBJECT (self, "Could not create pinos mainloop");
goto failed;
}
if (!pinos_main_loop_start (self->loop, &error)) {
if (!pinos_thread_main_loop_start (self->loop, &error)) {
GST_ERROR_OBJECT (self, "Could not start pinos mainloop: %s", error->message);
g_clear_object (&self->loop);
goto failed;
}
pinos_main_loop_lock (self->loop);
pinos_thread_main_loop_lock (self->loop);
if (!(self->context = pinos_context_new (c, self->client_name, NULL))) {
GST_ERROR_OBJECT (self, "Failed to create context");
pinos_main_loop_unlock (self->loop);
pinos_main_loop_stop (self->loop);
pinos_thread_main_loop_unlock (self->loop);
pinos_thread_main_loop_stop (self->loop);
g_clear_object (&self->loop);
goto failed;
}
@ -515,7 +515,7 @@ gst_pinos_device_provider_start (GstDeviceProvider * provider)
break;
/* Wait until something happens */
pinos_main_loop_wait (self->loop);
pinos_thread_main_loop_wait (self->loop);
}
GST_DEBUG_OBJECT (self, "connected");
pinos_context_get_daemon_info (self->context,
@ -524,7 +524,7 @@ gst_pinos_device_provider_start (GstDeviceProvider * provider)
NULL,
NULL,
self);
pinos_main_loop_unlock (self->loop);
pinos_thread_main_loop_unlock (self->loop);
g_main_context_unref (c);
@ -537,8 +537,8 @@ failed:
}
not_running:
{
pinos_main_loop_unlock (self->loop);
pinos_main_loop_stop (self->loop);
pinos_thread_main_loop_unlock (self->loop);
pinos_thread_main_loop_stop (self->loop);
g_clear_object (&self->context);
g_clear_object (&self->loop);
return TRUE;
@ -554,7 +554,7 @@ gst_pinos_device_provider_stop (GstDeviceProvider * provider)
pinos_context_disconnect (self->context);
}
if (self->loop) {
pinos_main_loop_stop (self->loop);
pinos_thread_main_loop_stop (self->loop);
}
g_clear_object (&self->context);
g_clear_object (&self->loop);

View file

@ -82,7 +82,7 @@ struct _GstPinosDeviceProvider {
gchar *client_name;
GMainContext *maincontext;
PinosMainLoop *loop;
PinosThreadMainLoop *loop;
PinosContext *context;
};

View file

@ -415,7 +415,7 @@ on_add_buffer (GObject *gobject,
gst_pinos_pool_add_buffer (pinossink->pool, buf);
g_hash_table_insert (pinossink->buf_ids, GINT_TO_POINTER (id), buf);
pinos_main_loop_signal (pinossink->loop, FALSE);
pinos_thread_main_loop_signal (pinossink->loop, FALSE);
}
static void
@ -450,7 +450,7 @@ on_new_buffer (GObject *gobject,
buf = g_hash_table_lookup (pinossink->buf_ids, GINT_TO_POINTER (id));
if (buf) {
pinos_main_loop_signal (pinossink->loop, FALSE);
pinos_thread_main_loop_signal (pinossink->loop, FALSE);
}
}
@ -479,7 +479,7 @@ on_stream_notify (GObject *gobject,
pinos_stream_get_error (stream)->message), (NULL));
break;
}
pinos_main_loop_signal (pinossink->loop, FALSE);
pinos_thread_main_loop_signal (pinossink->loop, FALSE);
}
static void
@ -528,7 +528,7 @@ gst_pinos_sink_setcaps (GstBaseSink * bsink, GstCaps * caps)
possible = gst_caps_to_format_all (caps);
pinos_main_loop_lock (pinossink->loop);
pinos_thread_main_loop_lock (pinossink->loop);
state = pinos_stream_get_state (pinossink->stream);
if (state == PINOS_STREAM_STATE_ERROR)
@ -556,7 +556,7 @@ gst_pinos_sink_setcaps (GstBaseSink * bsink, GstCaps * caps)
if (state == PINOS_STREAM_STATE_ERROR)
goto start_error;
pinos_main_loop_wait (pinossink->loop);
pinos_thread_main_loop_wait (pinossink->loop);
}
}
res = TRUE;
@ -574,11 +574,11 @@ gst_pinos_sink_setcaps (GstBaseSink * bsink, GstCaps * caps)
if (state == PINOS_STREAM_STATE_ERROR)
goto start_error;
pinos_main_loop_wait (pinossink->loop);
pinos_thread_main_loop_wait (pinossink->loop);
}
}
#endif
pinos_main_loop_unlock (pinossink->loop);
pinos_thread_main_loop_unlock (pinossink->loop);
pinossink->negotiated = res;
@ -587,7 +587,7 @@ gst_pinos_sink_setcaps (GstBaseSink * bsink, GstCaps * caps)
start_error:
{
GST_ERROR ("could not start stream");
pinos_main_loop_unlock (pinossink->loop);
pinos_thread_main_loop_unlock (pinossink->loop);
g_ptr_array_unref (possible);
return FALSE;
}
@ -606,7 +606,7 @@ gst_pinos_sink_render (GstBaseSink * bsink, GstBuffer * buffer)
if (!pinossink->negotiated)
goto not_negotiated;
pinos_main_loop_lock (pinossink->loop);
pinos_thread_main_loop_lock (pinossink->loop);
if (pinos_stream_get_state (pinossink->stream) != PINOS_STREAM_STATE_STREAMING)
goto done;
// goto streaming_error;
@ -639,7 +639,7 @@ gst_pinos_sink_render (GstBaseSink * bsink, GstBuffer * buffer)
g_warning ("can't send buffer");
done:
pinos_main_loop_unlock (pinossink->loop);
pinos_thread_main_loop_unlock (pinossink->loop);
return GST_FLOW_OK;
@ -649,7 +649,7 @@ not_negotiated:
}
//streaming_error:
// {
// pinos_main_loop_unlock (pinossink->loop);
// pinos_thread_main_loop_unlock (pinossink->loop);
// return GST_FLOW_ERROR;
// }
}
@ -683,7 +683,7 @@ gst_pinos_sink_start (GstBaseSink * basesink)
props = NULL;
}
pinos_main_loop_lock (pinossink->loop);
pinos_thread_main_loop_lock (pinossink->loop);
pinossink->stream = pinos_stream_new (pinossink->ctx, pinossink->client_name, props);
pinossink->pool->stream = pinossink->stream;
g_signal_connect (pinossink->stream, "notify::state", (GCallback) on_stream_notify, pinossink);
@ -691,7 +691,7 @@ gst_pinos_sink_start (GstBaseSink * basesink)
g_signal_connect (pinossink->stream, "add-buffer", (GCallback) on_add_buffer, pinossink);
g_signal_connect (pinossink->stream, "remove-buffer", (GCallback) on_remove_buffer, pinossink);
g_signal_connect (pinossink->stream, "new-buffer", (GCallback) on_new_buffer, pinossink);
pinos_main_loop_unlock (pinossink->loop);
pinos_thread_main_loop_unlock (pinossink->loop);
return TRUE;
}
@ -701,14 +701,14 @@ gst_pinos_sink_stop (GstBaseSink * basesink)
{
GstPinosSink *pinossink = GST_PINOS_SINK (basesink);
pinos_main_loop_lock (pinossink->loop);
pinos_thread_main_loop_lock (pinossink->loop);
if (pinossink->stream) {
pinos_stream_stop (pinossink->stream);
pinos_stream_disconnect (pinossink->stream);
g_clear_object (&pinossink->stream);
pinossink->pool->stream = NULL;
}
pinos_main_loop_unlock (pinossink->loop);
pinos_thread_main_loop_unlock (pinossink->loop);
pinossink->negotiated = FALSE;
@ -738,7 +738,7 @@ on_context_notify (GObject *gobject,
pinos_context_get_error (pinossink->ctx)->message), (NULL));
break;
}
pinos_main_loop_signal (pinossink->loop, FALSE);
pinos_thread_main_loop_signal (pinossink->loop, FALSE);
}
static gboolean
@ -749,11 +749,11 @@ gst_pinos_sink_open (GstPinosSink * pinossink)
pinossink->context = g_main_context_new ();
GST_DEBUG ("context %p", pinossink->context);
pinossink->loop = pinos_main_loop_new (pinossink->context, "pinos-sink-loop");
if (!pinos_main_loop_start (pinossink->loop, &error))
pinossink->loop = pinos_thread_main_loop_new (pinossink->context, "pinos-sink-loop");
if (!pinos_thread_main_loop_start (pinossink->loop, &error))
goto mainloop_error;
pinos_main_loop_lock (pinossink->loop);
pinos_thread_main_loop_lock (pinossink->loop);
pinossink->ctx = pinos_context_new (pinossink->context, g_get_application_name (), NULL);
g_signal_connect (pinossink->ctx, "notify::state", (GCallback) on_context_notify, pinossink);
@ -768,9 +768,9 @@ gst_pinos_sink_open (GstPinosSink * pinossink)
if (state == PINOS_CONTEXT_STATE_ERROR)
goto connect_error;
pinos_main_loop_wait (pinossink->loop);
pinos_thread_main_loop_wait (pinossink->loop);
}
pinos_main_loop_unlock (pinossink->loop);
pinos_thread_main_loop_unlock (pinossink->loop);
return TRUE;
@ -783,7 +783,7 @@ mainloop_error:
}
connect_error:
{
pinos_main_loop_unlock (pinossink->loop);
pinos_thread_main_loop_unlock (pinossink->loop);
return FALSE;
}
}
@ -791,7 +791,7 @@ connect_error:
static gboolean
gst_pinos_sink_close (GstPinosSink * pinossink)
{
pinos_main_loop_lock (pinossink->loop);
pinos_thread_main_loop_lock (pinossink->loop);
if (pinossink->stream) {
pinos_stream_disconnect (pinossink->stream);
}
@ -807,12 +807,12 @@ gst_pinos_sink_close (GstPinosSink * pinossink)
if (state == PINOS_CONTEXT_STATE_ERROR)
break;
pinos_main_loop_wait (pinossink->loop);
pinos_thread_main_loop_wait (pinossink->loop);
}
}
pinos_main_loop_unlock (pinossink->loop);
pinos_thread_main_loop_unlock (pinossink->loop);
pinos_main_loop_stop (pinossink->loop);
pinos_thread_main_loop_stop (pinossink->loop);
g_clear_object (&pinossink->loop);
g_clear_object (&pinossink->stream);
g_clear_object (&pinossink->ctx);

View file

@ -78,7 +78,7 @@ struct _GstPinosSink {
gboolean negotiated;
GMainContext *context;
PinosMainLoop *loop;
PinosThreadMainLoop *loop;
PinosContext *ctx;
PinosStream *stream;
GstAllocator *allocator;

View file

@ -479,7 +479,7 @@ on_new_buffer (GObject *gobject,
}
g_queue_push_tail (&pinossrc->queue, buf);
pinos_main_loop_signal (pinossrc->loop, FALSE);
pinos_thread_main_loop_signal (pinossrc->loop, FALSE);
}
return;
}
@ -507,7 +507,7 @@ on_stream_notify (GObject *gobject,
pinos_stream_get_error (pinossrc->stream)->message), (NULL));
break;
}
pinos_main_loop_signal (pinossrc->loop, FALSE);
pinos_thread_main_loop_signal (pinossrc->loop, FALSE);
}
static void
@ -536,7 +536,7 @@ gst_pinos_src_stream_start (GstPinosSrc *pinossrc)
gboolean res;
PinosProperties *props;
pinos_main_loop_lock (pinossrc->loop);
pinos_thread_main_loop_lock (pinossrc->loop);
res = pinos_stream_start (pinossrc->stream);
while (TRUE) {
PinosStreamState state = pinos_stream_get_state (pinossrc->stream);
@ -547,26 +547,26 @@ gst_pinos_src_stream_start (GstPinosSrc *pinossrc)
if (state == PINOS_STREAM_STATE_ERROR)
goto start_error;
pinos_main_loop_wait (pinossrc->loop);
pinos_thread_main_loop_wait (pinossrc->loop);
}
g_object_get (pinossrc->stream, "properties", &props, NULL);
pinos_main_loop_unlock (pinossrc->loop);
pinos_thread_main_loop_unlock (pinossrc->loop);
parse_stream_properties (pinossrc, props);
pinos_properties_free (props);
pinos_main_loop_lock (pinossrc->loop);
pinos_thread_main_loop_lock (pinossrc->loop);
pinossrc->started = TRUE;
pinos_main_loop_signal (pinossrc->loop, FALSE);
pinos_main_loop_unlock (pinossrc->loop);
pinos_thread_main_loop_signal (pinossrc->loop, FALSE);
pinos_thread_main_loop_unlock (pinossrc->loop);
return res;
start_error:
{
GST_DEBUG_OBJECT (pinossrc, "error starting stream");
pinos_main_loop_unlock (pinossrc->loop);
pinos_thread_main_loop_unlock (pinossrc->loop);
return FALSE;
}
}
@ -576,7 +576,7 @@ wait_negotiated (GstPinosSrc *this)
{
PinosStreamState state;
pinos_main_loop_lock (this->loop);
pinos_thread_main_loop_lock (this->loop);
while (TRUE) {
state = pinos_stream_get_state (this->stream);
@ -586,9 +586,9 @@ wait_negotiated (GstPinosSrc *this)
if (this->started)
break;
pinos_main_loop_wait (this->loop);
pinos_thread_main_loop_wait (this->loop);
}
pinos_main_loop_unlock (this->loop);
pinos_thread_main_loop_unlock (this->loop);
return state;
}
@ -633,7 +633,7 @@ gst_pinos_src_negotiate (GstBaseSrc * basesrc)
possible = gst_caps_to_format_all (caps);
/* first disconnect */
pinos_main_loop_lock (pinossrc->loop);
pinos_thread_main_loop_lock (pinossrc->loop);
if (pinos_stream_get_state (pinossrc->stream) != PINOS_STREAM_STATE_UNCONNECTED) {
GST_DEBUG_OBJECT (basesrc, "disconnect capture");
pinos_stream_disconnect (pinossrc->stream);
@ -648,7 +648,7 @@ gst_pinos_src_negotiate (GstBaseSrc * basesrc)
goto connect_error;
}
pinos_main_loop_wait (pinossrc->loop);
pinos_thread_main_loop_wait (pinossrc->loop);
}
}
@ -669,9 +669,9 @@ gst_pinos_src_negotiate (GstBaseSrc * basesrc)
if (state == PINOS_STREAM_STATE_ERROR)
goto connect_error;
pinos_main_loop_wait (pinossrc->loop);
pinos_thread_main_loop_wait (pinossrc->loop);
}
pinos_main_loop_unlock (pinossrc->loop);
pinos_thread_main_loop_unlock (pinossrc->loop);
result = gst_pinos_src_stream_start (pinossrc);
@ -706,7 +706,7 @@ no_common_caps:
}
connect_error:
{
pinos_main_loop_unlock (pinossrc->loop);
pinos_thread_main_loop_unlock (pinossrc->loop);
return FALSE;
}
}
@ -747,11 +747,11 @@ gst_pinos_src_unlock (GstBaseSrc * basesrc)
{
GstPinosSrc *pinossrc = GST_PINOS_SRC (basesrc);
pinos_main_loop_lock (pinossrc->loop);
pinos_thread_main_loop_lock (pinossrc->loop);
GST_DEBUG_OBJECT (pinossrc, "setting flushing");
pinossrc->flushing = TRUE;
pinos_main_loop_signal (pinossrc->loop, FALSE);
pinos_main_loop_unlock (pinossrc->loop);
pinos_thread_main_loop_signal (pinossrc->loop, FALSE);
pinos_thread_main_loop_unlock (pinossrc->loop);
return TRUE;
}
@ -761,10 +761,10 @@ gst_pinos_src_unlock_stop (GstBaseSrc * basesrc)
{
GstPinosSrc *pinossrc = GST_PINOS_SRC (basesrc);
pinos_main_loop_lock (pinossrc->loop);
pinos_thread_main_loop_lock (pinossrc->loop);
GST_DEBUG_OBJECT (pinossrc, "unsetting flushing");
pinossrc->flushing = FALSE;
pinos_main_loop_unlock (pinossrc->loop);
pinos_thread_main_loop_unlock (pinossrc->loop);
return TRUE;
}
@ -850,7 +850,7 @@ gst_pinos_src_create (GstPushSrc * psrc, GstBuffer ** buffer)
if (!pinossrc->negotiated)
goto not_negotiated;
pinos_main_loop_lock (pinossrc->loop);
pinos_thread_main_loop_lock (pinossrc->loop);
while (TRUE) {
PinosStreamState state;
@ -868,9 +868,9 @@ gst_pinos_src_create (GstPushSrc * psrc, GstBuffer ** buffer)
if (*buffer != NULL)
break;
pinos_main_loop_wait (pinossrc->loop);
pinos_thread_main_loop_wait (pinossrc->loop);
}
pinos_main_loop_unlock (pinossrc->loop);
pinos_thread_main_loop_unlock (pinossrc->loop);
if (pinossrc->is_live)
base_time = GST_ELEMENT_CAST (psrc)->base_time;
@ -901,12 +901,12 @@ not_negotiated:
}
streaming_error:
{
pinos_main_loop_unlock (pinossrc->loop);
pinos_thread_main_loop_unlock (pinossrc->loop);
return GST_FLOW_ERROR;
}
streaming_stopped:
{
pinos_main_loop_unlock (pinossrc->loop);
pinos_thread_main_loop_unlock (pinossrc->loop);
return GST_FLOW_FLUSHING;
}
}
@ -931,9 +931,9 @@ gst_pinos_src_stop (GstBaseSrc * basesrc)
pinossrc = GST_PINOS_SRC (basesrc);
pinos_main_loop_lock (pinossrc->loop);
pinos_thread_main_loop_lock (pinossrc->loop);
clear_queue (pinossrc);
pinos_main_loop_unlock (pinossrc->loop);
pinos_thread_main_loop_unlock (pinossrc->loop);
return TRUE;
}
@ -959,7 +959,7 @@ on_context_notify (GObject *gobject,
pinos_context_get_error (pinossrc->ctx)->message), (NULL));
break;
}
pinos_main_loop_signal (pinossrc->loop, FALSE);
pinos_thread_main_loop_signal (pinossrc->loop, FALSE);
}
static gboolean
@ -985,11 +985,11 @@ gst_pinos_src_open (GstPinosSrc * pinossrc)
pinossrc->context = g_main_context_new ();
GST_DEBUG ("context %p", pinossrc->context);
pinossrc->loop = pinos_main_loop_new (pinossrc->context, "pinos-main-loop");
if (!pinos_main_loop_start (pinossrc->loop, &error))
pinossrc->loop = pinos_thread_main_loop_new (pinossrc->context, "pinos-main-loop");
if (!pinos_thread_main_loop_start (pinossrc->loop, &error))
goto mainloop_failed;
pinos_main_loop_lock (pinossrc->loop);
pinos_thread_main_loop_lock (pinossrc->loop);
pinossrc->ctx = pinos_context_new (pinossrc->context, g_get_application_name (), NULL);
g_signal_connect (pinossrc->ctx, "notify::state", (GCallback) on_context_notify, pinossrc);
@ -1004,7 +1004,7 @@ gst_pinos_src_open (GstPinosSrc * pinossrc)
if (state == PINOS_CONTEXT_STATE_ERROR)
goto connect_error;
pinos_main_loop_wait (pinossrc->loop);
pinos_thread_main_loop_wait (pinossrc->loop);
}
if (pinossrc->properties) {
@ -1021,7 +1021,7 @@ gst_pinos_src_open (GstPinosSrc * pinossrc)
g_signal_connect (pinossrc->stream, "remove-buffer", (GCallback) on_remove_buffer, pinossrc);
g_signal_connect (pinossrc->stream, "new-buffer", (GCallback) on_new_buffer, pinossrc);
pinossrc->clock = gst_pinos_clock_new (pinossrc->stream);
pinos_main_loop_unlock (pinossrc->loop);
pinos_thread_main_loop_unlock (pinossrc->loop);
return TRUE;
@ -1034,7 +1034,7 @@ mainloop_failed:
}
connect_error:
{
pinos_main_loop_unlock (pinossrc->loop);
pinos_thread_main_loop_unlock (pinossrc->loop);
return FALSE;
}
}
@ -1042,7 +1042,7 @@ connect_error:
static void
gst_pinos_src_close (GstPinosSrc * pinossrc)
{
pinos_main_loop_stop (pinossrc->loop);
pinos_thread_main_loop_stop (pinossrc->loop);
g_clear_object (&pinossrc->loop);
g_clear_object (&pinossrc->ctx);
g_main_context_unref (pinossrc->context);

View file

@ -64,7 +64,7 @@ struct _GstPinosSrc {
GstClockTime max_latency;
GMainContext *context;
PinosMainLoop *loop;
PinosThreadMainLoop *loop;
PinosContext *ctx;
PinosStream *stream;
GstAllocator *fd_allocator;