From b68b62740c50acc06f4ce640c1ebaaee0b166b71 Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Fri, 12 Jun 2015 12:10:27 +0200 Subject: [PATCH] Fix locking and threadsafety Emit all notify in the context thread. Handle error cases better. Make sure we hold the lock when running the mainloop, this will cause all of the signals to be emited with the lock. Make sure we call all methods with the lock. --- src/Makefile.am | 10 ++--- src/client/pv-context.c | 18 +++++++-- src/client/pv-stream.c | 43 ++++++++++++++++----- src/gst/gstpvsink.c | 58 +++++++++++++++++++--------- src/gst/gstpvsink.h | 1 + src/gst/gstpvsrc.c | 85 +++++++++++++++++++++++++++++------------ src/gst/gstpvsrc.h | 1 + 7 files changed, 153 insertions(+), 63 deletions(-) diff --git a/src/Makefile.am b/src/Makefile.am index 22a4be67f..2c2a75b4b 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -155,8 +155,7 @@ pulsevideoinclude_HEADERS = \ client/pv-enumtypes.h \ client/pv-introspect.h \ client/pv-stream.h \ - client/pv-subscribe.h \ - dbus/org-pulsevideo.h + client/pv-subscribe.h lib_LTLIBRARIES = \ libpulsevideo-@PV_MAJORMINOR@.la @@ -169,7 +168,6 @@ libpulsevideo_@PV_MAJORMINOR@_la_SOURCES = \ client/pv-stream.h client/pv-stream.c \ client/pulsevideo.c client/pulsevideo.h \ client/pv-subscribe.c client/pv-subscribe.h \ - dbus/org-pulsevideo.c \ $(pulsevideogstsource) @@ -191,11 +189,13 @@ libpulsevideocore_@PV_MAJORMINOR@_la_SOURCES = \ server/pv-client-source.c server/pv-client-source.h \ server/pv-source-output.c server/pv-source-output.h \ modules/gst/pv-gst-manager.c \ - modules/gst/pv-gst-source.c + modules/gst/pv-gst-source.c \ + dbus/org-pulsevideo.c dbus/org-pulsevideo.h libpulsevideocore_@PV_MAJORMINOR@_la_CFLAGS = $(AM_CFLAGS) $(SERVER_CFLAGS) libpulsevideocore_@PV_MAJORMINOR@_la_LDFLAGS = $(AM_LDFLAGS) -avoid-version -libpulsevideocore_@PV_MAJORMINOR@_la_LIBADD = $(AM_LIBADD) $(LIBLTDL) $(LTLIBICONV) +libpulsevideocore_@PV_MAJORMINOR@_la_LIBADD = $(AM_LIBADD) $(LIBLTDL) $(LTLIBICONV) \ + libpulsevideo-@PV_MAJORMINOR@.la ################################### # GStreamer Plugin # diff --git a/src/client/pv-context.c b/src/client/pv-context.c index 79cfc7087..32450b4db 100644 --- a/src/client/pv-context.c +++ b/src/client/pv-context.c @@ -302,12 +302,22 @@ pv_context_new (GMainContext *context, const gchar *name, GVariant *properties) return g_object_new (PV_TYPE_CONTEXT, "main-context", context, "name", name, "properties", properties, NULL); } +static gboolean +do_notify_state (PvContext *context) +{ + g_object_notify (G_OBJECT (context), "state"); + g_object_unref (context); + return FALSE; +} + static void context_set_state (PvContext *context, PvContextState state) { if (context->priv->state != state) { context->priv->state = state; - g_object_notify (G_OBJECT (context), "state"); + g_main_context_invoke (context->priv->context, + (GSourceFunc) do_notify_state, + g_object_ref (context)); } } static void @@ -333,7 +343,7 @@ client_failed: { priv->error = error; context_set_state (context, PV_STREAM_STATE_ERROR); - g_error ("failed to get client proxy: %s", error->message); + g_warning ("failed to get client proxy: %s", error->message); return; } } @@ -351,7 +361,7 @@ on_client_connected (GObject *source_object, ret = g_dbus_proxy_call_finish (priv->daemon, res, &error); if (ret == NULL) { - g_error ("failed to connect client: %s", error->message); + g_warning ("failed to connect client: %s", error->message); priv->error = error; context_set_state (context, PV_CONTEXT_STATE_ERROR); return; @@ -554,7 +564,7 @@ on_client_disconnected (GObject *source_object, ret = g_dbus_proxy_call_finish (priv->client, res, &error); if (ret == NULL) { - g_error ("failed to disconnect client: %s", error->message); + g_warning ("failed to disconnect client: %s", error->message); priv->error = error; context_set_state (context, PV_CONTEXT_STATE_ERROR); g_object_unref (context); diff --git a/src/client/pv-stream.c b/src/client/pv-stream.c index e5db0a122..58569d604 100644 --- a/src/client/pv-stream.c +++ b/src/client/pv-stream.c @@ -154,12 +154,22 @@ pv_stream_set_property (GObject *_object, } } +static gboolean +do_notify_state (PvStream *stream) +{ + g_object_notify (G_OBJECT (stream), "state"); + g_object_unref (stream); + return FALSE; +} + static void stream_set_state (PvStream *stream, PvStreamState state) { if (stream->priv->state != state) { stream->priv->state = state; - g_object_notify (G_OBJECT (stream), "state"); + g_main_context_invoke (stream->priv->context->priv->context, + (GSourceFunc) do_notify_state, + g_object_ref (stream)); } } @@ -461,7 +471,7 @@ source_output_failed: { priv->error = error; stream_set_state (stream, PV_STREAM_STATE_ERROR); - g_error ("failed to get source output proxy: %s", error->message); + g_warning ("failed to get source output proxy: %s", error->message); g_object_unref (stream); return; } @@ -687,7 +697,7 @@ pv_stream_disconnect (PvStream *stream) g_return_val_if_fail (priv->state >= PV_STREAM_STATE_READY, FALSE); g_return_val_if_fail (priv->source_output != NULL, FALSE); context = priv->context; - g_return_val_if_fail (pv_context_get_state (context) == PV_CONTEXT_STATE_READY, FALSE); + g_return_val_if_fail (pv_context_get_state (context) >= PV_CONTEXT_STATE_READY, FALSE); g_main_context_invoke (context->priv->context, (GSourceFunc) do_disconnect, @@ -795,7 +805,7 @@ socket_failed: { priv->error = error; stream_set_state (stream, PV_STREAM_STATE_ERROR); - g_error ("failed to create socket: %s", error->message); + g_warning ("failed to create socket: %s", error->message); return; } } @@ -868,12 +878,12 @@ on_stream_started (GObject *source_object, /* ERRORS */ start_failed: { - g_error ("failed to start: %s", error->message); + g_warning ("failed to start: %s", error->message); goto exit_error; } fd_failed: { - g_error ("failed to get FD: %s", error->message); + g_warning ("failed to get FD: %s", error->message); goto exit_error; } exit_error: @@ -966,7 +976,7 @@ call_failed: { priv->error = error; stream_set_state (stream, PV_STREAM_STATE_ERROR); - g_error ("failed to release: %s", error->message); + g_warning ("failed to release: %s", error->message); return; } } @@ -1082,11 +1092,24 @@ pv_stream_provide_buffer (PvStream *stream, PvBufferInfo *info) flags, NULL, &error); + if (info->message) { + g_object_unref (info->message); + info->message = NULL; + } + + if (len == -1) + goto send_error; + g_assert (len == sizeof (msg)); - if (info->message) - g_object_unref (info->message); - return TRUE; + +send_error: + { + priv->error = error; + stream_set_state (stream, PV_STREAM_STATE_ERROR); + g_warning ("failed to send_message: %s", error->message); + return FALSE; + } } diff --git a/src/gst/gstpvsink.c b/src/gst/gstpvsink.c index 5f768932e..4800790c1 100644 --- a/src/gst/gstpvsink.c +++ b/src/gst/gstpvsink.c @@ -192,9 +192,7 @@ on_new_buffer (GObject *gobject, { GstPulsevideoSink *pvsink = user_data; - g_mutex_lock (&pvsink->lock); g_cond_signal (&pvsink->cond); - g_mutex_unlock (&pvsink->lock); } static void @@ -205,23 +203,21 @@ on_stream_notify (GObject *gobject, PvStreamState state; GstPulsevideoSink *pvsink = user_data; - g_mutex_lock (&pvsink->lock); state = pv_stream_get_state (pvsink->stream); g_print ("got stream state %d\n", state); + g_cond_broadcast (&pvsink->cond); + if (state == PV_STREAM_STATE_ERROR) { GST_ELEMENT_ERROR (pvsink, RESOURCE, FAILED, ("Failed to connect stream: %s", pv_stream_get_error (pvsink->stream)->message), (NULL)); } - g_cond_broadcast (&pvsink->cond); - g_mutex_unlock (&pvsink->lock); } static GstCaps * gst_pulsevideo_sink_getcaps (GstBaseSink * bsink, GstCaps * filter) { return GST_BASE_SINK_CLASS (parent_class)->get_caps (bsink, filter); - } static gboolean @@ -233,16 +229,16 @@ gst_pulsevideo_sink_setcaps (GstBaseSink * bsink, GstCaps * caps) pvsink = GST_PULSEVIDEO_SINK (bsink); + str = gst_caps_to_string (caps); + format = g_bytes_new_take (str, strlen (str) + 1); + + g_mutex_lock (&pvsink->lock); pvsink->stream = pv_stream_new (pvsink->ctx, "test", NULL); g_signal_connect (pvsink->stream, "notify::state", (GCallback) on_stream_notify, pvsink); g_signal_connect (pvsink->stream, "new-buffer", (GCallback) on_new_buffer, pvsink); - str = gst_caps_to_string (caps); - format = g_bytes_new_take (str, strlen (str) + 1); - pv_stream_connect_provide (pvsink->stream, 0, format); - g_mutex_lock (&pvsink->lock); while (TRUE) { PvStreamState state = pv_stream_get_state (pvsink->stream); @@ -254,11 +250,9 @@ gst_pulsevideo_sink_setcaps (GstBaseSink * bsink, GstCaps * caps) g_cond_wait (&pvsink->cond, &pvsink->lock); } - g_mutex_unlock (&pvsink->lock); pv_stream_start (pvsink->stream, format, PV_STREAM_MODE_BUFFER); - g_mutex_lock (&pvsink->lock); while (TRUE) { PvStreamState state = pv_stream_get_state (pvsink->stream); @@ -315,7 +309,7 @@ gst_pulsevideo_sink_render (GstBaseSink * bsink, GstBuffer * buffer) mem = gst_allocator_alloc (pvsink->allocator, info.size, ¶ms); if (!gst_memory_map (mem, &minfo, GST_MAP_WRITE)) - goto error; + goto map_error; gst_buffer_extract (buffer, 0, minfo.data, info.size); gst_memory_unmap (mem, &minfo); } @@ -324,6 +318,8 @@ gst_pulsevideo_sink_render (GstBaseSink * bsink, GstBuffer * buffer) info.message = mesg; g_mutex_lock (&pvsink->lock); + if (pv_stream_get_state (pvsink->stream) != PV_STREAM_STATE_STREAMING) + goto streaming_error; pv_stream_provide_buffer (pvsink->stream, &info); g_mutex_unlock (&pvsink->lock); @@ -333,10 +329,15 @@ not_negotiated: { return GST_FLOW_NOT_NEGOTIATED; } -error: +map_error: { return GST_FLOW_ERROR; } +streaming_error: + { + g_mutex_unlock (&pvsink->lock); + return GST_FLOW_ERROR; + } } static gboolean @@ -359,14 +360,34 @@ gst_pulsevideo_sink_stop (GstBaseSink * basesink) return TRUE; } +static GPrivate sink_key; + +static gint +do_poll (GPollFD *ufds, guint nfsd, gint timeout_) +{ + gint res; + GstPulsevideoSink *this = g_private_get (&sink_key); + + g_mutex_unlock (&this->lock); + res = this->poll_func (ufds, nfsd, timeout_); + g_mutex_lock (&this->lock); + + return res; +} + static gpointer handle_mainloop (GstPulsevideoSink *this) { + g_mutex_lock (&this->lock); + g_private_set (&sink_key, this); + this->poll_func = g_main_context_get_poll_func (this->context); + g_main_context_set_poll_func (this->context, do_poll); g_main_context_push_thread_default (this->context); g_print ("run mainloop\n"); g_main_loop_run (this->loop); g_print ("quit mainloop\n"); g_main_context_pop_thread_default (this->context); + g_mutex_unlock (&this->lock); return NULL; } @@ -379,11 +400,9 @@ on_state_notify (GObject *gobject, GstPulsevideoSink *pvsink = user_data; PvContextState state; - g_mutex_lock (&pvsink->lock); state = pv_context_get_state (pvsink->ctx); g_print ("got context state %d\n", state); g_cond_broadcast (&pvsink->cond); - g_mutex_unlock (&pvsink->lock); if (state == PV_CONTEXT_STATE_ERROR) { GST_ELEMENT_ERROR (pvsink, RESOURCE, FAILED, @@ -395,12 +414,12 @@ on_state_notify (GObject *gobject, static gboolean gst_pulsevideo_sink_open (GstPulsevideoSink * pvsink) { + g_mutex_lock (&pvsink->lock); pvsink->ctx = pv_context_new (pvsink->context, "test-client", NULL); g_signal_connect (pvsink->ctx, "notify::state", (GCallback) on_state_notify, pvsink); pv_context_connect(pvsink->ctx, PV_CONTEXT_FLAGS_NONE); - g_mutex_lock (&pvsink->lock); while (TRUE) { PvContextState state = pv_context_get_state (pvsink->ctx); @@ -427,13 +446,14 @@ connect_error: static gboolean gst_pulsevideo_sink_close (GstPulsevideoSink * pvsink) { + + g_mutex_lock (&pvsink->lock); if (pvsink->stream) { pv_stream_disconnect (pvsink->stream); } if (pvsink->ctx) { pv_context_disconnect(pvsink->ctx); - g_mutex_lock (&pvsink->lock); while (TRUE) { PvContextState state = pv_context_get_state (pvsink->ctx); @@ -445,8 +465,8 @@ gst_pulsevideo_sink_close (GstPulsevideoSink * pvsink) g_cond_wait (&pvsink->cond, &pvsink->lock); } - g_mutex_unlock (&pvsink->lock); } + g_mutex_unlock (&pvsink->lock); return TRUE; } diff --git a/src/gst/gstpvsink.h b/src/gst/gstpvsink.h index 955b5c6fe..f5e6c5faf 100644 --- a/src/gst/gstpvsink.h +++ b/src/gst/gstpvsink.h @@ -65,6 +65,7 @@ struct _GstPulsevideoSink { PvStream *stream; GstAllocator *allocator; + GPollFunc poll_func; GMutex lock; GCond cond; }; diff --git a/src/gst/gstpvsrc.c b/src/gst/gstpvsrc.c index 30da4a210..95eff7061 100644 --- a/src/gst/gstpvsrc.c +++ b/src/gst/gstpvsrc.c @@ -183,6 +183,7 @@ gst_pulsevideo_src_init (GstPulsevideoSrc * src) /* we operate in time */ gst_base_src_set_format (GST_BASE_SRC (src), GST_FORMAT_TIME); gst_base_src_set_live (GST_BASE_SRC (src), TRUE); + gst_base_src_set_do_timestamp (GST_BASE_SRC (src), TRUE); src->fd_allocator = gst_fd_allocator_new (); g_mutex_init (&src->lock); @@ -232,9 +233,7 @@ on_new_buffer (GObject *gobject, { GstPulsevideoSrc *pvsrc = user_data; - g_mutex_lock (&pvsrc->lock); g_cond_signal (&pvsrc->cond); - g_mutex_unlock (&pvsrc->lock); } static void @@ -245,16 +244,15 @@ on_stream_notify (GObject *gobject, PvStreamState state; GstPulsevideoSrc *pvsrc = user_data; - g_mutex_lock (&pvsrc->lock); state = pv_stream_get_state (pvsrc->stream); g_print ("got stream state %d\n", state); + g_cond_broadcast (&pvsrc->cond); + if (state == PV_STREAM_STATE_ERROR) { GST_ELEMENT_ERROR (pvsrc, RESOURCE, FAILED, ("Failed to connect stream: %s", pv_stream_get_error (pvsrc->stream)->message), (NULL)); } - g_cond_broadcast (&pvsrc->cond); - g_mutex_unlock (&pvsrc->lock); } static gboolean @@ -295,9 +293,10 @@ gst_pulsevideo_src_negotiate (GstBaseSrc * basesrc) /* open a connection with these caps */ str = gst_caps_to_string (caps); accepted = g_bytes_new_take (str, strlen (str) + 1); - pv_stream_connect_capture (pvsrc->stream, pvsrc->source, 0, accepted); g_mutex_lock (&pvsrc->lock); + pv_stream_connect_capture (pvsrc->stream, pvsrc->source, 0, accepted); + while (TRUE) { PvStreamState state = pv_stream_get_state (pvsrc->stream); @@ -379,13 +378,18 @@ gst_pulsevideo_src_setcaps (GstBaseSrc * bsrc, GstCaps * caps) GstPulsevideoSrc *pvsrc; gchar *str; GBytes *format; + gboolean res; pvsrc = GST_PULSEVIDEO_SRC (bsrc); str = gst_caps_to_string (caps); format = g_bytes_new_take (str, strlen (str) + 1); - return pv_stream_start (pvsrc->stream, format, PV_STREAM_MODE_BUFFER); + g_mutex_lock (&pvsrc->lock); + res = pv_stream_start (pvsrc->stream, format, PV_STREAM_MODE_BUFFER); + g_mutex_unlock (&pvsrc->lock); + + return res; } static GstFlowReturn @@ -393,17 +397,26 @@ gst_pulsevideo_src_create (GstPushSrc * psrc, GstBuffer ** buffer) { GstPulsevideoSrc *pvsrc; PvBufferInfo info; + gint *fds, n_fds; + GstMemory *fdmem = NULL; pvsrc = GST_PULSEVIDEO_SRC (psrc); if (!pvsrc->negotiated) goto not_negotiated; +again: g_mutex_lock (&pvsrc->lock); while (TRUE) { + PvStreamState state; + g_cond_wait (&pvsrc->cond, &pvsrc->lock); - if (pv_stream_get_state (pvsrc->stream) != PV_STREAM_STATE_STREAMING) + state = pv_stream_get_state (pvsrc->stream); + if (state == PV_STREAM_STATE_ERROR) + goto streaming_error; + + if (state != PV_STREAM_STATE_STREAMING) goto streaming_stopped; pv_stream_capture_buffer (pvsrc->stream, &info); @@ -412,20 +425,19 @@ gst_pulsevideo_src_create (GstPushSrc * psrc, GstBuffer ** buffer) } g_mutex_unlock (&pvsrc->lock); + if (g_socket_control_message_get_msg_type (info.message) != SCM_RIGHTS) + goto again; + + fds = g_unix_fd_message_steal_fds (G_UNIX_FD_MESSAGE (info.message), &n_fds); + if (n_fds < 1 || fds[0] < 0) + goto again; + + fdmem = gst_fd_allocator_alloc (pvsrc->fd_allocator, fds[0], + info.offset + info.size, GST_FD_MEMORY_FLAG_NONE); + gst_memory_resize (fdmem, info.offset, info.size); + *buffer = gst_buffer_new (); - - if (g_socket_control_message_get_msg_type (info.message) == SCM_RIGHTS) { - gint *fds, n_fds; - GstMemory *fdmem = NULL; - - fds = g_unix_fd_message_steal_fds (G_UNIX_FD_MESSAGE (info.message), &n_fds); - - fdmem = gst_fd_allocator_alloc (pvsrc->fd_allocator, fds[0], - info.offset + info.size, GST_FD_MEMORY_FLAG_NONE); - gst_memory_resize (fdmem, info.offset, info.size); - - gst_buffer_append_memory (*buffer, fdmem); - } + gst_buffer_append_memory (*buffer, fdmem); return GST_FLOW_OK; @@ -433,6 +445,11 @@ not_negotiated: { return GST_FLOW_NOT_NEGOTIATED; } +streaming_error: + { + g_mutex_unlock (&pvsrc->lock); + return GST_FLOW_ERROR; + } streaming_stopped: { g_mutex_unlock (&pvsrc->lock); @@ -452,14 +469,34 @@ gst_pulsevideo_src_stop (GstBaseSrc * basesrc) return TRUE; } +static GPrivate src_key; + +static gint +do_poll (GPollFD *ufds, guint nfsd, gint timeout_) +{ + gint res; + GstPulsevideoSrc *this = g_private_get (&src_key); + + g_mutex_unlock (&this->lock); + res = this->poll_func (ufds, nfsd, timeout_); + g_mutex_lock (&this->lock); + + return res; +} + static gpointer handle_mainloop (GstPulsevideoSrc *this) { + g_mutex_lock (&this->lock); + g_private_set (&src_key, this); + this->poll_func = g_main_context_get_poll_func (this->context); + g_main_context_set_poll_func (this->context, do_poll); g_main_context_push_thread_default (this->context); g_print ("run mainloop\n"); g_main_loop_run (this->loop); g_print ("quit mainloop\n"); g_main_context_pop_thread_default (this->context); + g_mutex_unlock (&this->lock); return NULL; } @@ -472,11 +509,9 @@ on_state_notify (GObject *gobject, GstPulsevideoSrc *pvsrc = user_data; PvContextState state; - g_mutex_lock (&pvsrc->lock); state = pv_context_get_state (pvsrc->ctx); g_print ("got context state %d\n", state); g_cond_broadcast (&pvsrc->cond); - g_mutex_unlock (&pvsrc->lock); if (state == PV_CONTEXT_STATE_ERROR) { GST_ELEMENT_ERROR (pvsrc, RESOURCE, FAILED, @@ -489,12 +524,12 @@ static gboolean gst_pulsevideo_src_open (GstPulsevideoSrc * pvsrc) { + g_mutex_lock (&pvsrc->lock); pvsrc->ctx = pv_context_new (pvsrc->context, "test-client", NULL); g_signal_connect (pvsrc->ctx, "notify::state", (GCallback) on_state_notify, pvsrc); pv_context_connect(pvsrc->ctx, PV_CONTEXT_FLAGS_NONE); - g_mutex_lock (&pvsrc->lock); while (TRUE) { PvContextState state = pv_context_get_state (pvsrc->ctx); @@ -506,11 +541,11 @@ gst_pulsevideo_src_open (GstPulsevideoSrc * pvsrc) g_cond_wait (&pvsrc->cond, &pvsrc->lock); } - g_mutex_unlock (&pvsrc->lock); pvsrc->stream = pv_stream_new (pvsrc->ctx, "test", NULL); g_signal_connect (pvsrc->stream, "notify::state", (GCallback) on_stream_notify, pvsrc); g_signal_connect (pvsrc->stream, "new-buffer", (GCallback) on_new_buffer, pvsrc); + g_mutex_unlock (&pvsrc->lock); return TRUE; diff --git a/src/gst/gstpvsrc.h b/src/gst/gstpvsrc.h index 93ad30e6d..01a196d46 100644 --- a/src/gst/gstpvsrc.h +++ b/src/gst/gstpvsrc.h @@ -65,6 +65,7 @@ struct _GstPulsevideoSrc { PvStream *stream; GstAllocator *fd_allocator; + GPollFunc poll_func; GMutex lock; GCond cond; };