gst: share the core between connections

Make all sources in the same process with the same fd share the
connection to the server. This makes it possible to set the same
fd on multiple sources/sinks and have them all use the same
connection, like when capturing multiple monitors from screencast
with the portal.

Fixes #241
This commit is contained in:
Wim Taymans 2020-07-13 17:42:09 +02:00
parent ba96eecba0
commit 70652d1a37
7 changed files with 328 additions and 188 deletions

View file

@ -231,11 +231,6 @@ gst_pipewire_src_finalize (GObject * object)
{
GstPipeWireSrc *pwsrc = GST_PIPEWIRE_SRC (object);
pw_context_destroy (pwsrc->context);
pwsrc->context = NULL;
pw_thread_loop_destroy (pwsrc->loop);
pwsrc->loop = NULL;
if (pwsrc->properties)
gst_structure_free (pwsrc->properties);
if (pwsrc->clock)
@ -380,10 +375,6 @@ gst_pipewire_src_init (GstPipeWireSrc * src)
src->client_name = g_strdup(pw_get_client_name ());
src->pool = gst_pipewire_pool_new ();
src->loop = pw_thread_loop_new ("pipewire-main-loop", NULL);
src->context = pw_context_new (pw_thread_loop_get_loop(src->loop), NULL, 0);
GST_DEBUG ("loop %p context %p", src->loop, src->context);
}
static gboolean
@ -400,10 +391,10 @@ buffer_recycle (GstMiniObject *obj)
data->queued = TRUE;
GST_LOG_OBJECT (obj, "recycle buffer");
pw_thread_loop_lock (src->loop);
pw_thread_loop_lock (src->core->loop);
if (src->stream)
pw_stream_queue_buffer (src->stream, data->b);
pw_thread_loop_unlock (src->loop);
pw_thread_loop_unlock (src->core->loop);
return FALSE;
}
@ -488,7 +479,7 @@ static void
on_process (void *_data)
{
GstPipeWireSrc *pwsrc = _data;
pw_thread_loop_signal (pwsrc->loop, FALSE);
pw_thread_loop_signal (pwsrc->core->loop, FALSE);
}
static void
@ -511,7 +502,7 @@ on_state_changed (void *data,
("stream error: %s", error), (NULL));
break;
}
pw_thread_loop_signal (pwsrc->loop, FALSE);
pw_thread_loop_signal (pwsrc->core->loop, FALSE);
}
static void
@ -540,7 +531,7 @@ static gboolean
gst_pipewire_src_stream_start (GstPipeWireSrc *pwsrc)
{
const char *error = NULL;
pw_thread_loop_lock (pwsrc->loop);
pw_thread_loop_lock (pwsrc->core->loop);
GST_DEBUG_OBJECT (pwsrc, "doing stream start");
while (TRUE) {
enum pw_stream_state state = pw_stream_get_state (pwsrc->stream, &error);
@ -552,21 +543,21 @@ gst_pipewire_src_stream_start (GstPipeWireSrc *pwsrc)
if (state == PW_STREAM_STATE_ERROR)
goto start_error;
pw_thread_loop_wait (pwsrc->loop);
pw_thread_loop_wait (pwsrc->core->loop);
}
parse_stream_properties (pwsrc, pw_stream_get_properties (pwsrc->stream));
GST_DEBUG_OBJECT (pwsrc, "signal started");
pwsrc->started = TRUE;
pw_thread_loop_signal (pwsrc->loop, FALSE);
pw_thread_loop_unlock (pwsrc->loop);
pw_thread_loop_signal (pwsrc->core->loop, FALSE);
pw_thread_loop_unlock (pwsrc->core->loop);
return TRUE;
start_error:
{
GST_DEBUG_OBJECT (pwsrc, "error starting stream: %s", error);
pw_thread_loop_unlock (pwsrc->loop);
pw_thread_loop_unlock (pwsrc->core->loop);
return FALSE;
}
}
@ -577,7 +568,7 @@ wait_negotiated (GstPipeWireSrc *this)
enum pw_stream_state state;
const char *error = NULL;
pw_thread_loop_lock (this->loop);
pw_thread_loop_lock (this->core->loop);
while (TRUE) {
state = pw_stream_get_state (this->stream, &error);
@ -590,10 +581,10 @@ wait_negotiated (GstPipeWireSrc *this)
if (this->started)
break;
pw_thread_loop_wait (this->loop);
pw_thread_loop_wait (this->core->loop);
}
GST_DEBUG_OBJECT (this, "got started signal");
pw_thread_loop_unlock (this->loop);
pw_thread_loop_unlock (this->core->loop);
return state;
}
@ -640,7 +631,7 @@ gst_pipewire_src_negotiate (GstBaseSrc * basesrc)
gst_caps_unref (caps);
/* first disconnect */
pw_thread_loop_lock (pwsrc->loop);
pw_thread_loop_lock (pwsrc->core->loop);
if (pw_stream_get_state(pwsrc->stream, &error) != PW_STREAM_STATE_UNCONNECTED) {
GST_DEBUG_OBJECT (basesrc, "disconnect capture");
pw_stream_disconnect (pwsrc->stream);
@ -656,7 +647,7 @@ gst_pipewire_src_negotiate (GstBaseSrc * basesrc)
goto connect_error;
}
pw_thread_loop_wait (pwsrc->loop);
pw_thread_loop_wait (pwsrc->core->loop);
}
}
@ -680,9 +671,9 @@ gst_pipewire_src_negotiate (GstBaseSrc * basesrc)
if (state == PW_STREAM_STATE_ERROR)
goto connect_error;
pw_thread_loop_wait (pwsrc->loop);
pw_thread_loop_wait (pwsrc->core->loop);
}
pw_thread_loop_unlock (pwsrc->loop);
pw_thread_loop_unlock (pwsrc->core->loop);
result = gst_pipewire_src_stream_start (pwsrc);
@ -717,7 +708,7 @@ no_common_caps:
}
connect_error:
{
pw_thread_loop_unlock (pwsrc->loop);
pw_thread_loop_unlock (pwsrc->core->loop);
return FALSE;
}
}
@ -777,11 +768,11 @@ gst_pipewire_src_unlock (GstBaseSrc * basesrc)
{
GstPipeWireSrc *pwsrc = GST_PIPEWIRE_SRC (basesrc);
pw_thread_loop_lock (pwsrc->loop);
pw_thread_loop_lock (pwsrc->core->loop);
GST_DEBUG_OBJECT (pwsrc, "setting flushing");
pwsrc->flushing = TRUE;
pw_thread_loop_signal (pwsrc->loop, FALSE);
pw_thread_loop_unlock (pwsrc->loop);
pw_thread_loop_signal (pwsrc->core->loop, FALSE);
pw_thread_loop_unlock (pwsrc->core->loop);
return TRUE;
}
@ -791,10 +782,10 @@ gst_pipewire_src_unlock_stop (GstBaseSrc * basesrc)
{
GstPipeWireSrc *pwsrc = GST_PIPEWIRE_SRC (basesrc);
pw_thread_loop_lock (pwsrc->loop);
pw_thread_loop_lock (pwsrc->core->loop);
GST_DEBUG_OBJECT (pwsrc, "unsetting flushing");
pwsrc->flushing = FALSE;
pw_thread_loop_unlock (pwsrc->loop);
pw_thread_loop_unlock (pwsrc->core->loop);
return TRUE;
}
@ -863,7 +854,7 @@ gst_pipewire_src_create (GstPushSrc * psrc, GstBuffer ** buffer)
if (!pwsrc->negotiated)
goto not_negotiated;
pw_thread_loop_lock (pwsrc->loop);
pw_thread_loop_lock (pwsrc->core->loop);
while (TRUE) {
enum pw_stream_state state;
@ -895,9 +886,9 @@ gst_pipewire_src_create (GstPushSrc * psrc, GstBuffer ** buffer)
break;
}
}
pw_thread_loop_wait (pwsrc->loop);
pw_thread_loop_wait (pwsrc->core->loop);
}
pw_thread_loop_unlock (pwsrc->loop);
pw_thread_loop_unlock (pwsrc->core->loop);
if (pwsrc->always_copy) {
*buffer = gst_buffer_copy_deep (buf);
@ -946,17 +937,17 @@ not_negotiated:
}
streaming_eos:
{
pw_thread_loop_unlock (pwsrc->loop);
pw_thread_loop_unlock (pwsrc->core->loop);
return GST_FLOW_EOS;
}
streaming_error:
{
pw_thread_loop_unlock (pwsrc->loop);
pw_thread_loop_unlock (pwsrc->core->loop);
return GST_FLOW_ERROR;
}
streaming_stopped:
{
pw_thread_loop_unlock (pwsrc->loop);
pw_thread_loop_unlock (pwsrc->core->loop);
return GST_FLOW_FLUSHING;
}
}
@ -974,10 +965,10 @@ gst_pipewire_src_stop (GstBaseSrc * basesrc)
pwsrc = GST_PIPEWIRE_SRC (basesrc);
pw_thread_loop_lock (pwsrc->loop);
pw_thread_loop_lock (pwsrc->core->loop);
pwsrc->eos = false;
gst_buffer_replace (&pwsrc->last_buffer, NULL);
pw_thread_loop_unlock (pwsrc->loop);
pw_thread_loop_unlock (pwsrc->core->loop);
return TRUE;
}
@ -1005,52 +996,16 @@ static const struct pw_stream_events stream_events = {
.process = on_process,
};
static void on_core_done (void *object, uint32_t id, int seq)
{
GstPipeWireSrc * pwsrc = object;
if (id == PW_ID_CORE) {
pwsrc->last_seq = seq;
pw_thread_loop_signal (pwsrc->loop, FALSE);
}
}
static const struct pw_core_events core_events = {
PW_VERSION_CORE_EVENTS,
.done = on_core_done,
};
static void do_sync(GstPipeWireSrc * pwsrc)
{
pwsrc->pending_seq = pw_core_sync(pwsrc->core, 0, pwsrc->pending_seq);
while (true) {
if (pwsrc->last_seq == pwsrc->pending_seq || pwsrc->last_error < 0)
break;
pw_thread_loop_wait (pwsrc->loop);
}
}
static gboolean
gst_pipewire_src_open (GstPipeWireSrc * pwsrc)
{
struct pw_properties *props;
if (pw_thread_loop_start (pwsrc->loop) < 0)
goto mainloop_failed;
pw_thread_loop_lock (pwsrc->loop);
if (pwsrc->fd == -1)
pwsrc->core = pw_context_connect (pwsrc->context, NULL, 0);
else
pwsrc->core = pw_context_connect_fd (pwsrc->context, dup(pwsrc->fd), NULL, 0);
pwsrc->core = gst_pipewire_core_get(pwsrc->fd);
if (pwsrc->core == NULL)
goto connect_error;
pw_core_add_listener(pwsrc->core,
&pwsrc->core_listener,
&core_events,
pwsrc);
pw_thread_loop_lock (pwsrc->core->loop);
if (pwsrc->properties) {
props = pw_properties_new (NULL, NULL);
@ -1059,7 +1014,7 @@ gst_pipewire_src_open (GstPipeWireSrc * pwsrc)
props = NULL;
}
if ((pwsrc->stream = pw_stream_new (pwsrc->core,
if ((pwsrc->stream = pw_stream_new (pwsrc->core->core,
pwsrc->client_name, props)) == NULL)
goto no_stream;
@ -1070,26 +1025,20 @@ gst_pipewire_src_open (GstPipeWireSrc * pwsrc)
pwsrc);
pwsrc->clock = gst_pipewire_clock_new (pwsrc->stream, pwsrc->last_time);
pw_thread_loop_unlock (pwsrc->loop);
pw_thread_loop_unlock (pwsrc->core->loop);
return TRUE;
/* ERRORS */
mainloop_failed:
{
GST_ELEMENT_ERROR (pwsrc, RESOURCE, FAILED, ("error starting mainloop"), (NULL));
return FALSE;
}
connect_error:
{
GST_ELEMENT_ERROR (pwsrc, RESOURCE, FAILED, ("can't connect"), (NULL));
pw_thread_loop_unlock (pwsrc->loop);
return FALSE;
}
no_stream:
{
GST_ELEMENT_ERROR (pwsrc, RESOURCE, FAILED, ("can't create stream"), (NULL));
pw_thread_loop_unlock (pwsrc->loop);
pw_thread_loop_unlock (pwsrc->core->loop);
return FALSE;
}
}
@ -1107,19 +1056,17 @@ gst_pipewire_src_close (GstPipeWireSrc * pwsrc)
g_clear_object (&pwsrc->clock);
GST_OBJECT_UNLOCK (pwsrc);
pw_thread_loop_lock (pwsrc->loop);
pw_thread_loop_lock (pwsrc->core->loop);
if (pwsrc->stream) {
pw_stream_destroy (pwsrc->stream);
pwsrc->stream = NULL;
}
pw_thread_loop_unlock (pwsrc->core->loop);
if (pwsrc->core) {
do_sync(pwsrc);
pw_core_disconnect (pwsrc->core);
gst_pipewire_core_release (pwsrc->core);
pwsrc->core = NULL;
}
pw_thread_loop_unlock (pwsrc->loop);
pw_thread_loop_stop (pwsrc->loop);
}
static gboolean
@ -1131,10 +1078,10 @@ gst_pipewire_src_send_event (GstElement * elem, GstEvent * event)
switch (GST_EVENT_TYPE (event)) {
case GST_EVENT_EOS:
GST_DEBUG_OBJECT (this, "got EOS");
pw_thread_loop_lock (this->loop);
pw_thread_loop_lock (this->core->loop);
this->eos = true;
pw_thread_loop_signal (this->loop, FALSE);
pw_thread_loop_unlock (this->loop);
pw_thread_loop_signal (this->core->loop, FALSE);
pw_thread_loop_unlock (this->core->loop);
ret = TRUE;
break;
default:
@ -1159,11 +1106,15 @@ gst_pipewire_src_change_state (GstElement * element, GstStateChange transition)
break;
case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
/* uncork and start recording */
pw_thread_loop_lock (this->core->loop);
pw_stream_set_active(this->stream, true);
pw_thread_loop_unlock (this->core->loop);
break;
case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
/* stop recording ASAP by corking */
pw_thread_loop_lock (this->core->loop);
pw_stream_set_active(this->stream, false);
pw_thread_loop_unlock (this->core->loop);
break;
default:
break;