mainloop: add threaded mainloop

Add a mainloop wrapper that runs the mainloop in a separate thread and
has some synchronization primitives.
Use new mainloop in gstreamer source and sink elements
This commit is contained in:
Wim Taymans 2015-07-08 12:11:55 +02:00
parent a3505fb880
commit cbeee04809
9 changed files with 710 additions and 195 deletions

View file

@ -123,8 +123,6 @@ static void
gst_pinos_sink_init (GstPinosSink * sink)
{
sink->allocator = gst_tmpfile_allocator_new ();
g_mutex_init (&sink->lock);
g_cond_init (&sink->cond);
}
static GstCaps *
@ -198,7 +196,7 @@ on_new_buffer (GObject *gobject,
{
GstPinosSink *pinossink = user_data;
g_cond_signal (&pinossink->cond);
pinos_main_loop_signal (pinossink->loop, FALSE);
}
static void
@ -207,17 +205,26 @@ on_stream_notify (GObject *gobject,
gpointer user_data)
{
PinosStreamState state;
PinosStream *stream = PINOS_STREAM (gobject);
GstPinosSink *pinossink = user_data;
state = pinos_stream_get_state (pinossink->stream);
g_print ("got stream state %d\n", state);
g_cond_broadcast (&pinossink->cond);
state = pinos_stream_get_state (stream);
GST_DEBUG ("got stream state %d\n", state);
if (state == PINOS_STREAM_STATE_ERROR) {
GST_ELEMENT_ERROR (pinossink, RESOURCE, FAILED,
("Failed to connect stream: %s",
pinos_stream_get_error (pinossink->stream)->message), (NULL));
switch (state) {
case PINOS_STREAM_STATE_UNCONNECTED:
case PINOS_STREAM_STATE_CONNECTING:
case PINOS_STREAM_STATE_STARTING:
case PINOS_STREAM_STATE_STREAMING:
case PINOS_STREAM_STATE_READY:
break;
case PINOS_STREAM_STATE_ERROR:
GST_ELEMENT_ERROR (pinossink, RESOURCE, FAILED,
("stream error: %s",
pinos_stream_get_error (stream)->message), (NULL));
break;
}
pinos_main_loop_signal (pinossink->loop, FALSE);
}
static GstCaps *
@ -238,7 +245,7 @@ gst_pinos_sink_setcaps (GstBaseSink * bsink, GstCaps * caps)
str = gst_caps_to_string (caps);
format = g_bytes_new_take (str, strlen (str) + 1);
g_mutex_lock (&pinossink->lock);
pinos_main_loop_lock (pinossink->loop);
pinossink->stream = pinos_stream_new (pinossink->ctx, "test", NULL);
g_signal_connect (pinossink->stream, "notify::state", (GCallback) on_stream_notify, pinossink);
g_signal_connect (pinossink->stream, "new-buffer", (GCallback) on_new_buffer, pinossink);
@ -254,7 +261,7 @@ gst_pinos_sink_setcaps (GstBaseSink * bsink, GstCaps * caps)
if (state == PINOS_STREAM_STATE_ERROR)
goto connect_error;
g_cond_wait (&pinossink->cond, &pinossink->lock);
pinos_main_loop_wait (pinossink->loop);
}
pinos_stream_start (pinossink->stream, format, PINOS_STREAM_MODE_BUFFER);
@ -268,9 +275,9 @@ gst_pinos_sink_setcaps (GstBaseSink * bsink, GstCaps * caps)
if (state == PINOS_STREAM_STATE_ERROR)
goto connect_error;
g_cond_wait (&pinossink->cond, &pinossink->lock);
pinos_main_loop_wait (pinossink->loop);
}
g_mutex_unlock (&pinossink->lock);
pinos_main_loop_unlock (pinossink->loop);
pinossink->negotiated = TRUE;
@ -278,7 +285,7 @@ gst_pinos_sink_setcaps (GstBaseSink * bsink, GstCaps * caps)
connect_error:
{
g_mutex_unlock (&pinossink->lock);
pinos_main_loop_unlock (pinossink->loop);
return FALSE;
}
}
@ -323,11 +330,11 @@ gst_pinos_sink_render (GstBaseSink * bsink, GstBuffer * buffer)
gst_memory_unref (mem);
info.message = mesg;
g_mutex_lock (&pinossink->lock);
pinos_main_loop_lock (pinossink->loop);
if (pinos_stream_get_state (pinossink->stream) != PINOS_STREAM_STATE_STREAMING)
goto streaming_error;
pinos_stream_provide_buffer (pinossink->stream, &info);
g_mutex_unlock (&pinossink->lock);
pinos_main_loop_unlock (pinossink->loop);
return GST_FLOW_OK;
@ -341,7 +348,7 @@ map_error:
}
streaming_error:
{
g_mutex_unlock (&pinossink->lock);
pinos_main_loop_unlock (pinossink->loop);
return GST_FLOW_ERROR;
}
}
@ -366,63 +373,48 @@ gst_pinos_sink_stop (GstBaseSink * basesink)
return TRUE;
}
static GPrivate sink_key;
static gint
do_poll (GPollFD *ufds, guint nfsd, gint timeout_)
{
gint res;
GstPinosSink *this = g_private_get (&sink_key);
g_mutex_unlock (&this->lock);
res = this->poll_func (ufds, nfsd, timeout_);
g_mutex_lock (&this->lock);
return res;
}
static gpointer
handle_mainloop (GstPinosSink *this)
{
g_mutex_lock (&this->lock);
g_private_set (&sink_key, this);
this->poll_func = g_main_context_get_poll_func (this->context);
g_main_context_set_poll_func (this->context, do_poll);
g_main_context_push_thread_default (this->context);
g_print ("run mainloop\n");
g_main_loop_run (this->loop);
g_print ("quit mainloop\n");
g_main_context_pop_thread_default (this->context);
g_mutex_unlock (&this->lock);
return NULL;
}
static void
on_state_notify (GObject *gobject,
GParamSpec *pspec,
gpointer user_data)
on_context_notify (GObject *gobject,
GParamSpec *pspec,
gpointer user_data)
{
GstPinosSink *pinossink = user_data;
PinosContext *ctx = PINOS_CONTEXT (gobject);
PinosContextState state;
state = pinos_context_get_state (pinossink->ctx);
g_print ("got context state %d\n", state);
g_cond_broadcast (&pinossink->cond);
state = pinos_context_get_state (ctx);
GST_DEBUG ("got context state %d\n", state);
if (state == PINOS_CONTEXT_STATE_ERROR) {
GST_ELEMENT_ERROR (pinossink, RESOURCE, FAILED,
("Failed to connect stream: %s",
pinos_context_get_error (pinossink->ctx)->message), (NULL));
switch (state) {
case PINOS_CONTEXT_STATE_UNCONNECTED:
case PINOS_CONTEXT_STATE_CONNECTING:
case PINOS_CONTEXT_STATE_REGISTERING:
case PINOS_CONTEXT_STATE_READY:
break;
case PINOS_CONTEXT_STATE_ERROR:
GST_ELEMENT_ERROR (pinossink, RESOURCE, FAILED,
("context error: %s",
pinos_context_get_error (pinossink->ctx)->message), (NULL));
break;
}
pinos_main_loop_signal (pinossink->loop, FALSE);
}
static gboolean
gst_pinos_sink_open (GstPinosSink * pinossink)
{
g_mutex_lock (&pinossink->lock);
GError *error = NULL;
pinossink->context = g_main_context_new ();
GST_DEBUG ("context %p\n", pinossink->context);
pinossink->loop = pinos_main_loop_new (pinossink->context, "pinos-sink-loop");
if (!pinos_main_loop_start (pinossink->loop, &error))
goto mainloop_error;
pinos_main_loop_lock (pinossink->loop);
pinossink->ctx = pinos_context_new (pinossink->context, "test-client", NULL);
g_signal_connect (pinossink->ctx, "notify::state", (GCallback) on_state_notify, pinossink);
g_signal_connect (pinossink->ctx, "notify::state", (GCallback) on_context_notify, pinossink);
pinos_context_connect(pinossink->ctx, PINOS_CONTEXT_FLAGS_NONE);
@ -435,16 +427,22 @@ gst_pinos_sink_open (GstPinosSink * pinossink)
if (state == PINOS_CONTEXT_STATE_ERROR)
goto connect_error;
g_cond_wait (&pinossink->cond, &pinossink->lock);
pinos_main_loop_wait (pinossink->loop);
}
g_mutex_unlock (&pinossink->lock);
pinos_main_loop_unlock (pinossink->loop);
return TRUE;
/* ERRORS */
mainloop_error:
{
GST_ELEMENT_ERROR (pinossink, RESOURCE, FAILED,
("Failed to start mainloop: %s", error->message), (NULL));
return FALSE;
}
connect_error:
{
g_mutex_unlock (&pinossink->lock);
pinos_main_loop_unlock (pinossink->loop);
return FALSE;
}
}
@ -452,13 +450,12 @@ connect_error:
static gboolean
gst_pinos_sink_close (GstPinosSink * pinossink)
{
g_mutex_lock (&pinossink->lock);
pinos_main_loop_lock (pinossink->loop);
if (pinossink->stream) {
pinos_stream_disconnect (pinossink->stream);
}
if (pinossink->ctx) {
pinos_context_disconnect(pinossink->ctx);
pinos_context_disconnect (pinossink->ctx);
while (TRUE) {
PinosContextState state = pinos_context_get_state (pinossink->ctx);
@ -469,10 +466,16 @@ gst_pinos_sink_close (GstPinosSink * pinossink)
if (state == PINOS_CONTEXT_STATE_ERROR)
break;
g_cond_wait (&pinossink->cond, &pinossink->lock);
pinos_main_loop_wait (pinossink->loop);
}
}
g_mutex_unlock (&pinossink->lock);
pinos_main_loop_unlock (pinossink->loop);
pinos_main_loop_stop (pinossink->loop);
g_clear_object (&pinossink->loop);
g_clear_object (&pinossink->stream);
g_clear_object (&pinossink->ctx);
g_main_context_unref (pinossink->context);
return TRUE;
}
@ -485,14 +488,8 @@ gst_pinos_sink_change_state (GstElement * element, GstStateChange transition)
switch (transition) {
case GST_STATE_CHANGE_NULL_TO_READY:
this->context = g_main_context_new ();
g_print ("context %p\n", this->context);
this->loop = g_main_loop_new (this->context, FALSE);
this->thread = g_thread_new ("pinos", (GThreadFunc) handle_mainloop, this);
if (!gst_pinos_sink_open (this)) {
ret = GST_STATE_CHANGE_FAILURE;
goto exit;
}
if (!gst_pinos_sink_open (this))
goto open_failed;
break;
case GST_STATE_CHANGE_READY_TO_PAUSED:
break;
@ -515,17 +512,15 @@ gst_pinos_sink_change_state (GstElement * element, GstStateChange transition)
break;
case GST_STATE_CHANGE_READY_TO_NULL:
gst_pinos_sink_close (this);
g_main_loop_quit (this->loop);
g_thread_join (this->thread);
g_main_loop_unref (this->loop);
g_clear_object (&this->stream);
g_clear_object (&this->ctx);
g_main_context_unref (this->context);
break;
default:
break;
}
exit:
return ret;
/* ERRORS */
open_failed:
{
return GST_STATE_CHANGE_FAILURE;
}
}

