diff --git a/src/Makefile.am b/src/Makefile.am index 22a4be67f..2c2a75b4b 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -155,8 +155,7 @@ pulsevideoinclude_HEADERS = \ client/pv-enumtypes.h \ client/pv-introspect.h \ client/pv-stream.h \ - client/pv-subscribe.h \ - dbus/org-pulsevideo.h + client/pv-subscribe.h lib_LTLIBRARIES = \ libpulsevideo-@PV_MAJORMINOR@.la @@ -169,7 +168,6 @@ libpulsevideo_@PV_MAJORMINOR@_la_SOURCES = \ client/pv-stream.h client/pv-stream.c \ client/pulsevideo.c client/pulsevideo.h \ client/pv-subscribe.c client/pv-subscribe.h \ - dbus/org-pulsevideo.c \ $(pulsevideogstsource) @@ -191,11 +189,13 @@ libpulsevideocore_@PV_MAJORMINOR@_la_SOURCES = \ server/pv-client-source.c server/pv-client-source.h \ server/pv-source-output.c server/pv-source-output.h \ modules/gst/pv-gst-manager.c \ - modules/gst/pv-gst-source.c + modules/gst/pv-gst-source.c \ + dbus/org-pulsevideo.c dbus/org-pulsevideo.h libpulsevideocore_@PV_MAJORMINOR@_la_CFLAGS = $(AM_CFLAGS) $(SERVER_CFLAGS) libpulsevideocore_@PV_MAJORMINOR@_la_LDFLAGS = $(AM_LDFLAGS) -avoid-version -libpulsevideocore_@PV_MAJORMINOR@_la_LIBADD = $(AM_LIBADD) $(LIBLTDL) $(LTLIBICONV) +libpulsevideocore_@PV_MAJORMINOR@_la_LIBADD = $(AM_LIBADD) $(LIBLTDL) $(LTLIBICONV) \ + libpulsevideo-@PV_MAJORMINOR@.la ################################### # GStreamer Plugin # diff --git a/src/client/pv-context.c b/src/client/pv-context.c index 79cfc7087..32450b4db 100644 --- a/src/client/pv-context.c +++ b/src/client/pv-context.c @@ -302,12 +302,22 @@ pv_context_new (GMainContext *context, const gchar *name, GVariant *properties) return g_object_new (PV_TYPE_CONTEXT, "main-context", context, "name", name, "properties", properties, NULL); } +static gboolean +do_notify_state (PvContext *context) +{ + g_object_notify (G_OBJECT (context), "state"); + g_object_unref (context); + return FALSE; +} + static void context_set_state (PvContext *context, PvContextState state) { if (context->priv->state != state) { context->priv->state = state; - g_object_notify (G_OBJECT (context), "state"); + g_main_context_invoke (context->priv->context, + (GSourceFunc) do_notify_state, + g_object_ref (context)); } } static void @@ -333,7 +343,7 @@ client_failed: { priv->error = error; context_set_state (context, PV_STREAM_STATE_ERROR); - g_error ("failed to get client proxy: %s", error->message); + g_warning ("failed to get client proxy: %s", error->message); return; } } @@ -351,7 +361,7 @@ on_client_connected (GObject *source_object, ret = g_dbus_proxy_call_finish (priv->daemon, res, &error); if (ret == NULL) { - g_error ("failed to connect client: %s", error->message); + g_warning ("failed to connect client: %s", error->message); priv->error = error; context_set_state (context, PV_CONTEXT_STATE_ERROR); return; @@ -554,7 +564,7 @@ on_client_disconnected (GObject *source_object, ret = g_dbus_proxy_call_finish (priv->client, res, &error); if (ret == NULL) { - g_error ("failed to disconnect client: %s", error->message); + g_warning ("failed to disconnect client: %s", error->message); priv->error = error; context_set_state (context, PV_CONTEXT_STATE_ERROR); g_object_unref (context); diff --git a/src/client/pv-stream.c b/src/client/pv-stream.c index e5db0a122..58569d604 100644 --- a/src/client/pv-stream.c +++ b/src/client/pv-stream.c @@ -154,12 +154,22 @@ pv_stream_set_property (GObject *_object, } } +static gboolean +do_notify_state (PvStream *stream) +{ + g_object_notify (G_OBJECT (stream), "state"); + g_object_unref (stream); + return FALSE; +} + static void stream_set_state (PvStream *stream, PvStreamState state) { if (stream->priv->state != state) { stream->priv->state = state; - g_object_notify (G_OBJECT (stream), "state"); + g_main_context_invoke (stream->priv->context->priv->context, + (GSourceFunc) do_notify_state, + g_object_ref (stream)); } } @@ -461,7 +471,7 @@ source_output_failed: { priv->error = error; stream_set_state (stream, PV_STREAM_STATE_ERROR); - g_error ("failed to get source output proxy: %s", error->message); + g_warning ("failed to get source output proxy: %s", error->message); g_object_unref (stream); return; } @@ -687,7 +697,7 @@ pv_stream_disconnect (PvStream *stream) g_return_val_if_fail (priv->state >= PV_STREAM_STATE_READY, FALSE); g_return_val_if_fail (priv->source_output != NULL, FALSE); context = priv->context; - g_return_val_if_fail (pv_context_get_state (context) == PV_CONTEXT_STATE_READY, FALSE); + g_return_val_if_fail (pv_context_get_state (context) >= PV_CONTEXT_STATE_READY, FALSE); g_main_context_invoke (context->priv->context, (GSourceFunc) do_disconnect, @@ -795,7 +805,7 @@ socket_failed: { priv->error = error; stream_set_state (stream, PV_STREAM_STATE_ERROR); - g_error ("failed to create socket: %s", error->message); + g_warning ("failed to create socket: %s", error->message); return; } } @@ -868,12 +878,12 @@ on_stream_started (GObject *source_object, /* ERRORS */ start_failed: { - g_error ("failed to start: %s", error->message); + g_warning ("failed to start: %s", error->message); goto exit_error; } fd_failed: { - g_error ("failed to get FD: %s", error->message); + g_warning ("failed to get FD: %s", error->message); goto exit_error; } exit_error: @@ -966,7 +976,7 @@ call_failed: { priv->error = error; stream_set_state (stream, PV_STREAM_STATE_ERROR); - g_error ("failed to release: %s", error->message); + g_warning ("failed to release: %s", error->message); return; } } @@ -1082,11 +1092,24 @@ pv_stream_provide_buffer (PvStream *stream, PvBufferInfo *info) flags, NULL, &error); + if (info->message) { + g_object_unref (info->message); + info->message = NULL; + } + + if (len == -1) + goto send_error; + g_assert (len == sizeof (msg)); - if (info->message) - g_object_unref (info->message); - return TRUE; + +send_error: + { + priv->error = error; + stream_set_state (stream, PV_STREAM_STATE_ERROR); + g_warning ("failed to send_message: %s", error->message); + return FALSE; + } } diff --git a/src/gst/gstpvsink.c b/src/gst/gstpvsink.c index 5f768932e..4800790c1 100644 --- a/src/gst/gstpvsink.c +++ b/src/gst/gstpvsink.c @@ -192,9 +192,7 @@ on_new_buffer (GObject *gobject, { GstPulsevideoSink *pvsink = user_data; - g_mutex_lock (&pvsink->lock); g_cond_signal (&pvsink->cond); - g_mutex_unlock (&pvsink->lock); } static void @@ -205,23 +203,21 @@ on_stream_notify (GObject *gobject, PvStreamState state; GstPulsevideoSink *pvsink = user_data; - g_mutex_lock (&pvsink->lock); state = pv_stream_get_state (pvsink->stream); g_print ("got stream state %d\n", state); + g_cond_broadcast (&pvsink->cond); + if (state == PV_STREAM_STATE_ERROR) { GST_ELEMENT_ERROR (pvsink, RESOURCE, FAILED, ("Failed to connect stream: %s", pv_stream_get_error (pvsink->stream)->message), (NULL)); } - g_cond_broadcast (&pvsink->cond); - g_mutex_unlock (&pvsink->lock); } static GstCaps * gst_pulsevideo_sink_getcaps (GstBaseSink * bsink, GstCaps * filter) { return GST_BASE_SINK_CLASS (parent_class)->get_caps (bsink, filter); - } static gboolean @@ -233,16 +229,16 @@ gst_pulsevideo_sink_setcaps (GstBaseSink * bsink, GstCaps * caps) pvsink = GST_PULSEVIDEO_SINK (bsink); + str = gst_caps_to_string (caps); + format = g_bytes_new_take (str, strlen (str) + 1); + + g_mutex_lock (&pvsink->lock); pvsink->stream = pv_stream_new (pvsink->ctx, "test", NULL); g_signal_connect (pvsink->stream, "notify::state", (GCallback) on_stream_notify, pvsink); g_signal_connect (pvsink->stream, "new-buffer", (GCallback) on_new_buffer, pvsink); - str = gst_caps_to_string (caps); - format = g_bytes_new_take (str, strlen (str) + 1); - pv_stream_connect_provide (pvsink->stream, 0, format); - g_mutex_lock (&pvsink->lock); while (TRUE) { PvStreamState state = pv_stream_get_state (pvsink->stream); @@ -254,11 +250,9 @@ gst_pulsevideo_sink_setcaps (GstBaseSink * bsink, GstCaps * caps) g_cond_wait (&pvsink->cond, &pvsink->lock); } - g_mutex_unlock (&pvsink->lock); pv_stream_start (pvsink->stream, format, PV_STREAM_MODE_BUFFER); - g_mutex_lock (&pvsink->lock); while (TRUE) { PvStreamState state = pv_stream_get_state (pvsink->stream); @@ -315,7 +309,7 @@ gst_pulsevideo_sink_render (GstBaseSink * bsink, GstBuffer * buffer) mem = gst_allocator_alloc (pvsink->allocator, info.size, ¶ms); if (!gst_memory_map (mem, &minfo, GST_MAP_WRITE)) - goto error; + goto map_error; gst_buffer_extract (buffer, 0, minfo.data, info.size); gst_memory_unmap (mem, &minfo); } @@ -324,6 +318,8 @@ gst_pulsevideo_sink_render (GstBaseSink * bsink, GstBuffer * buffer) info.message = mesg; g_mutex_lock (&pvsink->lock); + if (pv_stream_get_state (pvsink->stream) != PV_STREAM_STATE_STREAMING) + goto streaming_error; pv_stream_provide_buffer (pvsink->stream, &info); g_mutex_unlock (&pvsink->lock); @@ -333,10 +329,15 @@ not_negotiated: { return GST_FLOW_NOT_NEGOTIATED; } -error: +map_error: { return GST_FLOW_ERROR; } +streaming_error: + { + g_mutex_unlock (&pvsink->lock); + return GST_FLOW_ERROR; + } } static gboolean @@ -359,14 +360,34 @@ gst_pulsevideo_sink_stop (GstBaseSink * basesink) return TRUE; } +static GPrivate sink_key; + +static gint +do_poll (GPollFD *ufds, guint nfsd, gint timeout_) +{ + gint res; + GstPulsevideoSink *this = g_private_get (&sink_key); + + g_mutex_unlock (&this->lock); + res = this->poll_func (ufds, nfsd, timeout_); + g_mutex_lock (&this->lock); + + return res; +} + static gpointer handle_mainloop (GstPulsevideoSink *this) { + g_mutex_lock (&this->lock); + g_private_set (&sink_key, this); + this->poll_func = g_main_context_get_poll_func (this->context); + g_main_context_set_poll_func (this->context, do_poll); g_main_context_push_thread_default (this->context); g_print ("run mainloop\n"); g_main_loop_run (this->loop); g_print ("quit mainloop\n"); g_main_context_pop_thread_default (this->context); + g_mutex_unlock (&this->lock); return NULL; } @@ -379,11 +400,9 @@ on_state_notify (GObject *gobject, GstPulsevideoSink *pvsink = user_data; PvContextState state; - g_mutex_lock (&pvsink->lock); state = pv_context_get_state (pvsink->ctx); g_print ("got context state %d\n", state); g_cond_broadcast (&pvsink->cond); - g_mutex_unlock (&pvsink->lock); if (state == PV_CONTEXT_STATE_ERROR) { GST_ELEMENT_ERROR (pvsink, RESOURCE, FAILED, @@ -395,12 +414,12 @@ on_state_notify (GObject *gobject, static gboolean gst_pulsevideo_sink_open (GstPulsevideoSink * pvsink) { + g_mutex_lock (&pvsink->lock); pvsink->ctx = pv_context_new (pvsink->context, "test-client", NULL); g_signal_connect (pvsink->ctx, "notify::state", (GCallback) on_state_notify, pvsink); pv_context_connect(pvsink->ctx, PV_CONTEXT_FLAGS_NONE); - g_mutex_lock (&pvsink->lock); while (TRUE) { PvContextState state = pv_context_get_state (pvsink->ctx); @@ -427,13 +446,14 @@ connect_error: static gboolean gst_pulsevideo_sink_close (GstPulsevideoSink * pvsink) { + + g_mutex_lock (&pvsink->lock); if (pvsink->stream) { pv_stream_disconnect (pvsink->stream); } if (pvsink->ctx) { pv_context_disconnect(pvsink->ctx); - g_mutex_lock (&pvsink->lock); while (TRUE) { PvContextState state = pv_context_get_state (pvsink->ctx); @@ -445,8 +465,8 @@ gst_pulsevideo_sink_close (GstPulsevideoSink * pvsink) g_cond_wait (&pvsink->cond, &pvsink->lock); } - g_mutex_unlock (&pvsink->lock); } + g_mutex_unlock (&pvsink->lock); return TRUE; } diff --git a/src/gst/gstpvsink.h b/src/gst/gstpvsink.h index 955b5c6fe..f5e6c5faf 100644 --- a/src/gst/gstpvsink.h +++ b/src/gst/gstpvsink.h @@ -65,6 +65,7 @@ struct _GstPulsevideoSink { PvStream *stream; GstAllocator *allocator; + GPollFunc poll_func; GMutex lock; GCond cond; }; diff --git a/src/gst/gstpvsrc.c b/src/gst/gstpvsrc.c index 30da4a210..95eff7061 100644 --- a/src/gst/gstpvsrc.c +++ b/src/gst/gstpvsrc.c @@ -183,6 +183,7 @@ gst_pulsevideo_src_init (GstPulsevideoSrc * src) /* we operate in time */ gst_base_src_set_format (GST_BASE_SRC (src), GST_FORMAT_TIME); gst_base_src_set_live (GST_BASE_SRC (src), TRUE); + gst_base_src_set_do_timestamp (GST_BASE_SRC (src), TRUE); src->fd_allocator = gst_fd_allocator_new (); g_mutex_init (&src->lock); @@ -232,9 +233,7 @@ on_new_buffer (GObject *gobject, { GstPulsevideoSrc *pvsrc = user_data; - g_mutex_lock (&pvsrc->lock); g_cond_signal (&pvsrc->cond); - g_mutex_unlock (&pvsrc->lock); } static void @@ -245,16 +244,15 @@ on_stream_notify (GObject *gobject, PvStreamState state; GstPulsevideoSrc *pvsrc = user_data; - g_mutex_lock (&pvsrc->lock); state = pv_stream_get_state (pvsrc->stream); g_print ("got stream state %d\n", state); + g_cond_broadcast (&pvsrc->cond); + if (state == PV_STREAM_STATE_ERROR) { GST_ELEMENT_ERROR (pvsrc, RESOURCE, FAILED, ("Failed to connect stream: %s", pv_stream_get_error (pvsrc->stream)->message), (NULL)); } - g_cond_broadcast (&pvsrc->cond); - g_mutex_unlock (&pvsrc->lock); } static gboolean @@ -295,9 +293,10 @@ gst_pulsevideo_src_negotiate (GstBaseSrc * basesrc) /* open a connection with these caps */ str = gst_caps_to_string (caps); accepted = g_bytes_new_take (str, strlen (str) + 1); - pv_stream_connect_capture (pvsrc->stream, pvsrc->source, 0, accepted); g_mutex_lock (&pvsrc->lock); + pv_stream_connect_capture (pvsrc->stream, pvsrc->source, 0, accepted); + while (TRUE) { PvStreamState state = pv_stream_get_state (pvsrc->stream); @@ -379,13 +378,18 @@ gst_pulsevideo_src_setcaps (GstBaseSrc * bsrc, GstCaps * caps) GstPulsevideoSrc *pvsrc; gchar *str; GBytes *format; + gboolean res; pvsrc = GST_PULSEVIDEO_SRC (bsrc); str = gst_caps_to_string (caps); format = g_bytes_new_take (str, strlen (str) + 1); - return pv_stream_start (pvsrc->stream, format, PV_STREAM_MODE_BUFFER); + g_mutex_lock (&pvsrc->lock); + res = pv_stream_start (pvsrc->stream, format, PV_STREAM_MODE_BUFFER); + g_mutex_unlock (&pvsrc->lock); + + return res; } static GstFlowReturn @@ -393,17 +397,26 @@ gst_pulsevideo_src_create (GstPushSrc * psrc, GstBuffer ** buffer) { GstPulsevideoSrc *pvsrc; PvBufferInfo info; + gint *fds, n_fds; + GstMemory *fdmem = NULL; pvsrc = GST_PULSEVIDEO_SRC (psrc); if (!pvsrc->negotiated) goto not_negotiated; +again: g_mutex_lock (&pvsrc->lock); while (TRUE) { + PvStreamState state; + g_cond_wait (&pvsrc->cond, &pvsrc->lock); - if (pv_stream_get_state (pvsrc->stream) != PV_STREAM_STATE_STREAMING) + state = pv_stream_get_state (pvsrc->stream); + if (state == PV_STREAM_STATE_ERROR) + goto streaming_error; + + if (state != PV_STREAM_STATE_STREAMING) goto streaming_stopped; pv_stream_capture_buffer (pvsrc->stream, &info); @@ -412,20 +425,19 @@ gst_pulsevideo_src_create (GstPushSrc * psrc, GstBuffer ** buffer) } g_mutex_unlock (&pvsrc->lock); + if (g_socket_control_message_get_msg_type (info.message) != SCM_RIGHTS) + goto again; + + fds = g_unix_fd_message_steal_fds (G_UNIX_FD_MESSAGE (info.message), &n_fds); + if (n_fds < 1 || fds[0] < 0) + goto again; + + fdmem = gst_fd_allocator_alloc (pvsrc->fd_allocator, fds[0], + info.offset + info.size, GST_FD_MEMORY_FLAG_NONE); + gst_memory_resize (fdmem, info.offset, info.size); + *buffer = gst_buffer_new (); - - if (g_socket_control_message_get_msg_type (info.message) == SCM_RIGHTS) { - gint *fds, n_fds; - GstMemory *fdmem = NULL; - - fds = g_unix_fd_message_steal_fds (G_UNIX_FD_MESSAGE (info.message), &n_fds); - - fdmem = gst_fd_allocator_alloc (pvsrc->fd_allocator, fds[0], - info.offset + info.size, GST_FD_MEMORY_FLAG_NONE); - gst_memory_resize (fdmem, info.offset, info.size); - - gst_buffer_append_memory (*buffer, fdmem); - } + gst_buffer_append_memory (*buffer, fdmem); return GST_FLOW_OK; @@ -433,6 +445,11 @@ not_negotiated: { return GST_FLOW_NOT_NEGOTIATED; } +streaming_error: + { + g_mutex_unlock (&pvsrc->lock); + return GST_FLOW_ERROR; + } streaming_stopped: { g_mutex_unlock (&pvsrc->lock); @@ -452,14 +469,34 @@ gst_pulsevideo_src_stop (GstBaseSrc * basesrc) return TRUE; } +static GPrivate src_key; + +static gint +do_poll (GPollFD *ufds, guint nfsd, gint timeout_) +{ + gint res; + GstPulsevideoSrc *this = g_private_get (&src_key); + + g_mutex_unlock (&this->lock); + res = this->poll_func (ufds, nfsd, timeout_); + g_mutex_lock (&this->lock); + + return res; +} + static gpointer handle_mainloop (GstPulsevideoSrc *this) { + g_mutex_lock (&this->lock); + g_private_set (&src_key, this); + this->poll_func = g_main_context_get_poll_func (this->context); + g_main_context_set_poll_func (this->context, do_poll); g_main_context_push_thread_default (this->context); g_print ("run mainloop\n"); g_main_loop_run (this->loop); g_print ("quit mainloop\n"); g_main_context_pop_thread_default (this->context); + g_mutex_unlock (&this->lock); return NULL; } @@ -472,11 +509,9 @@ on_state_notify (GObject *gobject, GstPulsevideoSrc *pvsrc = user_data; PvContextState state; - g_mutex_lock (&pvsrc->lock); state = pv_context_get_state (pvsrc->ctx); g_print ("got context state %d\n", state); g_cond_broadcast (&pvsrc->cond); - g_mutex_unlock (&pvsrc->lock); if (state == PV_CONTEXT_STATE_ERROR) { GST_ELEMENT_ERROR (pvsrc, RESOURCE, FAILED, @@ -489,12 +524,12 @@ static gboolean gst_pulsevideo_src_open (GstPulsevideoSrc * pvsrc) { + g_mutex_lock (&pvsrc->lock); pvsrc->ctx = pv_context_new (pvsrc->context, "test-client", NULL); g_signal_connect (pvsrc->ctx, "notify::state", (GCallback) on_state_notify, pvsrc); pv_context_connect(pvsrc->ctx, PV_CONTEXT_FLAGS_NONE); - g_mutex_lock (&pvsrc->lock); while (TRUE) { PvContextState state = pv_context_get_state (pvsrc->ctx); @@ -506,11 +541,11 @@ gst_pulsevideo_src_open (GstPulsevideoSrc * pvsrc) g_cond_wait (&pvsrc->cond, &pvsrc->lock); } - g_mutex_unlock (&pvsrc->lock); pvsrc->stream = pv_stream_new (pvsrc->ctx, "test", NULL); g_signal_connect (pvsrc->stream, "notify::state", (GCallback) on_stream_notify, pvsrc); g_signal_connect (pvsrc->stream, "new-buffer", (GCallback) on_new_buffer, pvsrc); + g_mutex_unlock (&pvsrc->lock); return TRUE; diff --git a/src/gst/gstpvsrc.h b/src/gst/gstpvsrc.h index 93ad30e6d..01a196d46 100644 --- a/src/gst/gstpvsrc.h +++ b/src/gst/gstpvsrc.h @@ -65,6 +65,7 @@ struct _GstPulsevideoSrc { PvStream *stream; GstAllocator *fd_allocator; + GPollFunc poll_func; GMutex lock; GCond cond; };