diff --git a/src/gst/gstpipewirecore.c b/src/gst/gstpipewirecore.c new file mode 100644 index 000000000..b01cd8a5c --- /dev/null +++ b/src/gst/gstpipewirecore.c @@ -0,0 +1,191 @@ +/* GStreamer + * + * Copyright © 2020 Wim Taymans + * + * Permission is hereby granted, free of charge, to any person obtaining a + * copy of this software and associated documentation files (the "Software"), + * to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, + * and/or sell copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice (including the next + * paragraph) shall be included in all copies or substantial portions of the + * Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL + * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER + * DEALINGS IN THE SOFTWARE. + */ + +#ifdef HAVE_CONFIG_H +#include "config.h" +#endif +#include + +#include + +#include "gstpipewirecore.h" + +/* a list of global cores indexed by fd. */ +G_LOCK_DEFINE_STATIC (cores_lock); +static GList *cores; + +static void +on_core_error(void *data, uint32_t id, int seq, int res, const char *message) +{ + GstPipeWireCore *core = data; + + pw_log_error("error id:%u seq:%d res:%d (%s): %s", + id, seq, res, spa_strerror(res), message); + + if (id == PW_ID_CORE) { + core->last_error = res; + } + pw_thread_loop_signal(core->loop, FALSE); +} + +static void on_core_done (void *object, uint32_t id, int seq) +{ + GstPipeWireCore * core = object; + if (id == PW_ID_CORE) { + core->last_seq = seq; + pw_thread_loop_signal (core->loop, FALSE); + } +} + +static const struct pw_core_events core_events = { + PW_VERSION_CORE_EVENTS, + .done = on_core_done, + .error = on_core_error, +}; + +static GstPipeWireCore *make_core (int fd) +{ + GstPipeWireCore *core; + + core = g_new (GstPipeWireCore, 1); + core->refcount = 1; + core->fd = fd; + core->loop = pw_thread_loop_new ("pipewire-main-loop", NULL); + core->context = pw_context_new (pw_thread_loop_get_loop(core->loop), NULL, 0); + GST_DEBUG ("loop %p context %p", core->loop, core->context); + + if (pw_thread_loop_start (core->loop) < 0) + goto mainloop_failed; + + pw_thread_loop_lock (core->loop); + + if (fd == -1) + core->core = pw_context_connect (core->context, NULL, 0); + else + core->core = pw_context_connect_fd (core->context, dup(fd), NULL, 0); + + if (core->core == NULL) + goto connection_failed; + + pw_core_add_listener(core->core, + &core->core_listener, + &core_events, + core); + + pw_thread_loop_unlock (core->loop); + + return core; + +mainloop_failed: + { + GST_ERROR ("error starting mainloop"); + return NULL; + } +connection_failed: + { + GST_ERROR ("error connect: %m"); + pw_thread_loop_unlock (core->loop); + return NULL; + } +} + +typedef struct { + int fd; +} FindData; + +static gint +core_find (GstPipeWireCore * core, FindData * data) +{ + /* fd's must match */ + if (core->fd == data->fd) + return 0; + return 1; +} + +GstPipeWireCore *gst_pipewire_core_get (int fd) +{ + GstPipeWireCore *core; + FindData data; + GList *found; + + data.fd = fd; + + G_LOCK (cores_lock); + found = g_list_find_custom (cores, &data, (GCompareFunc) core_find); + if (found != NULL) { + core = (GstPipeWireCore *) found->data; + core->refcount++; + GST_DEBUG ("found core %p", core); + } else { + core = make_core(fd); + if (core != NULL) { + GST_DEBUG ("created core %p", core); + /* add to list on success */ + cores = g_list_prepend (cores, core); + } else { + GST_WARNING ("could not create core"); + } + } + G_UNLOCK (cores_lock); + + return core; +} + +static void do_sync(GstPipeWireCore * core) +{ + core->pending_seq = pw_core_sync(core->core, 0, core->pending_seq); + while (true) { + if (core->last_seq == core->pending_seq || core->last_error < 0) + break; + pw_thread_loop_wait (core->loop); + } +} + +void gst_pipewire_core_release (GstPipeWireCore *core) +{ + gboolean zero; + + G_LOCK (cores_lock); + core->refcount--; + if ((zero = (core->refcount == 0))) { + GST_DEBUG ("closing core %p", core); + /* remove from list, we can release the mutex after removing the connection + * from the list because after that, nobody can access the connection anymore. */ + cores = g_list_remove (cores, core); + } + G_UNLOCK (cores_lock); + + if (zero) { + pw_thread_loop_lock (core->loop); + do_sync(core); + + pw_core_disconnect (core->core); + pw_thread_loop_unlock (core->loop); + pw_thread_loop_stop (core->loop); + pw_context_destroy (core->context); + pw_thread_loop_destroy (core->loop); + + free(core); + } +} diff --git a/src/gst/gstpipewirecore.h b/src/gst/gstpipewirecore.h new file mode 100644 index 000000000..90b6bd38b --- /dev/null +++ b/src/gst/gstpipewirecore.h @@ -0,0 +1,58 @@ +/* GStreamer + * + * Copyright © 2020 Wim Taymans + * + * Permission is hereby granted, free of charge, to any person obtaining a + * copy of this software and associated documentation files (the "Software"), + * to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, + * and/or sell copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice (including the next + * paragraph) shall be included in all copies or substantial portions of the + * Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL + * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER + * DEALINGS IN THE SOFTWARE. + */ + +#ifndef __GST_PIPEWIRE_CORE_H__ +#define __GST_PIPEWIRE_CORE_H__ + +#include + +#include + +G_BEGIN_DECLS + +typedef struct _GstPipeWireCore GstPipeWireCore; + +/** + * GstPipeWireCore: + * + * Opaque data structure. + */ +struct _GstPipeWireCore { + gint refcount; + int fd; + struct pw_thread_loop *loop; + struct pw_context *context; + struct pw_core *core; + struct spa_hook core_listener; + int last_error; + int last_seq; + int pending_seq; +}; + +GstPipeWireCore *gst_pipewire_core_get (int fd); +void gst_pipewire_core_release (GstPipeWireCore *core); + +G_END_DECLS + +#endif /* __GST_PIPEWIRE_CORE_H__ */ diff --git a/src/gst/gstpipewiresink.c b/src/gst/gstpipewiresink.c index 521909bff..629806ec1 100644 --- a/src/gst/gstpipewiresink.c +++ b/src/gst/gstpipewiresink.c @@ -119,9 +119,6 @@ gst_pipewire_sink_finalize (GObject * object) g_object_unref (pwsink->pool); - pw_thread_loop_destroy (pwsink->loop); - pwsink->loop = NULL; - if (pwsink->properties) gst_structure_free (pwsink->properties); g_free (pwsink->path); @@ -261,9 +258,9 @@ pool_activated (GstPipeWirePool *pool, GstPipeWireSink *sink) SPA_PARAM_META_size, SPA_POD_Int(sizeof (struct spa_meta_header))); - pw_thread_loop_lock (sink->loop); + pw_thread_loop_lock (sink->core->loop); pw_stream_update_params (sink->stream, port_params, 2); - pw_thread_loop_unlock (sink->loop); + pw_thread_loop_unlock (sink->core->loop); } static void @@ -275,10 +272,6 @@ gst_pipewire_sink_init (GstPipeWireSink * sink) sink->fd = -1; g_signal_connect (sink->pool, "activated", G_CALLBACK (pool_activated), sink); - - sink->loop = pw_thread_loop_new ("pipewire-sink-loop", NULL); - sink->context = pw_context_new (pw_thread_loop_get_loop(sink->loop), NULL, 0); - GST_DEBUG ("loop %p context %p", sink->loop, sink->context); } static GstCaps * @@ -469,7 +462,7 @@ on_state_changed (void *data, enum pw_stream_state old, enum pw_stream_state sta ("stream error: %s", error), (NULL)); break; } - pw_thread_loop_signal (pwsink->loop, FALSE); + pw_thread_loop_signal (pwsink->core->loop, FALSE); } static void @@ -501,7 +494,7 @@ gst_pipewire_sink_setcaps (GstBaseSink * bsink, GstCaps * caps) possible = gst_caps_to_format_all (caps, SPA_PARAM_EnumFormat); - pw_thread_loop_lock (pwsink->loop); + pw_thread_loop_lock (pwsink->core->loop); state = pw_stream_get_state (pwsink->stream, &error); if (state == PW_STREAM_STATE_ERROR) @@ -531,7 +524,7 @@ gst_pipewire_sink_setcaps (GstBaseSink * bsink, GstCaps * caps) if (state == PW_STREAM_STATE_ERROR) goto start_error; - pw_thread_loop_wait (pwsink->loop); + pw_thread_loop_wait (pwsink->core->loop); } } res = TRUE; @@ -541,7 +534,7 @@ gst_pipewire_sink_setcaps (GstBaseSink * bsink, GstCaps * caps) gst_buffer_pool_config_set_params (config, caps, size, min_buffers, max_buffers); gst_buffer_pool_set_config (GST_BUFFER_POOL_CAST (pwsink->pool), config); - pw_thread_loop_unlock (pwsink->loop); + pw_thread_loop_unlock (pwsink->core->loop); pwsink->negotiated = res; @@ -550,7 +543,7 @@ gst_pipewire_sink_setcaps (GstBaseSink * bsink, GstCaps * caps) start_error: { GST_ERROR ("could not start stream: %s", error); - pw_thread_loop_unlock (pwsink->loop); + pw_thread_loop_unlock (pwsink->core->loop); g_ptr_array_unref (possible); return FALSE; } @@ -568,7 +561,7 @@ gst_pipewire_sink_render (GstBaseSink * bsink, GstBuffer * buffer) if (!pwsink->negotiated) goto not_negotiated; - pw_thread_loop_lock (pwsink->loop); + pw_thread_loop_lock (pwsink->core->loop); if (pw_stream_get_state (pwsink->stream, &error) != PW_STREAM_STATE_STREAMING) goto done_unlock; @@ -576,7 +569,7 @@ gst_pipewire_sink_render (GstBaseSink * bsink, GstBuffer * buffer) GstBuffer *b = NULL; GstMapInfo info = { 0, }; - pw_thread_loop_unlock (pwsink->loop); + pw_thread_loop_unlock (pwsink->core->loop); if (!gst_buffer_pool_is_active (GST_BUFFER_POOL_CAST (pwsink->pool))) gst_buffer_pool_set_active (GST_BUFFER_POOL_CAST (pwsink->pool), TRUE); @@ -590,7 +583,7 @@ gst_pipewire_sink_render (GstBaseSink * bsink, GstBuffer * buffer) gst_buffer_resize (b, 0, gst_buffer_get_size (buffer)); buffer = b; - pw_thread_loop_lock (pwsink->loop); + pw_thread_loop_lock (pwsink->core->loop); if (pw_stream_get_state (pwsink->stream, &error) != PW_STREAM_STATE_STREAMING) goto done_unlock; } @@ -599,7 +592,7 @@ gst_pipewire_sink_render (GstBaseSink * bsink, GstBuffer * buffer) do_send_buffer (pwsink, buffer); done_unlock: - pw_thread_loop_unlock (pwsink->loop); + pw_thread_loop_unlock (pwsink->core->loop); done: return res; @@ -647,8 +640,8 @@ gst_pipewire_sink_start (GstBaseSink * basesink) props = NULL; } - pw_thread_loop_lock (pwsink->loop); - if ((pwsink->stream = pw_stream_new (pwsink->core, pwsink->client_name, props)) == NULL) + pw_thread_loop_lock (pwsink->core->loop); + if ((pwsink->stream = pw_stream_new (pwsink->core->core, pwsink->client_name, props)) == NULL) goto no_stream; pwsink->pool->stream = pwsink->stream; @@ -658,14 +651,14 @@ gst_pipewire_sink_start (GstBaseSink * basesink) &stream_events, pwsink); - pw_thread_loop_unlock (pwsink->loop); + pw_thread_loop_unlock (pwsink->core->loop); return TRUE; no_stream: { GST_ELEMENT_ERROR (pwsink, RESOURCE, FAILED, ("can't create stream"), (NULL)); - pw_thread_loop_unlock (pwsink->loop); + pw_thread_loop_unlock (pwsink->core->loop); return FALSE; } } @@ -675,81 +668,33 @@ gst_pipewire_sink_stop (GstBaseSink * basesink) { GstPipeWireSink *pwsink = GST_PIPEWIRE_SINK (basesink); - pw_thread_loop_lock (pwsink->loop); + pw_thread_loop_lock (pwsink->core->loop); if (pwsink->stream) { - pw_stream_disconnect (pwsink->stream); pw_stream_destroy (pwsink->stream); pwsink->stream = NULL; pwsink->pool->stream = NULL; } - pw_thread_loop_unlock (pwsink->loop); + pw_thread_loop_unlock (pwsink->core->loop); pwsink->negotiated = FALSE; return TRUE; } -static void on_core_done (void *object, uint32_t id, int seq) -{ - GstPipeWireSink * pwsink = object; - if (id == PW_ID_CORE) { - pwsink->last_seq = seq; - pw_thread_loop_signal (pwsink->loop, FALSE); - } -} - -static const struct pw_core_events core_events = { - PW_VERSION_CORE_EVENTS, - .done = on_core_done, -}; - -static void do_sync(GstPipeWireSink * pwsink) -{ - pwsink->pending_seq = pw_core_sync(pwsink->core, 0, pwsink->pending_seq); - while (true) { - if (pwsink->last_seq == pwsink->pending_seq || pwsink->last_error < 0) - break; - pw_thread_loop_wait (pwsink->loop); - } -} - static gboolean gst_pipewire_sink_open (GstPipeWireSink * pwsink) { - if (pw_thread_loop_start (pwsink->loop) < 0) - goto mainloop_error; - - pw_thread_loop_lock (pwsink->loop); - - if (pwsink->fd == -1) - pwsink->core = pw_context_connect (pwsink->context, NULL, 0); - else - pwsink->core = pw_context_connect_fd (pwsink->context, dup(pwsink->fd), NULL, 0); - + pwsink->core = gst_pipewire_core_get(pwsink->fd); if (pwsink->core == NULL) - goto connect_error; - - pw_core_add_listener(pwsink->core, - &pwsink->core_listener, - &core_events, - pwsink); - - pw_thread_loop_unlock (pwsink->loop); + goto connect_error; return TRUE; /* ERRORS */ -mainloop_error: - { - GST_ELEMENT_ERROR (pwsink, RESOURCE, FAILED, - ("Failed to start mainloop"), (NULL)); - return FALSE; - } connect_error: { GST_ELEMENT_ERROR (pwsink, RESOURCE, FAILED, ("Failed to connect"), (NULL)); - pw_thread_loop_unlock (pwsink->loop); return FALSE; } } @@ -757,20 +702,17 @@ connect_error: static gboolean gst_pipewire_sink_close (GstPipeWireSink * pwsink) { - pw_thread_loop_lock (pwsink->loop); + pw_thread_loop_lock (pwsink->core->loop); if (pwsink->stream) { pw_stream_destroy (pwsink->stream); pwsink->stream = NULL; } + pw_thread_loop_unlock (pwsink->core->loop); + if (pwsink->core) { - do_sync(pwsink); - pw_core_disconnect (pwsink->core); + gst_pipewire_core_release (pwsink->core); pwsink->core = NULL; } - pw_thread_loop_unlock (pwsink->loop); - - pw_thread_loop_stop (pwsink->loop); - return TRUE; } @@ -789,11 +731,15 @@ gst_pipewire_sink_change_state (GstElement * element, GstStateChange transition) break; case GST_STATE_CHANGE_PAUSED_TO_PLAYING: /* uncork and start play */ + pw_thread_loop_lock (this->core->loop); pw_stream_set_active(this->stream, true); + pw_thread_loop_unlock (this->core->loop); break; case GST_STATE_CHANGE_PLAYING_TO_PAUSED: /* stop play ASAP by corking */ + pw_thread_loop_lock (this->core->loop); pw_stream_set_active(this->stream, false); + pw_thread_loop_unlock (this->core->loop); break; default: break; diff --git a/src/gst/gstpipewiresink.h b/src/gst/gstpipewiresink.h index 88b210d5d..a93fd77df 100644 --- a/src/gst/gstpipewiresink.h +++ b/src/gst/gstpipewiresink.h @@ -30,6 +30,7 @@ #include #include +#include G_BEGIN_DECLS @@ -83,14 +84,8 @@ struct _GstPipeWireSink { /* video state */ gboolean negotiated; - struct pw_thread_loop *loop; - - struct pw_context *context; - struct pw_core *core; + GstPipeWireCore *core; struct spa_hook core_listener; - int pending_seq; - int last_seq; - int last_error; struct pw_stream *stream; struct spa_hook stream_listener; diff --git a/src/gst/gstpipewiresrc.c b/src/gst/gstpipewiresrc.c index a8c62ad3b..51c23afbb 100644 --- a/src/gst/gstpipewiresrc.c +++ b/src/gst/gstpipewiresrc.c @@ -231,11 +231,6 @@ gst_pipewire_src_finalize (GObject * object) { GstPipeWireSrc *pwsrc = GST_PIPEWIRE_SRC (object); - pw_context_destroy (pwsrc->context); - pwsrc->context = NULL; - pw_thread_loop_destroy (pwsrc->loop); - pwsrc->loop = NULL; - if (pwsrc->properties) gst_structure_free (pwsrc->properties); if (pwsrc->clock) @@ -380,10 +375,6 @@ gst_pipewire_src_init (GstPipeWireSrc * src) src->client_name = g_strdup(pw_get_client_name ()); src->pool = gst_pipewire_pool_new (); - src->loop = pw_thread_loop_new ("pipewire-main-loop", NULL); - src->context = pw_context_new (pw_thread_loop_get_loop(src->loop), NULL, 0); - GST_DEBUG ("loop %p context %p", src->loop, src->context); - } static gboolean @@ -400,10 +391,10 @@ buffer_recycle (GstMiniObject *obj) data->queued = TRUE; GST_LOG_OBJECT (obj, "recycle buffer"); - pw_thread_loop_lock (src->loop); + pw_thread_loop_lock (src->core->loop); if (src->stream) pw_stream_queue_buffer (src->stream, data->b); - pw_thread_loop_unlock (src->loop); + pw_thread_loop_unlock (src->core->loop); return FALSE; } @@ -488,7 +479,7 @@ static void on_process (void *_data) { GstPipeWireSrc *pwsrc = _data; - pw_thread_loop_signal (pwsrc->loop, FALSE); + pw_thread_loop_signal (pwsrc->core->loop, FALSE); } static void @@ -511,7 +502,7 @@ on_state_changed (void *data, ("stream error: %s", error), (NULL)); break; } - pw_thread_loop_signal (pwsrc->loop, FALSE); + pw_thread_loop_signal (pwsrc->core->loop, FALSE); } static void @@ -540,7 +531,7 @@ static gboolean gst_pipewire_src_stream_start (GstPipeWireSrc *pwsrc) { const char *error = NULL; - pw_thread_loop_lock (pwsrc->loop); + pw_thread_loop_lock (pwsrc->core->loop); GST_DEBUG_OBJECT (pwsrc, "doing stream start"); while (TRUE) { enum pw_stream_state state = pw_stream_get_state (pwsrc->stream, &error); @@ -552,21 +543,21 @@ gst_pipewire_src_stream_start (GstPipeWireSrc *pwsrc) if (state == PW_STREAM_STATE_ERROR) goto start_error; - pw_thread_loop_wait (pwsrc->loop); + pw_thread_loop_wait (pwsrc->core->loop); } parse_stream_properties (pwsrc, pw_stream_get_properties (pwsrc->stream)); GST_DEBUG_OBJECT (pwsrc, "signal started"); pwsrc->started = TRUE; - pw_thread_loop_signal (pwsrc->loop, FALSE); - pw_thread_loop_unlock (pwsrc->loop); + pw_thread_loop_signal (pwsrc->core->loop, FALSE); + pw_thread_loop_unlock (pwsrc->core->loop); return TRUE; start_error: { GST_DEBUG_OBJECT (pwsrc, "error starting stream: %s", error); - pw_thread_loop_unlock (pwsrc->loop); + pw_thread_loop_unlock (pwsrc->core->loop); return FALSE; } } @@ -577,7 +568,7 @@ wait_negotiated (GstPipeWireSrc *this) enum pw_stream_state state; const char *error = NULL; - pw_thread_loop_lock (this->loop); + pw_thread_loop_lock (this->core->loop); while (TRUE) { state = pw_stream_get_state (this->stream, &error); @@ -590,10 +581,10 @@ wait_negotiated (GstPipeWireSrc *this) if (this->started) break; - pw_thread_loop_wait (this->loop); + pw_thread_loop_wait (this->core->loop); } GST_DEBUG_OBJECT (this, "got started signal"); - pw_thread_loop_unlock (this->loop); + pw_thread_loop_unlock (this->core->loop); return state; } @@ -640,7 +631,7 @@ gst_pipewire_src_negotiate (GstBaseSrc * basesrc) gst_caps_unref (caps); /* first disconnect */ - pw_thread_loop_lock (pwsrc->loop); + pw_thread_loop_lock (pwsrc->core->loop); if (pw_stream_get_state(pwsrc->stream, &error) != PW_STREAM_STATE_UNCONNECTED) { GST_DEBUG_OBJECT (basesrc, "disconnect capture"); pw_stream_disconnect (pwsrc->stream); @@ -656,7 +647,7 @@ gst_pipewire_src_negotiate (GstBaseSrc * basesrc) goto connect_error; } - pw_thread_loop_wait (pwsrc->loop); + pw_thread_loop_wait (pwsrc->core->loop); } } @@ -680,9 +671,9 @@ gst_pipewire_src_negotiate (GstBaseSrc * basesrc) if (state == PW_STREAM_STATE_ERROR) goto connect_error; - pw_thread_loop_wait (pwsrc->loop); + pw_thread_loop_wait (pwsrc->core->loop); } - pw_thread_loop_unlock (pwsrc->loop); + pw_thread_loop_unlock (pwsrc->core->loop); result = gst_pipewire_src_stream_start (pwsrc); @@ -717,7 +708,7 @@ no_common_caps: } connect_error: { - pw_thread_loop_unlock (pwsrc->loop); + pw_thread_loop_unlock (pwsrc->core->loop); return FALSE; } } @@ -777,11 +768,11 @@ gst_pipewire_src_unlock (GstBaseSrc * basesrc) { GstPipeWireSrc *pwsrc = GST_PIPEWIRE_SRC (basesrc); - pw_thread_loop_lock (pwsrc->loop); + pw_thread_loop_lock (pwsrc->core->loop); GST_DEBUG_OBJECT (pwsrc, "setting flushing"); pwsrc->flushing = TRUE; - pw_thread_loop_signal (pwsrc->loop, FALSE); - pw_thread_loop_unlock (pwsrc->loop); + pw_thread_loop_signal (pwsrc->core->loop, FALSE); + pw_thread_loop_unlock (pwsrc->core->loop); return TRUE; } @@ -791,10 +782,10 @@ gst_pipewire_src_unlock_stop (GstBaseSrc * basesrc) { GstPipeWireSrc *pwsrc = GST_PIPEWIRE_SRC (basesrc); - pw_thread_loop_lock (pwsrc->loop); + pw_thread_loop_lock (pwsrc->core->loop); GST_DEBUG_OBJECT (pwsrc, "unsetting flushing"); pwsrc->flushing = FALSE; - pw_thread_loop_unlock (pwsrc->loop); + pw_thread_loop_unlock (pwsrc->core->loop); return TRUE; } @@ -863,7 +854,7 @@ gst_pipewire_src_create (GstPushSrc * psrc, GstBuffer ** buffer) if (!pwsrc->negotiated) goto not_negotiated; - pw_thread_loop_lock (pwsrc->loop); + pw_thread_loop_lock (pwsrc->core->loop); while (TRUE) { enum pw_stream_state state; @@ -895,9 +886,9 @@ gst_pipewire_src_create (GstPushSrc * psrc, GstBuffer ** buffer) break; } } - pw_thread_loop_wait (pwsrc->loop); + pw_thread_loop_wait (pwsrc->core->loop); } - pw_thread_loop_unlock (pwsrc->loop); + pw_thread_loop_unlock (pwsrc->core->loop); if (pwsrc->always_copy) { *buffer = gst_buffer_copy_deep (buf); @@ -946,17 +937,17 @@ not_negotiated: } streaming_eos: { - pw_thread_loop_unlock (pwsrc->loop); + pw_thread_loop_unlock (pwsrc->core->loop); return GST_FLOW_EOS; } streaming_error: { - pw_thread_loop_unlock (pwsrc->loop); + pw_thread_loop_unlock (pwsrc->core->loop); return GST_FLOW_ERROR; } streaming_stopped: { - pw_thread_loop_unlock (pwsrc->loop); + pw_thread_loop_unlock (pwsrc->core->loop); return GST_FLOW_FLUSHING; } } @@ -974,10 +965,10 @@ gst_pipewire_src_stop (GstBaseSrc * basesrc) pwsrc = GST_PIPEWIRE_SRC (basesrc); - pw_thread_loop_lock (pwsrc->loop); + pw_thread_loop_lock (pwsrc->core->loop); pwsrc->eos = false; gst_buffer_replace (&pwsrc->last_buffer, NULL); - pw_thread_loop_unlock (pwsrc->loop); + pw_thread_loop_unlock (pwsrc->core->loop); return TRUE; } @@ -1005,52 +996,16 @@ static const struct pw_stream_events stream_events = { .process = on_process, }; -static void on_core_done (void *object, uint32_t id, int seq) -{ - GstPipeWireSrc * pwsrc = object; - if (id == PW_ID_CORE) { - pwsrc->last_seq = seq; - pw_thread_loop_signal (pwsrc->loop, FALSE); - } -} - -static const struct pw_core_events core_events = { - PW_VERSION_CORE_EVENTS, - .done = on_core_done, -}; - -static void do_sync(GstPipeWireSrc * pwsrc) -{ - pwsrc->pending_seq = pw_core_sync(pwsrc->core, 0, pwsrc->pending_seq); - while (true) { - if (pwsrc->last_seq == pwsrc->pending_seq || pwsrc->last_error < 0) - break; - pw_thread_loop_wait (pwsrc->loop); - } -} - static gboolean gst_pipewire_src_open (GstPipeWireSrc * pwsrc) { struct pw_properties *props; - if (pw_thread_loop_start (pwsrc->loop) < 0) - goto mainloop_failed; - - pw_thread_loop_lock (pwsrc->loop); - - if (pwsrc->fd == -1) - pwsrc->core = pw_context_connect (pwsrc->context, NULL, 0); - else - pwsrc->core = pw_context_connect_fd (pwsrc->context, dup(pwsrc->fd), NULL, 0); - + pwsrc->core = gst_pipewire_core_get(pwsrc->fd); if (pwsrc->core == NULL) goto connect_error; - pw_core_add_listener(pwsrc->core, - &pwsrc->core_listener, - &core_events, - pwsrc); + pw_thread_loop_lock (pwsrc->core->loop); if (pwsrc->properties) { props = pw_properties_new (NULL, NULL); @@ -1059,7 +1014,7 @@ gst_pipewire_src_open (GstPipeWireSrc * pwsrc) props = NULL; } - if ((pwsrc->stream = pw_stream_new (pwsrc->core, + if ((pwsrc->stream = pw_stream_new (pwsrc->core->core, pwsrc->client_name, props)) == NULL) goto no_stream; @@ -1070,26 +1025,20 @@ gst_pipewire_src_open (GstPipeWireSrc * pwsrc) pwsrc); pwsrc->clock = gst_pipewire_clock_new (pwsrc->stream, pwsrc->last_time); - pw_thread_loop_unlock (pwsrc->loop); + pw_thread_loop_unlock (pwsrc->core->loop); return TRUE; /* ERRORS */ -mainloop_failed: - { - GST_ELEMENT_ERROR (pwsrc, RESOURCE, FAILED, ("error starting mainloop"), (NULL)); - return FALSE; - } connect_error: { GST_ELEMENT_ERROR (pwsrc, RESOURCE, FAILED, ("can't connect"), (NULL)); - pw_thread_loop_unlock (pwsrc->loop); return FALSE; } no_stream: { GST_ELEMENT_ERROR (pwsrc, RESOURCE, FAILED, ("can't create stream"), (NULL)); - pw_thread_loop_unlock (pwsrc->loop); + pw_thread_loop_unlock (pwsrc->core->loop); return FALSE; } } @@ -1107,19 +1056,17 @@ gst_pipewire_src_close (GstPipeWireSrc * pwsrc) g_clear_object (&pwsrc->clock); GST_OBJECT_UNLOCK (pwsrc); - pw_thread_loop_lock (pwsrc->loop); + pw_thread_loop_lock (pwsrc->core->loop); if (pwsrc->stream) { pw_stream_destroy (pwsrc->stream); pwsrc->stream = NULL; } + pw_thread_loop_unlock (pwsrc->core->loop); + if (pwsrc->core) { - do_sync(pwsrc); - pw_core_disconnect (pwsrc->core); + gst_pipewire_core_release (pwsrc->core); pwsrc->core = NULL; } - pw_thread_loop_unlock (pwsrc->loop); - - pw_thread_loop_stop (pwsrc->loop); } static gboolean @@ -1131,10 +1078,10 @@ gst_pipewire_src_send_event (GstElement * elem, GstEvent * event) switch (GST_EVENT_TYPE (event)) { case GST_EVENT_EOS: GST_DEBUG_OBJECT (this, "got EOS"); - pw_thread_loop_lock (this->loop); + pw_thread_loop_lock (this->core->loop); this->eos = true; - pw_thread_loop_signal (this->loop, FALSE); - pw_thread_loop_unlock (this->loop); + pw_thread_loop_signal (this->core->loop, FALSE); + pw_thread_loop_unlock (this->core->loop); ret = TRUE; break; default: @@ -1159,11 +1106,15 @@ gst_pipewire_src_change_state (GstElement * element, GstStateChange transition) break; case GST_STATE_CHANGE_PAUSED_TO_PLAYING: /* uncork and start recording */ + pw_thread_loop_lock (this->core->loop); pw_stream_set_active(this->stream, true); + pw_thread_loop_unlock (this->core->loop); break; case GST_STATE_CHANGE_PLAYING_TO_PAUSED: /* stop recording ASAP by corking */ + pw_thread_loop_lock (this->core->loop); pw_stream_set_active(this->stream, false); + pw_thread_loop_unlock (this->core->loop); break; default: break; diff --git a/src/gst/gstpipewiresrc.h b/src/gst/gstpipewiresrc.h index e7ea864e1..2fd06a606 100644 --- a/src/gst/gstpipewiresrc.h +++ b/src/gst/gstpipewiresrc.h @@ -30,6 +30,7 @@ #include #include +#include G_BEGIN_DECLS @@ -75,12 +76,8 @@ struct _GstPipeWireSrc { GstClockTime min_latency; GstClockTime max_latency; - struct pw_thread_loop *loop; - - struct pw_context *context; - struct pw_core *core; + GstPipeWireCore *core; struct spa_hook core_listener; - int last_error; int last_seq; int pending_seq; diff --git a/src/gst/meson.build b/src/gst/meson.build index ad0e08016..e6c097f1e 100644 --- a/src/gst/meson.build +++ b/src/gst/meson.build @@ -1,5 +1,6 @@ pipewire_gst_sources = [ 'gstpipewire.c', + 'gstpipewirecore.c', 'gstpipewireclock.c', 'gstpipewiredeviceprovider.c', 'gstpipewireformat.c', @@ -10,6 +11,7 @@ pipewire_gst_sources = [ pipewire_gst_headers = [ 'gstpipewireclock.h', + 'gstpipewirecore.h', 'gstpipewiredeviceprovider.h', 'gstpipewireformat.h', 'gstpipewirepool.h',