View file

@ -23,9 +23,7 @@
#include <gst/gst.h>
#include <gst/base/gstbasesink.h>
#include <client/context.h>
#include <client/stream.h>
#include <client/introspect.h>
#include <client/pinos.h>
G_BEGIN_DECLS
@ -59,15 +57,10 @@ struct _GstPinosSink {
gboolean negotiated;
GMainContext *context;
GMainLoop *loop;
GThread *thread;
PinosMainLoop *loop;
PinosContext *ctx;
PinosStream *stream;
GstAllocator *allocator;
GPollFunc poll_func;
GMutex lock;
GCond cond;
};
struct _GstPinosSinkClass {

View file

@ -121,8 +121,6 @@ gst_pinos_src_finalize (GObject * object)
GstPinosSrc *pinossrc = GST_PINOS_SRC (object);
g_object_unref (pinossrc->fd_allocator);
g_mutex_clear (&pinossrc->lock);
g_cond_clear (&pinossrc->cond);
g_free (pinossrc->source);
G_OBJECT_CLASS (parent_class)->finalize (object);
@ -186,8 +184,6 @@ gst_pinos_src_init (GstPinosSrc * src)
gst_base_src_set_do_timestamp (GST_BASE_SRC (src), TRUE);
src->fd_allocator = gst_fd_allocator_new ();
g_mutex_init (&src->lock);
g_cond_init (&src->cond);
}
static GstCaps *
@ -239,7 +235,7 @@ on_new_buffer (GObject *gobject,
{
GstPinosSrc *pinossrc = user_data;
g_cond_signal (&pinossrc->cond);
pinos_main_loop_signal (pinossrc->loop, FALSE);
}
static void
@ -247,18 +243,25 @@ on_stream_notify (GObject *gobject,
GParamSpec *pspec,
gpointer user_data)
{
PinosStreamState state;
GstPinosSrc *pinossrc = user_data;
PinosStreamState state = pinos_stream_get_state (pinossrc->stream);
state = pinos_stream_get_state (pinossrc->stream);
g_print ("got stream state %d\n", state);
g_cond_broadcast (&pinossrc->cond);
GST_DEBUG ("got stream state %d\n", state);
if (state == PINOS_STREAM_STATE_ERROR) {
GST_ELEMENT_ERROR (pinossrc, RESOURCE, FAILED,
("Failed to connect stream: %s",
pinos_stream_get_error (pinossrc->stream)->message), (NULL));
switch (state) {
case PINOS_STREAM_STATE_UNCONNECTED:
case PINOS_STREAM_STATE_CONNECTING:
case PINOS_STREAM_STATE_STARTING:
case PINOS_STREAM_STATE_STREAMING:
case PINOS_STREAM_STATE_READY:
break;
case PINOS_STREAM_STATE_ERROR:
GST_ELEMENT_ERROR (pinossrc, RESOURCE, FAILED,
("stream error: %s",
pinos_stream_get_error (pinossrc->stream)->message), (NULL));
break;
}
pinos_main_loop_signal (pinossrc->loop, FALSE);
}
static gboolean
@ -300,7 +303,7 @@ gst_pinos_src_negotiate (GstBaseSrc * basesrc)
str = gst_caps_to_string (caps);
accepted = g_bytes_new_take (str, strlen (str) + 1);
g_mutex_lock (&pinossrc->lock);
pinos_main_loop_lock (pinossrc->loop);
pinos_stream_connect_capture (pinossrc->stream, pinossrc->source, 0, accepted);
while (TRUE) {
@ -312,9 +315,9 @@ gst_pinos_src_negotiate (GstBaseSrc * basesrc)
if (state == PINOS_STREAM_STATE_ERROR)
goto connect_error;
g_cond_wait (&pinossrc->cond, &pinossrc->lock);
pinos_main_loop_wait (pinossrc->loop);
}
g_mutex_unlock (&pinossrc->lock);
pinos_main_loop_unlock (pinossrc->loop);
g_object_get (pinossrc->stream, "possible-formats", &possible, NULL);
if (possible) {
@ -367,7 +370,7 @@ no_caps:
}
connect_error:
{
g_mutex_unlock (&pinossrc->lock);
pinos_main_loop_unlock (pinossrc->loop);
return FALSE;
}
}
@ -391,9 +394,9 @@ gst_pinos_src_setcaps (GstBaseSrc * bsrc, GstCaps * caps)
str = gst_caps_to_string (caps);
format = g_bytes_new_take (str, strlen (str) + 1);
g_mutex_lock (&pinossrc->lock);
pinos_main_loop_lock (pinossrc->loop);
res = pinos_stream_start (pinossrc->stream, format, PINOS_STREAM_MODE_BUFFER);
g_mutex_unlock (&pinossrc->lock);
pinos_main_loop_unlock (pinossrc->loop);
return res;
}
@ -412,11 +415,11 @@ gst_pinos_src_create (GstPushSrc * psrc, GstBuffer ** buffer)
goto not_negotiated;
again:
g_mutex_lock (&pinossrc->lock);
pinos_main_loop_lock (pinossrc->loop);
while (TRUE) {
PinosStreamState state;
g_cond_wait (&pinossrc->cond, &pinossrc->lock);
pinos_main_loop_wait (pinossrc->loop);
state = pinos_stream_get_state (pinossrc->stream);
if (state == PINOS_STREAM_STATE_ERROR)
@ -429,7 +432,7 @@ again:
if (info.message != NULL)
break;
}
g_mutex_unlock (&pinossrc->lock);
pinos_main_loop_unlock (pinossrc->loop);
if (g_socket_control_message_get_msg_type (info.message) != SCM_RIGHTS)
goto again;
@ -453,12 +456,12 @@ not_negotiated:
}
streaming_error:
{
g_mutex_unlock (&pinossrc->lock);
pinos_main_loop_unlock (pinossrc->loop);
return GST_FLOW_ERROR;
}
streaming_stopped:
{
g_mutex_unlock (&pinossrc->lock);
pinos_main_loop_unlock (pinossrc->loop);
return GST_FLOW_FLUSHING;
}
}
@ -475,64 +478,46 @@ gst_pinos_src_stop (GstBaseSrc * basesrc)
return TRUE;
}
static GPrivate src_key;
static gint
do_poll (GPollFD *ufds, guint nfsd, gint timeout_)
{
gint res;
GstPinosSrc *this = g_private_get (&src_key);
g_mutex_unlock (&this->lock);
res = this->poll_func (ufds, nfsd, timeout_);
g_mutex_lock (&this->lock);
return res;
}
static gpointer
handle_mainloop (GstPinosSrc *this)
{
g_mutex_lock (&this->lock);
g_private_set (&src_key, this);
this->poll_func = g_main_context_get_poll_func (this->context);
g_main_context_set_poll_func (this->context, do_poll);
g_main_context_push_thread_default (this->context);
g_print ("run mainloop\n");
g_main_loop_run (this->loop);
g_print ("quit mainloop\n");
g_main_context_pop_thread_default (this->context);
g_mutex_unlock (&this->lock);
return NULL;
}
static void
on_state_notify (GObject *gobject,
GParamSpec *pspec,
gpointer user_data)
on_context_notify (GObject *gobject,
GParamSpec *pspec,
gpointer user_data)
{
GstPinosSrc *pinossrc = user_data;
PinosContextState state;
PinosContextState state = pinos_context_get_state (pinossrc->ctx);
state = pinos_context_get_state (pinossrc->ctx);
g_print ("got context state %d\n", state);
g_cond_broadcast (&pinossrc->cond);
GST_DEBUG ("got context state %d\n", state);
if (state == PINOS_CONTEXT_STATE_ERROR) {
GST_ELEMENT_ERROR (pinossrc, RESOURCE, FAILED,
("Failed to connect stream: %s",
pinos_context_get_error (pinossrc->ctx)->message), (NULL));
switch (state) {
case PINOS_CONTEXT_STATE_UNCONNECTED:
case PINOS_CONTEXT_STATE_CONNECTING:
case PINOS_CONTEXT_STATE_REGISTERING:
case PINOS_CONTEXT_STATE_READY:
break;
case PINOS_CONTEXT_STATE_ERROR:
GST_ELEMENT_ERROR (pinossrc, RESOURCE, FAILED,
("context error: %s",
pinos_context_get_error (pinossrc->ctx)->message), (NULL));
break;
}
pinos_main_loop_signal (pinossrc->loop, FALSE);
}
static gboolean
gst_pinos_src_open (GstPinosSrc * pinossrc)
{
GError *error = NULL;
g_mutex_lock (&pinossrc->lock);
pinossrc->context = g_main_context_new ();
GST_DEBUG ("context %p\n", pinossrc->context);
pinossrc->loop = pinos_main_loop_new (pinossrc->context, "pinos-main-loop");
if (!pinos_main_loop_start (pinossrc->loop, &error))
goto mainloop_failed;
pinos_main_loop_lock (pinossrc->loop);
pinossrc->ctx = pinos_context_new (pinossrc->context, "test-client", NULL);
g_signal_connect (pinossrc->ctx, "notify::state", (GCallback) on_state_notify, pinossrc);
g_signal_connect (pinossrc->ctx, "notify::state", (GCallback) on_context_notify, pinossrc);
pinos_context_connect(pinossrc->ctx, PINOS_CONTEXT_FLAGS_NONE);
@ -545,22 +530,36 @@ gst_pinos_src_open (GstPinosSrc * pinossrc)
if (state == PINOS_CONTEXT_STATE_ERROR)
goto connect_error;
g_cond_wait (&pinossrc->cond, &pinossrc->lock);
pinos_main_loop_wait (pinossrc->loop);
}
pinossrc->stream = pinos_stream_new (pinossrc->ctx, "test", NULL);
g_signal_connect (pinossrc->stream, "notify::state", (GCallback) on_stream_notify, pinossrc);
g_signal_connect (pinossrc->stream, "new-buffer", (GCallback) on_new_buffer, pinossrc);
g_mutex_unlock (&pinossrc->lock);
pinos_main_loop_unlock (pinossrc->loop);
return TRUE;
/* ERRORS */
connect_error:
mainloop_failed:
{
g_mutex_unlock (&pinossrc->lock);
GST_ELEMENT_ERROR (pinossrc, RESOURCE, FAILED,
("mainloop error: %s", error->message), (NULL));
return FALSE;
}
connect_error:
{
pinos_main_loop_unlock (pinossrc->loop);
return FALSE;
}
}
static void
gst_pinos_src_close (GstPinosSrc * pinossrc)
{
pinos_main_loop_stop (pinossrc->loop);
g_clear_object (&pinossrc->loop);
g_main_context_unref (pinossrc->context);
}
static GstStateChangeReturn
@ -571,14 +570,8 @@ gst_pinos_src_change_state (GstElement * element, GstStateChange transition)
switch (transition) {
case GST_STATE_CHANGE_NULL_TO_READY:
this->context = g_main_context_new ();
g_print ("context %p\n", this->context);
this->loop = g_main_loop_new (this->context, FALSE);
this->thread = g_thread_new ("pinos", (GThreadFunc) handle_mainloop, this);
if (!gst_pinos_src_open (this)) {
ret = GST_STATE_CHANGE_FAILURE;
goto exit;
}
if (!gst_pinos_src_open (this))
goto open_failed;
break;
case GST_STATE_CHANGE_READY_TO_PAUSED:
break;
@ -601,15 +594,16 @@ gst_pinos_src_change_state (GstElement * element, GstStateChange transition)
this->negotiated = FALSE;
break;
case GST_STATE_CHANGE_READY_TO_NULL:
g_main_loop_quit (this->loop);
g_thread_join (this->thread);
g_main_loop_unref (this->loop);
g_main_context_unref (this->context);
gst_pinos_src_close (this);
break;
default:
break;
}
exit:
return ret;
/* ERRORS */
open_failed:
{
return GST_STATE_CHANGE_FAILURE;
}
}

View file

@ -23,9 +23,7 @@
#include <gst/gst.h>
#include <gst/base/gstpushsrc.h>
#include <client/context.h>
#include <client/stream.h>
#include <client/introspect.h>
#include <client/pinos.h>
G_BEGIN_DECLS
@ -59,15 +57,10 @@ struct _GstPinosSrc {
gboolean negotiated;
GMainContext *context;
GMainLoop *loop;
GThread *thread;
PinosMainLoop *loop;
PinosContext *ctx;
PinosStream *stream;
GstAllocator *fd_allocator;
GPollFunc poll_func;
GMutex lock;
GCond cond;
};
struct _GstPinosSrcClass {