Fix locking and threadsafety

Emit all notify in the context thread.
Handle error cases better.
Make sure we hold the lock when running the mainloop, this will cause
all of the signals to be emited with the lock. Make sure we call all
methods with the lock.
This commit is contained in:
Wim Taymans 2015-06-12 12:10:27 +02:00
parent 83223f0a83
commit b68b62740c
7 changed files with 153 additions and 63 deletions

View file

@ -192,9 +192,7 @@ on_new_buffer (GObject *gobject,
{
GstPulsevideoSink *pvsink = user_data;
g_mutex_lock (&pvsink->lock);
g_cond_signal (&pvsink->cond);
g_mutex_unlock (&pvsink->lock);
}
static void
@ -205,23 +203,21 @@ on_stream_notify (GObject *gobject,
PvStreamState state;
GstPulsevideoSink *pvsink = user_data;
g_mutex_lock (&pvsink->lock);
state = pv_stream_get_state (pvsink->stream);
g_print ("got stream state %d\n", state);
g_cond_broadcast (&pvsink->cond);
if (state == PV_STREAM_STATE_ERROR) {
GST_ELEMENT_ERROR (pvsink, RESOURCE, FAILED,
("Failed to connect stream: %s",
pv_stream_get_error (pvsink->stream)->message), (NULL));
}
g_cond_broadcast (&pvsink->cond);
g_mutex_unlock (&pvsink->lock);
}
static GstCaps *
gst_pulsevideo_sink_getcaps (GstBaseSink * bsink, GstCaps * filter)
{
return GST_BASE_SINK_CLASS (parent_class)->get_caps (bsink, filter);
}
static gboolean
@ -233,16 +229,16 @@ gst_pulsevideo_sink_setcaps (GstBaseSink * bsink, GstCaps * caps)
pvsink = GST_PULSEVIDEO_SINK (bsink);
str = gst_caps_to_string (caps);
format = g_bytes_new_take (str, strlen (str) + 1);
g_mutex_lock (&pvsink->lock);
pvsink->stream = pv_stream_new (pvsink->ctx, "test", NULL);
g_signal_connect (pvsink->stream, "notify::state", (GCallback) on_stream_notify, pvsink);
g_signal_connect (pvsink->stream, "new-buffer", (GCallback) on_new_buffer, pvsink);
str = gst_caps_to_string (caps);
format = g_bytes_new_take (str, strlen (str) + 1);
pv_stream_connect_provide (pvsink->stream, 0, format);
g_mutex_lock (&pvsink->lock);
while (TRUE) {
PvStreamState state = pv_stream_get_state (pvsink->stream);
@ -254,11 +250,9 @@ gst_pulsevideo_sink_setcaps (GstBaseSink * bsink, GstCaps * caps)
g_cond_wait (&pvsink->cond, &pvsink->lock);
}
g_mutex_unlock (&pvsink->lock);
pv_stream_start (pvsink->stream, format, PV_STREAM_MODE_BUFFER);
g_mutex_lock (&pvsink->lock);
while (TRUE) {
PvStreamState state = pv_stream_get_state (pvsink->stream);
@ -315,7 +309,7 @@ gst_pulsevideo_sink_render (GstBaseSink * bsink, GstBuffer * buffer)
mem = gst_allocator_alloc (pvsink->allocator, info.size, &params);
if (!gst_memory_map (mem, &minfo, GST_MAP_WRITE))
goto error;
goto map_error;
gst_buffer_extract (buffer, 0, minfo.data, info.size);
gst_memory_unmap (mem, &minfo);
}
@ -324,6 +318,8 @@ gst_pulsevideo_sink_render (GstBaseSink * bsink, GstBuffer * buffer)
info.message = mesg;
g_mutex_lock (&pvsink->lock);
if (pv_stream_get_state (pvsink->stream) != PV_STREAM_STATE_STREAMING)
goto streaming_error;
pv_stream_provide_buffer (pvsink->stream, &info);
g_mutex_unlock (&pvsink->lock);
@ -333,10 +329,15 @@ not_negotiated:
{
return GST_FLOW_NOT_NEGOTIATED;
}
error:
map_error:
{
return GST_FLOW_ERROR;
}
streaming_error:
{
g_mutex_unlock (&pvsink->lock);
return GST_FLOW_ERROR;
}
}
static gboolean
@ -359,14 +360,34 @@ gst_pulsevideo_sink_stop (GstBaseSink * basesink)
return TRUE;
}
static GPrivate sink_key;
static gint
do_poll (GPollFD *ufds, guint nfsd, gint timeout_)
{
gint res;
GstPulsevideoSink *this = g_private_get (&sink_key);
g_mutex_unlock (&this->lock);
res = this->poll_func (ufds, nfsd, timeout_);
g_mutex_lock (&this->lock);
return res;
}
static gpointer
handle_mainloop (GstPulsevideoSink *this)
{
g_mutex_lock (&this->lock);
g_private_set (&sink_key, this);
this->poll_func = g_main_context_get_poll_func (this->context);
g_main_context_set_poll_func (this->context, do_poll);
g_main_context_push_thread_default (this->context);
g_print ("run mainloop\n");
g_main_loop_run (this->loop);
g_print ("quit mainloop\n");
g_main_context_pop_thread_default (this->context);
g_mutex_unlock (&this->lock);
return NULL;
}
@ -379,11 +400,9 @@ on_state_notify (GObject *gobject,
GstPulsevideoSink *pvsink = user_data;
PvContextState state;
g_mutex_lock (&pvsink->lock);
state = pv_context_get_state (pvsink->ctx);
g_print ("got context state %d\n", state);
g_cond_broadcast (&pvsink->cond);
g_mutex_unlock (&pvsink->lock);
if (state == PV_CONTEXT_STATE_ERROR) {
GST_ELEMENT_ERROR (pvsink, RESOURCE, FAILED,
@ -395,12 +414,12 @@ on_state_notify (GObject *gobject,
static gboolean
gst_pulsevideo_sink_open (GstPulsevideoSink * pvsink)
{
g_mutex_lock (&pvsink->lock);
pvsink->ctx = pv_context_new (pvsink->context, "test-client", NULL);
g_signal_connect (pvsink->ctx, "notify::state", (GCallback) on_state_notify, pvsink);
pv_context_connect(pvsink->ctx, PV_CONTEXT_FLAGS_NONE);
g_mutex_lock (&pvsink->lock);
while (TRUE) {
PvContextState state = pv_context_get_state (pvsink->ctx);
@ -427,13 +446,14 @@ connect_error:
static gboolean
gst_pulsevideo_sink_close (GstPulsevideoSink * pvsink)
{
g_mutex_lock (&pvsink->lock);
if (pvsink->stream) {
pv_stream_disconnect (pvsink->stream);
}
if (pvsink->ctx) {
pv_context_disconnect(pvsink->ctx);
g_mutex_lock (&pvsink->lock);
while (TRUE) {
PvContextState state = pv_context_get_state (pvsink->ctx);
@ -445,8 +465,8 @@ gst_pulsevideo_sink_close (GstPulsevideoSink * pvsink)
g_cond_wait (&pvsink->cond, &pvsink->lock);
}
g_mutex_unlock (&pvsink->lock);
}
g_mutex_unlock (&pvsink->lock);
return TRUE;
}