From cbeee04809df4a6bdc411e8bce616ccb0a5a3795 Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Wed, 8 Jul 2015 12:11:55 +0200 Subject: [PATCH] mainloop: add threaded mainloop Add a mainloop wrapper that runs the mainloop in a separate thread and has some synchronization primitives. Use new mainloop in gstreamer source and sink elements --- src/Makefile.am | 1 + src/client/.gitignore | 4 +- src/client/mainloop.c | 450 +++++++++++++++++++++++++++++++++++++++++ src/client/mainloop.h | 88 ++++++++ src/client/pinos.h | 5 +- src/gst/gstpinossink.c | 167 ++++++++------- src/gst/gstpinossink.h | 11 +- src/gst/gstpinossrc.c | 168 ++++++++------- src/gst/gstpinossrc.h | 11 +- 9 files changed, 710 insertions(+), 195 deletions(-) create mode 100644 src/client/mainloop.c create mode 100644 src/client/mainloop.h diff --git a/src/Makefile.am b/src/Makefile.am index 211ea9cec..3d2023d02 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -165,6 +165,7 @@ libpinos_@PINOS_MAJORMINOR@_la_SOURCES = \ client/context.h client/context.c \ client/enumtypes.h client/enumtypes.c \ client/introspect.h client/introspect.c \ + client/mainloop.h client/mainloop.c \ client/stream.h client/stream.c \ client/pinos.c client/pinos.h \ client/subscribe.c client/subscribe.h \ diff --git a/src/client/.gitignore b/src/client/.gitignore index 3fc13c714..7ed962366 100644 --- a/src/client/.gitignore +++ b/src/client/.gitignore @@ -1,2 +1,2 @@ -pv-enumtypes.c -pv-enumtypes.h +enumtypes.c +enumtypes.h diff --git a/src/client/mainloop.c b/src/client/mainloop.c new file mode 100644 index 000000000..912e79872 --- /dev/null +++ b/src/client/mainloop.c @@ -0,0 +1,450 @@ +/* Pinos + * Copyright (C) 2015 Wim Taymans + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ + +#include "mainloop.h" + +struct _PinosMainLoopPrivate +{ + GMainContext *maincontext; + GMainLoop *mainloop; + + gchar *name; + + GPollFunc poll_func; + + GMutex lock; + GCond cond; + GCond accept_cond; + GThread *thread; + + gint n_waiting; + gint n_waiting_for_accept; +}; + +#define PINOS_MAIN_LOOP_GET_PRIVATE(obj) \ + (G_TYPE_INSTANCE_GET_PRIVATE ((obj), PINOS_TYPE_MAIN_LOOP, PinosMainLoopPrivate)) + +G_DEFINE_TYPE (PinosMainLoop, pinos_main_loop, G_TYPE_OBJECT); + +enum +{ + PROP_0, + PROP_MAIN_CONTEXT, + PROP_NAME, + PROP_MAIN_LOOP, +}; + +static void +pinos_main_loop_get_property (GObject *_object, + guint prop_id, + GValue *value, + GParamSpec *pspec) +{ + PinosMainLoop *loop = PINOS_MAIN_LOOP (_object); + PinosMainLoopPrivate *priv = loop->priv; + + switch (prop_id) { + case PROP_MAIN_CONTEXT: + g_value_set_boxed (value, priv->maincontext); + break; + + case PROP_NAME: + g_value_set_string (value, priv->name); + break; + + case PROP_MAIN_LOOP: + g_value_set_boxed (value, priv->mainloop); + break; + + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (loop, prop_id, pspec); + break; + } +} + +static void +pinos_main_loop_set_property (GObject *_object, + guint prop_id, + const GValue *value, + GParamSpec *pspec) +{ + PinosMainLoop *loop = PINOS_MAIN_LOOP (_object); + PinosMainLoopPrivate *priv = loop->priv; + + switch (prop_id) { + case PROP_MAIN_CONTEXT: + priv->maincontext = g_value_dup_boxed (value); + break; + + case PROP_NAME: + priv->name = g_value_dup_string (value); + break; + + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (loop, prop_id, pspec); + break; + } +} + +static void +pinos_main_loop_constructed (GObject * object) +{ + PinosMainLoop *loop = PINOS_MAIN_LOOP (object); + PinosMainLoopPrivate *priv = loop->priv; + + priv->mainloop = g_main_loop_new (priv->maincontext, FALSE); + + G_OBJECT_CLASS (pinos_main_loop_parent_class)->constructed (object); +} + +static void +pinos_main_loop_finalize (GObject * object) +{ + PinosMainLoop *loop = PINOS_MAIN_LOOP (object); + PinosMainLoopPrivate *priv = loop->priv; + + if (priv->maincontext) + g_main_context_unref (priv->maincontext); + g_main_loop_unref (priv->mainloop); + + g_free (priv->name); + g_mutex_clear (&priv->lock); + g_cond_clear (&priv->cond); + g_cond_clear (&priv->accept_cond); + + G_OBJECT_CLASS (pinos_main_loop_parent_class)->finalize (object); +} + +static void +pinos_main_loop_class_init (PinosMainLoopClass * klass) +{ + GObjectClass *gobject_class = G_OBJECT_CLASS (klass); + + g_type_class_add_private (klass, sizeof (PinosMainLoopPrivate)); + + gobject_class->constructed = pinos_main_loop_constructed; + gobject_class->finalize = pinos_main_loop_finalize; + gobject_class->set_property = pinos_main_loop_set_property; + gobject_class->get_property = pinos_main_loop_get_property; + + /** + * PinosMainLoop:main-context + * + * The GMainContext of the loop. + */ + g_object_class_install_property (gobject_class, + PROP_MAIN_CONTEXT, + g_param_spec_boxed ("main-context", + "Main Context", + "The GMainContext of the loop", + G_TYPE_MAIN_CONTEXT, + G_PARAM_READWRITE | + G_PARAM_CONSTRUCT_ONLY | + G_PARAM_STATIC_STRINGS)); + /** + * PinosMainLoop:name + * + * The name of the loop as specified at construction time. + */ + g_object_class_install_property (gobject_class, + PROP_NAME, + g_param_spec_string ("name", + "Name", + "The name of the loop thread", + NULL, + G_PARAM_READWRITE | + G_PARAM_CONSTRUCT_ONLY | + G_PARAM_STATIC_STRINGS)); + /** + * PinosMainLoop:main-loop + * + * The GMainLoop of the loop. + */ + g_object_class_install_property (gobject_class, + PROP_MAIN_LOOP, + g_param_spec_boxed ("main-loop", + "Main Loop", + "The GMainLoop", + G_TYPE_MAIN_LOOP, + G_PARAM_READABLE | + G_PARAM_STATIC_STRINGS)); +} + +static void +pinos_main_loop_init (PinosMainLoop * loop) +{ + PinosMainLoopPrivate *priv = loop->priv = PINOS_MAIN_LOOP_GET_PRIVATE (loop); + + g_mutex_init (&priv->lock); + g_cond_init (&priv->cond); + g_cond_init (&priv->accept_cond); +} + +/** + * pinos_main_loop_new: + * @context: a #GMainContext + * @name: a thread name + * + * Make a new #PinosMainLoop that will run a mainloop on @context in + * a thread with @name. + * + * Returns: a #PinosMainLoop + */ +PinosMainLoop * +pinos_main_loop_new (GMainContext * context, const gchar *name) +{ + PinosMainLoop *loop; + + loop = g_object_new (PINOS_TYPE_MAIN_LOOP, + "main-context", context, + "name", name, + NULL); + return loop; +} + +/** + * pinos_main_loop_get_impl: + * @loop: a #PinosMainLoop + * + * Get the #GMainLoop used by @loop. + * + * Returns: the #GMainLoop used by @loop. It remains valid as long as + * @loop is valid. + */ +GMainLoop * +pinos_main_loop_get_impl (PinosMainLoop *loop) +{ + PinosMainLoopPrivate *priv; + + g_return_val_if_fail (PINOS_IS_MAIN_LOOP (loop), NULL); + + priv = loop->priv; + + return priv->mainloop; +} + +static GPrivate loop_key; + +static gint +do_poll (GPollFD *ufds, guint nfsd, gint timeout_) +{ + gint res; + PinosMainLoop *loop = g_private_get (&loop_key); + PinosMainLoopPrivate *priv = loop->priv; + + g_mutex_unlock (&priv->lock); + res = priv->poll_func (ufds, nfsd, timeout_); + g_mutex_lock (&priv->lock); + + return res; +} + +static gpointer +handle_mainloop (PinosMainLoop *loop) +{ + PinosMainLoopPrivate *priv = loop->priv; + + g_mutex_lock (&priv->lock); + g_private_set (&loop_key, loop); + + priv->poll_func = g_main_context_get_poll_func (priv->maincontext); + g_main_context_set_poll_func (priv->maincontext, do_poll); + + g_main_context_push_thread_default (priv->maincontext); + g_main_loop_run (priv->mainloop); + g_main_context_pop_thread_default (priv->maincontext); + + g_main_context_set_poll_func (priv->maincontext, priv->poll_func); + + g_mutex_unlock (&priv->lock); + + return NULL; +} + + +/** + * pinos_main_loop_start: + * @loop: a #PinosMainLoop + * @error: am optional #GError + * + * Start the thread to handle @loop. + * + * Returns: %TRUE on success. %FALSE will be returned when an error occured + * and @error will contain more information. + */ +gboolean +pinos_main_loop_start (PinosMainLoop *loop, GError **error) +{ + PinosMainLoopPrivate *priv; + + g_return_val_if_fail (PINOS_IS_MAIN_LOOP (loop), FALSE); + priv = loop->priv; + g_return_val_if_fail (priv->thread == NULL, FALSE); + + priv->thread = g_thread_try_new (priv->name, (GThreadFunc) handle_mainloop, loop, error); + + return priv->thread != NULL; +} + +/** + * pinos_main_loop_stop: + * @loop: a #PinosMainLoop + * + * Quit the main loop and stop its thread. + */ +void +pinos_main_loop_stop (PinosMainLoop *loop) +{ + PinosMainLoopPrivate *priv; + + g_return_if_fail (PINOS_IS_MAIN_LOOP (loop)); + priv = loop->priv; + + g_return_if_fail (priv->thread != NULL); + g_return_if_fail (!pinos_main_loop_in_thread (loop)); + + g_mutex_lock (&priv->lock); + g_main_loop_quit (priv->mainloop); + g_mutex_unlock (&priv->lock); + + g_thread_join (priv->thread); + priv->thread = NULL; +} + +/** + * pinos_main_loop_lock: + * @loop: a #PinosMainLoop + * + * Lock the mutex associated with @loop. + */ +void +pinos_main_loop_lock (PinosMainLoop *loop) +{ + PinosMainLoopPrivate *priv; + + g_return_if_fail (PINOS_IS_MAIN_LOOP (loop)); + priv = loop->priv; + g_return_if_fail (!pinos_main_loop_in_thread (loop)); + + g_mutex_lock (&priv->lock); +} + +/** + * pinos_main_loop_unlock: + * @loop: a #PinosMainLoop + * + * Unlock the mutex associated with @loop. + */ +void +pinos_main_loop_unlock (PinosMainLoop *loop) +{ + PinosMainLoopPrivate *priv; + + g_return_if_fail (PINOS_IS_MAIN_LOOP (loop)); + priv = loop->priv; + g_return_if_fail (!pinos_main_loop_in_thread (loop)); + + g_mutex_unlock (&priv->lock); +} + +/** + * pinos_main_loop_signal: + * @loop: a #PinosMainLoop + * + * Signal the main thread of @loop. If @wait_for_accept is %TRUE, + * this function waits until pinos_main_loop_accept() is called. + */ +void +pinos_main_loop_signal (PinosMainLoop *loop, gboolean wait_for_accept) +{ + PinosMainLoopPrivate *priv; + + g_return_if_fail (PINOS_IS_MAIN_LOOP (loop)); + priv = loop->priv; + + if (priv->n_waiting > 0) + g_cond_broadcast (&priv->cond); + + if (wait_for_accept) { + priv->n_waiting_for_accept++; + + while (priv->n_waiting_for_accept > 0) + g_cond_wait (&priv->accept_cond, &priv->lock); + } +} + +/** + * pinos_main_loop_wait: + * @loop: a #PinosMainLoop + * + * Wait for the loop thread to call pinos_main_loop_signal(). + */ +void +pinos_main_loop_wait (PinosMainLoop *loop) +{ + PinosMainLoopPrivate *priv; + + g_return_if_fail (PINOS_IS_MAIN_LOOP (loop)); + priv = loop->priv; + g_return_if_fail (!pinos_main_loop_in_thread (loop)); + + priv->n_waiting ++; + + g_cond_wait (&priv->cond, &priv->lock); + + g_assert (priv->n_waiting > 0); + priv->n_waiting --; +} + +/** + * pinos_main_loop_accept: + * @loop: a #PinosMainLoop + * + * Signal the loop thread waiting for accept with pinos_main_loop_signal(). + */ +void +pinos_main_loop_accept (PinosMainLoop *loop) +{ + PinosMainLoopPrivate *priv; + + g_return_if_fail (PINOS_IS_MAIN_LOOP (loop)); + priv = loop->priv; + g_return_if_fail (!pinos_main_loop_in_thread (loop)); + + g_assert (priv->n_waiting_for_accept > 0); + priv->n_waiting_for_accept--; + + g_cond_signal (&priv->accept_cond); +} + +/** + * pinos_main_loop_in_thread: + * @loop: a #PinosMainLoop + * + * Check if we are inside the thread of @loop. + * + * Returns: %TRUE when called inside the thread of @loop. + */ +gboolean +pinos_main_loop_in_thread (PinosMainLoop *loop) +{ + g_return_val_if_fail (PINOS_IS_MAIN_LOOP (loop), FALSE); + + return g_thread_self() == loop->priv->thread; +} diff --git a/src/client/mainloop.h b/src/client/mainloop.h new file mode 100644 index 000000000..746b6de4c --- /dev/null +++ b/src/client/mainloop.h @@ -0,0 +1,88 @@ +/* Pinos + * Copyright (C) 2015 Wim Taymans + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ + +#ifndef __PINOS_MAIN_LOOP_H__ +#define __PINOS_MAIN_LOOP_H__ + +#include +#include + +#include "context.h" + +G_BEGIN_DECLS + +#define PINOS_TYPE_MAIN_LOOP (pinos_main_loop_get_type ()) +#define PINOS_IS_MAIN_LOOP(obj) (G_TYPE_CHECK_INSTANCE_TYPE ((obj), PINOS_TYPE_MAIN_LOOP)) +#define PINOS_IS_MAIN_LOOP_CLASS(klass) (G_TYPE_CHECK_CLASS_TYPE ((klass), PINOS_TYPE_MAIN_LOOP)) +#define PINOS_MAIN_LOOP_GET_CLASS(obj) (G_TYPE_INSTANCE_GET_CLASS ((obj), PINOS_TYPE_MAIN_LOOP, PinosMainLoopClass)) +#define PINOS_MAIN_LOOP(obj) (G_TYPE_CHECK_INSTANCE_CAST ((obj), PINOS_TYPE_MAIN_LOOP, PinosMainLoop)) +#define PINOS_MAIN_LOOP_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST ((klass), PINOS_TYPE_MAIN_LOOP, PinosMainLoopClass)) +#define PINOS_MAIN_LOOP_CAST(obj) ((PinosMainLoop*)(obj)) +#define PINOS_MAIN_LOOP_CLASS_CAST(klass) ((PinosMainLoopClass*)(klass)) + +typedef struct _PinosMainLoop PinosMainLoop; +typedef struct _PinosMainLoopClass PinosMainLoopClass; +typedef struct _PinosMainLoopPrivate PinosMainLoopPrivate; + + +/** + * PinosMainLoop: + * + * Pinos main loop object class. + */ +struct _PinosMainLoop { + GObject object; + + PinosMainLoopPrivate *priv; +}; + +/** + * PinosMainLoopClass: + * + * Pinos main loop object class. + */ +struct _PinosMainLoopClass { + GObjectClass parent_class; +}; + +/* normal GObject stuff */ +GType pinos_main_loop_get_type (void); + +PinosMainLoop * pinos_main_loop_new (GMainContext * context, + const gchar *name); + +GMainLoop * pinos_main_loop_get_impl (PinosMainLoop *loop); + +gboolean pinos_main_loop_start (PinosMainLoop *loop, GError **error); +void pinos_main_loop_stop (PinosMainLoop *loop); + +void pinos_main_loop_lock (PinosMainLoop *loop); +void pinos_main_loop_unlock (PinosMainLoop *loop); + +void pinos_main_loop_wait (PinosMainLoop *loop); +void pinos_main_loop_signal (PinosMainLoop *loop, gboolean wait_for_accept); +void pinos_main_loop_accept (PinosMainLoop *loop); + +gboolean pinos_main_loop_in_thread (PinosMainLoop *loop); + + +G_END_DECLS + +#endif /* __PINOS_MAIN_LOOP_H__ */ + diff --git a/src/client/pinos.h b/src/client/pinos.h index 576a5070a..06d32e779 100644 --- a/src/client/pinos.h +++ b/src/client/pinos.h @@ -20,10 +20,11 @@ #ifndef __PINOS_H__ #define __PINOS_H__ -#include #include -#include #include +#include +#include +#include #define PINOS_DBUS_SERVICE "org.pinos" #define PINOS_DBUS_OBJECT_PREFIX "/org/pinos" diff --git a/src/gst/gstpinossink.c b/src/gst/gstpinossink.c index cf784836c..c76b88256 100644 --- a/src/gst/gstpinossink.c +++ b/src/gst/gstpinossink.c @@ -123,8 +123,6 @@ static void gst_pinos_sink_init (GstPinosSink * sink) { sink->allocator = gst_tmpfile_allocator_new (); - g_mutex_init (&sink->lock); - g_cond_init (&sink->cond); } static GstCaps * @@ -198,7 +196,7 @@ on_new_buffer (GObject *gobject, { GstPinosSink *pinossink = user_data; - g_cond_signal (&pinossink->cond); + pinos_main_loop_signal (pinossink->loop, FALSE); } static void @@ -207,17 +205,26 @@ on_stream_notify (GObject *gobject, gpointer user_data) { PinosStreamState state; + PinosStream *stream = PINOS_STREAM (gobject); GstPinosSink *pinossink = user_data; - state = pinos_stream_get_state (pinossink->stream); - g_print ("got stream state %d\n", state); - g_cond_broadcast (&pinossink->cond); + state = pinos_stream_get_state (stream); + GST_DEBUG ("got stream state %d\n", state); - if (state == PINOS_STREAM_STATE_ERROR) { - GST_ELEMENT_ERROR (pinossink, RESOURCE, FAILED, - ("Failed to connect stream: %s", - pinos_stream_get_error (pinossink->stream)->message), (NULL)); + switch (state) { + case PINOS_STREAM_STATE_UNCONNECTED: + case PINOS_STREAM_STATE_CONNECTING: + case PINOS_STREAM_STATE_STARTING: + case PINOS_STREAM_STATE_STREAMING: + case PINOS_STREAM_STATE_READY: + break; + case PINOS_STREAM_STATE_ERROR: + GST_ELEMENT_ERROR (pinossink, RESOURCE, FAILED, + ("stream error: %s", + pinos_stream_get_error (stream)->message), (NULL)); + break; } + pinos_main_loop_signal (pinossink->loop, FALSE); } static GstCaps * @@ -238,7 +245,7 @@ gst_pinos_sink_setcaps (GstBaseSink * bsink, GstCaps * caps) str = gst_caps_to_string (caps); format = g_bytes_new_take (str, strlen (str) + 1); - g_mutex_lock (&pinossink->lock); + pinos_main_loop_lock (pinossink->loop); pinossink->stream = pinos_stream_new (pinossink->ctx, "test", NULL); g_signal_connect (pinossink->stream, "notify::state", (GCallback) on_stream_notify, pinossink); g_signal_connect (pinossink->stream, "new-buffer", (GCallback) on_new_buffer, pinossink); @@ -254,7 +261,7 @@ gst_pinos_sink_setcaps (GstBaseSink * bsink, GstCaps * caps) if (state == PINOS_STREAM_STATE_ERROR) goto connect_error; - g_cond_wait (&pinossink->cond, &pinossink->lock); + pinos_main_loop_wait (pinossink->loop); } pinos_stream_start (pinossink->stream, format, PINOS_STREAM_MODE_BUFFER); @@ -268,9 +275,9 @@ gst_pinos_sink_setcaps (GstBaseSink * bsink, GstCaps * caps) if (state == PINOS_STREAM_STATE_ERROR) goto connect_error; - g_cond_wait (&pinossink->cond, &pinossink->lock); + pinos_main_loop_wait (pinossink->loop); } - g_mutex_unlock (&pinossink->lock); + pinos_main_loop_unlock (pinossink->loop); pinossink->negotiated = TRUE; @@ -278,7 +285,7 @@ gst_pinos_sink_setcaps (GstBaseSink * bsink, GstCaps * caps) connect_error: { - g_mutex_unlock (&pinossink->lock); + pinos_main_loop_unlock (pinossink->loop); return FALSE; } } @@ -323,11 +330,11 @@ gst_pinos_sink_render (GstBaseSink * bsink, GstBuffer * buffer) gst_memory_unref (mem); info.message = mesg; - g_mutex_lock (&pinossink->lock); + pinos_main_loop_lock (pinossink->loop); if (pinos_stream_get_state (pinossink->stream) != PINOS_STREAM_STATE_STREAMING) goto streaming_error; pinos_stream_provide_buffer (pinossink->stream, &info); - g_mutex_unlock (&pinossink->lock); + pinos_main_loop_unlock (pinossink->loop); return GST_FLOW_OK; @@ -341,7 +348,7 @@ map_error: } streaming_error: { - g_mutex_unlock (&pinossink->lock); + pinos_main_loop_unlock (pinossink->loop); return GST_FLOW_ERROR; } } @@ -366,63 +373,48 @@ gst_pinos_sink_stop (GstBaseSink * basesink) return TRUE; } -static GPrivate sink_key; - -static gint -do_poll (GPollFD *ufds, guint nfsd, gint timeout_) -{ - gint res; - GstPinosSink *this = g_private_get (&sink_key); - - g_mutex_unlock (&this->lock); - res = this->poll_func (ufds, nfsd, timeout_); - g_mutex_lock (&this->lock); - - return res; -} - -static gpointer -handle_mainloop (GstPinosSink *this) -{ - g_mutex_lock (&this->lock); - g_private_set (&sink_key, this); - this->poll_func = g_main_context_get_poll_func (this->context); - g_main_context_set_poll_func (this->context, do_poll); - g_main_context_push_thread_default (this->context); - g_print ("run mainloop\n"); - g_main_loop_run (this->loop); - g_print ("quit mainloop\n"); - g_main_context_pop_thread_default (this->context); - g_mutex_unlock (&this->lock); - - return NULL; -} - static void -on_state_notify (GObject *gobject, - GParamSpec *pspec, - gpointer user_data) +on_context_notify (GObject *gobject, + GParamSpec *pspec, + gpointer user_data) { GstPinosSink *pinossink = user_data; + PinosContext *ctx = PINOS_CONTEXT (gobject); PinosContextState state; - state = pinos_context_get_state (pinossink->ctx); - g_print ("got context state %d\n", state); - g_cond_broadcast (&pinossink->cond); + state = pinos_context_get_state (ctx); + GST_DEBUG ("got context state %d\n", state); - if (state == PINOS_CONTEXT_STATE_ERROR) { - GST_ELEMENT_ERROR (pinossink, RESOURCE, FAILED, - ("Failed to connect stream: %s", - pinos_context_get_error (pinossink->ctx)->message), (NULL)); + switch (state) { + case PINOS_CONTEXT_STATE_UNCONNECTED: + case PINOS_CONTEXT_STATE_CONNECTING: + case PINOS_CONTEXT_STATE_REGISTERING: + case PINOS_CONTEXT_STATE_READY: + break; + case PINOS_CONTEXT_STATE_ERROR: + GST_ELEMENT_ERROR (pinossink, RESOURCE, FAILED, + ("context error: %s", + pinos_context_get_error (pinossink->ctx)->message), (NULL)); + break; } + pinos_main_loop_signal (pinossink->loop, FALSE); } static gboolean gst_pinos_sink_open (GstPinosSink * pinossink) { - g_mutex_lock (&pinossink->lock); + GError *error = NULL; + + pinossink->context = g_main_context_new (); + GST_DEBUG ("context %p\n", pinossink->context); + + pinossink->loop = pinos_main_loop_new (pinossink->context, "pinos-sink-loop"); + if (!pinos_main_loop_start (pinossink->loop, &error)) + goto mainloop_error; + + pinos_main_loop_lock (pinossink->loop); pinossink->ctx = pinos_context_new (pinossink->context, "test-client", NULL); - g_signal_connect (pinossink->ctx, "notify::state", (GCallback) on_state_notify, pinossink); + g_signal_connect (pinossink->ctx, "notify::state", (GCallback) on_context_notify, pinossink); pinos_context_connect(pinossink->ctx, PINOS_CONTEXT_FLAGS_NONE); @@ -435,16 +427,22 @@ gst_pinos_sink_open (GstPinosSink * pinossink) if (state == PINOS_CONTEXT_STATE_ERROR) goto connect_error; - g_cond_wait (&pinossink->cond, &pinossink->lock); + pinos_main_loop_wait (pinossink->loop); } - g_mutex_unlock (&pinossink->lock); + pinos_main_loop_unlock (pinossink->loop); return TRUE; /* ERRORS */ +mainloop_error: + { + GST_ELEMENT_ERROR (pinossink, RESOURCE, FAILED, + ("Failed to start mainloop: %s", error->message), (NULL)); + return FALSE; + } connect_error: { - g_mutex_unlock (&pinossink->lock); + pinos_main_loop_unlock (pinossink->loop); return FALSE; } } @@ -452,13 +450,12 @@ connect_error: static gboolean gst_pinos_sink_close (GstPinosSink * pinossink) { - - g_mutex_lock (&pinossink->lock); + pinos_main_loop_lock (pinossink->loop); if (pinossink->stream) { pinos_stream_disconnect (pinossink->stream); } if (pinossink->ctx) { - pinos_context_disconnect(pinossink->ctx); + pinos_context_disconnect (pinossink->ctx); while (TRUE) { PinosContextState state = pinos_context_get_state (pinossink->ctx); @@ -469,10 +466,16 @@ gst_pinos_sink_close (GstPinosSink * pinossink) if (state == PINOS_CONTEXT_STATE_ERROR) break; - g_cond_wait (&pinossink->cond, &pinossink->lock); + pinos_main_loop_wait (pinossink->loop); } } - g_mutex_unlock (&pinossink->lock); + pinos_main_loop_unlock (pinossink->loop); + + pinos_main_loop_stop (pinossink->loop); + g_clear_object (&pinossink->loop); + g_clear_object (&pinossink->stream); + g_clear_object (&pinossink->ctx); + g_main_context_unref (pinossink->context); return TRUE; } @@ -485,14 +488,8 @@ gst_pinos_sink_change_state (GstElement * element, GstStateChange transition) switch (transition) { case GST_STATE_CHANGE_NULL_TO_READY: - this->context = g_main_context_new (); - g_print ("context %p\n", this->context); - this->loop = g_main_loop_new (this->context, FALSE); - this->thread = g_thread_new ("pinos", (GThreadFunc) handle_mainloop, this); - if (!gst_pinos_sink_open (this)) { - ret = GST_STATE_CHANGE_FAILURE; - goto exit; - } + if (!gst_pinos_sink_open (this)) + goto open_failed; break; case GST_STATE_CHANGE_READY_TO_PAUSED: break; @@ -515,17 +512,15 @@ gst_pinos_sink_change_state (GstElement * element, GstStateChange transition) break; case GST_STATE_CHANGE_READY_TO_NULL: gst_pinos_sink_close (this); - g_main_loop_quit (this->loop); - g_thread_join (this->thread); - g_main_loop_unref (this->loop); - g_clear_object (&this->stream); - g_clear_object (&this->ctx); - g_main_context_unref (this->context); break; default: break; } - -exit: return ret; + + /* ERRORS */ +open_failed: + { + return GST_STATE_CHANGE_FAILURE; + } } diff --git a/src/gst/gstpinossink.h b/src/gst/gstpinossink.h index ac6998abb..1eb46c04c 100644 --- a/src/gst/gstpinossink.h +++ b/src/gst/gstpinossink.h @@ -23,9 +23,7 @@ #include #include -#include -#include -#include +#include G_BEGIN_DECLS @@ -59,15 +57,10 @@ struct _GstPinosSink { gboolean negotiated; GMainContext *context; - GMainLoop *loop; - GThread *thread; + PinosMainLoop *loop; PinosContext *ctx; PinosStream *stream; GstAllocator *allocator; - - GPollFunc poll_func; - GMutex lock; - GCond cond; }; struct _GstPinosSinkClass { diff --git a/src/gst/gstpinossrc.c b/src/gst/gstpinossrc.c index a951ce9c4..d55203aca 100644 --- a/src/gst/gstpinossrc.c +++ b/src/gst/gstpinossrc.c @@ -121,8 +121,6 @@ gst_pinos_src_finalize (GObject * object) GstPinosSrc *pinossrc = GST_PINOS_SRC (object); g_object_unref (pinossrc->fd_allocator); - g_mutex_clear (&pinossrc->lock); - g_cond_clear (&pinossrc->cond); g_free (pinossrc->source); G_OBJECT_CLASS (parent_class)->finalize (object); @@ -186,8 +184,6 @@ gst_pinos_src_init (GstPinosSrc * src) gst_base_src_set_do_timestamp (GST_BASE_SRC (src), TRUE); src->fd_allocator = gst_fd_allocator_new (); - g_mutex_init (&src->lock); - g_cond_init (&src->cond); } static GstCaps * @@ -239,7 +235,7 @@ on_new_buffer (GObject *gobject, { GstPinosSrc *pinossrc = user_data; - g_cond_signal (&pinossrc->cond); + pinos_main_loop_signal (pinossrc->loop, FALSE); } static void @@ -247,18 +243,25 @@ on_stream_notify (GObject *gobject, GParamSpec *pspec, gpointer user_data) { - PinosStreamState state; GstPinosSrc *pinossrc = user_data; + PinosStreamState state = pinos_stream_get_state (pinossrc->stream); - state = pinos_stream_get_state (pinossrc->stream); - g_print ("got stream state %d\n", state); - g_cond_broadcast (&pinossrc->cond); + GST_DEBUG ("got stream state %d\n", state); - if (state == PINOS_STREAM_STATE_ERROR) { - GST_ELEMENT_ERROR (pinossrc, RESOURCE, FAILED, - ("Failed to connect stream: %s", - pinos_stream_get_error (pinossrc->stream)->message), (NULL)); + switch (state) { + case PINOS_STREAM_STATE_UNCONNECTED: + case PINOS_STREAM_STATE_CONNECTING: + case PINOS_STREAM_STATE_STARTING: + case PINOS_STREAM_STATE_STREAMING: + case PINOS_STREAM_STATE_READY: + break; + case PINOS_STREAM_STATE_ERROR: + GST_ELEMENT_ERROR (pinossrc, RESOURCE, FAILED, + ("stream error: %s", + pinos_stream_get_error (pinossrc->stream)->message), (NULL)); + break; } + pinos_main_loop_signal (pinossrc->loop, FALSE); } static gboolean @@ -300,7 +303,7 @@ gst_pinos_src_negotiate (GstBaseSrc * basesrc) str = gst_caps_to_string (caps); accepted = g_bytes_new_take (str, strlen (str) + 1); - g_mutex_lock (&pinossrc->lock); + pinos_main_loop_lock (pinossrc->loop); pinos_stream_connect_capture (pinossrc->stream, pinossrc->source, 0, accepted); while (TRUE) { @@ -312,9 +315,9 @@ gst_pinos_src_negotiate (GstBaseSrc * basesrc) if (state == PINOS_STREAM_STATE_ERROR) goto connect_error; - g_cond_wait (&pinossrc->cond, &pinossrc->lock); + pinos_main_loop_wait (pinossrc->loop); } - g_mutex_unlock (&pinossrc->lock); + pinos_main_loop_unlock (pinossrc->loop); g_object_get (pinossrc->stream, "possible-formats", &possible, NULL); if (possible) { @@ -367,7 +370,7 @@ no_caps: } connect_error: { - g_mutex_unlock (&pinossrc->lock); + pinos_main_loop_unlock (pinossrc->loop); return FALSE; } } @@ -391,9 +394,9 @@ gst_pinos_src_setcaps (GstBaseSrc * bsrc, GstCaps * caps) str = gst_caps_to_string (caps); format = g_bytes_new_take (str, strlen (str) + 1); - g_mutex_lock (&pinossrc->lock); + pinos_main_loop_lock (pinossrc->loop); res = pinos_stream_start (pinossrc->stream, format, PINOS_STREAM_MODE_BUFFER); - g_mutex_unlock (&pinossrc->lock); + pinos_main_loop_unlock (pinossrc->loop); return res; } @@ -412,11 +415,11 @@ gst_pinos_src_create (GstPushSrc * psrc, GstBuffer ** buffer) goto not_negotiated; again: - g_mutex_lock (&pinossrc->lock); + pinos_main_loop_lock (pinossrc->loop); while (TRUE) { PinosStreamState state; - g_cond_wait (&pinossrc->cond, &pinossrc->lock); + pinos_main_loop_wait (pinossrc->loop); state = pinos_stream_get_state (pinossrc->stream); if (state == PINOS_STREAM_STATE_ERROR) @@ -429,7 +432,7 @@ again: if (info.message != NULL) break; } - g_mutex_unlock (&pinossrc->lock); + pinos_main_loop_unlock (pinossrc->loop); if (g_socket_control_message_get_msg_type (info.message) != SCM_RIGHTS) goto again; @@ -453,12 +456,12 @@ not_negotiated: } streaming_error: { - g_mutex_unlock (&pinossrc->lock); + pinos_main_loop_unlock (pinossrc->loop); return GST_FLOW_ERROR; } streaming_stopped: { - g_mutex_unlock (&pinossrc->lock); + pinos_main_loop_unlock (pinossrc->loop); return GST_FLOW_FLUSHING; } } @@ -475,64 +478,46 @@ gst_pinos_src_stop (GstBaseSrc * basesrc) return TRUE; } -static GPrivate src_key; - -static gint -do_poll (GPollFD *ufds, guint nfsd, gint timeout_) -{ - gint res; - GstPinosSrc *this = g_private_get (&src_key); - - g_mutex_unlock (&this->lock); - res = this->poll_func (ufds, nfsd, timeout_); - g_mutex_lock (&this->lock); - - return res; -} - -static gpointer -handle_mainloop (GstPinosSrc *this) -{ - g_mutex_lock (&this->lock); - g_private_set (&src_key, this); - this->poll_func = g_main_context_get_poll_func (this->context); - g_main_context_set_poll_func (this->context, do_poll); - g_main_context_push_thread_default (this->context); - g_print ("run mainloop\n"); - g_main_loop_run (this->loop); - g_print ("quit mainloop\n"); - g_main_context_pop_thread_default (this->context); - g_mutex_unlock (&this->lock); - - return NULL; -} - static void -on_state_notify (GObject *gobject, - GParamSpec *pspec, - gpointer user_data) +on_context_notify (GObject *gobject, + GParamSpec *pspec, + gpointer user_data) { GstPinosSrc *pinossrc = user_data; - PinosContextState state; + PinosContextState state = pinos_context_get_state (pinossrc->ctx); - state = pinos_context_get_state (pinossrc->ctx); - g_print ("got context state %d\n", state); - g_cond_broadcast (&pinossrc->cond); + GST_DEBUG ("got context state %d\n", state); - if (state == PINOS_CONTEXT_STATE_ERROR) { - GST_ELEMENT_ERROR (pinossrc, RESOURCE, FAILED, - ("Failed to connect stream: %s", - pinos_context_get_error (pinossrc->ctx)->message), (NULL)); + switch (state) { + case PINOS_CONTEXT_STATE_UNCONNECTED: + case PINOS_CONTEXT_STATE_CONNECTING: + case PINOS_CONTEXT_STATE_REGISTERING: + case PINOS_CONTEXT_STATE_READY: + break; + case PINOS_CONTEXT_STATE_ERROR: + GST_ELEMENT_ERROR (pinossrc, RESOURCE, FAILED, + ("context error: %s", + pinos_context_get_error (pinossrc->ctx)->message), (NULL)); + break; } + pinos_main_loop_signal (pinossrc->loop, FALSE); } static gboolean gst_pinos_src_open (GstPinosSrc * pinossrc) { + GError *error = NULL; - g_mutex_lock (&pinossrc->lock); + pinossrc->context = g_main_context_new (); + GST_DEBUG ("context %p\n", pinossrc->context); + + pinossrc->loop = pinos_main_loop_new (pinossrc->context, "pinos-main-loop"); + if (!pinos_main_loop_start (pinossrc->loop, &error)) + goto mainloop_failed; + + pinos_main_loop_lock (pinossrc->loop); pinossrc->ctx = pinos_context_new (pinossrc->context, "test-client", NULL); - g_signal_connect (pinossrc->ctx, "notify::state", (GCallback) on_state_notify, pinossrc); + g_signal_connect (pinossrc->ctx, "notify::state", (GCallback) on_context_notify, pinossrc); pinos_context_connect(pinossrc->ctx, PINOS_CONTEXT_FLAGS_NONE); @@ -545,22 +530,36 @@ gst_pinos_src_open (GstPinosSrc * pinossrc) if (state == PINOS_CONTEXT_STATE_ERROR) goto connect_error; - g_cond_wait (&pinossrc->cond, &pinossrc->lock); + pinos_main_loop_wait (pinossrc->loop); } pinossrc->stream = pinos_stream_new (pinossrc->ctx, "test", NULL); g_signal_connect (pinossrc->stream, "notify::state", (GCallback) on_stream_notify, pinossrc); g_signal_connect (pinossrc->stream, "new-buffer", (GCallback) on_new_buffer, pinossrc); - g_mutex_unlock (&pinossrc->lock); + pinos_main_loop_unlock (pinossrc->loop); return TRUE; /* ERRORS */ -connect_error: +mainloop_failed: { - g_mutex_unlock (&pinossrc->lock); + GST_ELEMENT_ERROR (pinossrc, RESOURCE, FAILED, + ("mainloop error: %s", error->message), (NULL)); return FALSE; } +connect_error: + { + pinos_main_loop_unlock (pinossrc->loop); + return FALSE; + } +} + +static void +gst_pinos_src_close (GstPinosSrc * pinossrc) +{ + pinos_main_loop_stop (pinossrc->loop); + g_clear_object (&pinossrc->loop); + g_main_context_unref (pinossrc->context); } static GstStateChangeReturn @@ -571,14 +570,8 @@ gst_pinos_src_change_state (GstElement * element, GstStateChange transition) switch (transition) { case GST_STATE_CHANGE_NULL_TO_READY: - this->context = g_main_context_new (); - g_print ("context %p\n", this->context); - this->loop = g_main_loop_new (this->context, FALSE); - this->thread = g_thread_new ("pinos", (GThreadFunc) handle_mainloop, this); - if (!gst_pinos_src_open (this)) { - ret = GST_STATE_CHANGE_FAILURE; - goto exit; - } + if (!gst_pinos_src_open (this)) + goto open_failed; break; case GST_STATE_CHANGE_READY_TO_PAUSED: break; @@ -601,15 +594,16 @@ gst_pinos_src_change_state (GstElement * element, GstStateChange transition) this->negotiated = FALSE; break; case GST_STATE_CHANGE_READY_TO_NULL: - g_main_loop_quit (this->loop); - g_thread_join (this->thread); - g_main_loop_unref (this->loop); - g_main_context_unref (this->context); + gst_pinos_src_close (this); break; default: break; } - -exit: return ret; + + /* ERRORS */ +open_failed: + { + return GST_STATE_CHANGE_FAILURE; + } } diff --git a/src/gst/gstpinossrc.h b/src/gst/gstpinossrc.h index 59b8673fc..0b3e4c5a1 100644 --- a/src/gst/gstpinossrc.h +++ b/src/gst/gstpinossrc.h @@ -23,9 +23,7 @@ #include #include -#include -#include -#include +#include G_BEGIN_DECLS @@ -59,15 +57,10 @@ struct _GstPinosSrc { gboolean negotiated; GMainContext *context; - GMainLoop *loop; - GThread *thread; + PinosMainLoop *loop; PinosContext *ctx; PinosStream *stream; GstAllocator *fd_allocator; - - GPollFunc poll_func; - GMutex lock; - GCond cond; }; struct _GstPinosSrcClass {