loop: pass spa_dict to *_loop_new

Make the thread_loop alloc its own loop by default to simplify
some core. Add extra new_full method to pass a custom pw_loop.
Make other loop implementations ready to support custom loops
if we want that later.
This commit is contained in:
Wim Taymans 2019-12-13 11:34:25 +01:00
parent 828bd30879
commit 698ab911c3
21 changed files with 215 additions and 192 deletions

View file

@ -340,8 +340,8 @@ on_core_done (void *data, uint32_t id, int seq)
pw_log_debug("check %d %d", seq, self->seq);
if (seq == self->seq) {
self->end = true;
if (self->main_loop)
pw_thread_loop_signal (self->main_loop, FALSE);
if (self->loop)
pw_thread_loop_signal (self->loop, FALSE);
}
}
@ -357,7 +357,7 @@ on_core_error(void *data, uint32_t id, int seq, int res, const char *message)
if (id == 0) {
self->error = res;
}
pw_thread_loop_signal(self->main_loop, FALSE);
pw_thread_loop_signal(self->loop, FALSE);
}
static const struct pw_core_events core_events = {
@ -585,26 +585,25 @@ gst_pipewire_device_provider_start (GstDeviceProvider * provider)
GST_DEBUG_OBJECT (self, "starting provider");
self->loop = pw_loop_new (NULL);
self->list_only = FALSE;
spa_list_init(&self->pending);
if (!(self->main_loop = pw_thread_loop_new (self->loop, "pipewire-device-monitor"))) {
if (!(self->loop = pw_thread_loop_new ("pipewire-device-monitor", NULL))) {
GST_ERROR_OBJECT (self, "Could not create PipeWire mainloop");
goto failed_main_loop;
goto failed_loop;
}
if (!(self->context = pw_context_new (self->loop, NULL, sizeof(*data)))) {
if (!(self->context = pw_context_new (pw_thread_loop_get_loop(self->loop), NULL, sizeof(*data)))) {
GST_ERROR_OBJECT (self, "Could not create PipeWire context");
goto failed_context;
}
if (pw_thread_loop_start (self->main_loop) < 0) {
if (pw_thread_loop_start (self->loop) < 0) {
GST_ERROR_OBJECT (self, "Could not start PipeWire mainloop");
goto failed_start;
}
pw_thread_loop_lock (self->main_loop);
pw_thread_loop_lock (self->loop);
if ((self->core = pw_context_connect (self->context, NULL, 0)) == NULL) {
GST_ERROR_OBJECT (self, "Failed to connect");
@ -631,26 +630,24 @@ gst_pipewire_device_provider_start (GstDeviceProvider * provider)
break;
if (self->end)
break;
pw_thread_loop_wait (self->main_loop);
pw_thread_loop_wait (self->loop);
}
GST_DEBUG_OBJECT (self, "started");
pw_thread_loop_unlock (self->main_loop);
pw_thread_loop_unlock (self->loop);
return TRUE;
failed_connect:
pw_thread_loop_unlock (self->main_loop);
pw_thread_loop_unlock (self->loop);
failed_start:
pw_context_destroy (self->context);
self->context = NULL;
failed_context:
pw_thread_loop_destroy (self->main_loop);
self->main_loop = NULL;
failed_main_loop:
pw_loop_destroy (self->loop);
pw_thread_loop_destroy (self->loop);
self->loop = NULL;
failed_loop:
return TRUE;
}
@ -669,12 +666,8 @@ gst_pipewire_device_provider_stop (GstDeviceProvider * provider)
pw_context_destroy (self->context);
self->context = NULL;
}
if (self->main_loop) {
pw_thread_loop_destroy (self->main_loop);
self->main_loop = NULL;
}
if (self->loop) {
pw_loop_destroy (self->loop);
pw_thread_loop_destroy (self->loop);
self->loop = NULL;
}
}

View file

@ -83,8 +83,7 @@ struct _GstPipeWireDeviceProvider {
gchar *client_name;
struct pw_loop *loop;
struct pw_thread_loop *main_loop;
struct pw_thread_loop *loop;
struct pw_context *context;

View file

@ -119,10 +119,7 @@ gst_pipewire_sink_finalize (GObject * object)
g_object_unref (pwsink->pool);
pw_thread_loop_destroy (pwsink->main_loop);
pwsink->main_loop = NULL;
pw_loop_destroy (pwsink->loop);
pw_thread_loop_destroy (pwsink->loop);
pwsink->loop = NULL;
if (pwsink->properties)
@ -264,9 +261,9 @@ pool_activated (GstPipeWirePool *pool, GstPipeWireSink *sink)
SPA_PARAM_META_size, SPA_POD_Int(sizeof (struct spa_meta_header)));
pw_thread_loop_lock (sink->main_loop);
pw_thread_loop_lock (sink->loop);
pw_stream_update_params (sink->stream, port_params, 2);
pw_thread_loop_unlock (sink->main_loop);
pw_thread_loop_unlock (sink->loop);
}
static void
@ -281,10 +278,9 @@ gst_pipewire_sink_init (GstPipeWireSink * sink)
g_queue_init (&sink->queue);
sink->loop = pw_loop_new (NULL);
sink->main_loop = pw_thread_loop_new (sink->loop, "pipewire-sink-loop");
sink->context = pw_context_new (sink->loop, NULL, 0);
GST_DEBUG ("loop %p %p", sink->loop, sink->main_loop);
sink->loop = pw_thread_loop_new ("pipewire-sink-loop", NULL);
sink->context = pw_context_new (pw_thread_loop_get_loop(sink->loop), NULL, 0);
GST_DEBUG ("loop %p context %p", sink->loop, sink->context);
}
static GstCaps *
@ -406,7 +402,7 @@ on_add_buffer (void *_data, struct pw_buffer *b)
{
GstPipeWireSink *pwsink = _data;
gst_pipewire_pool_wrap_buffer (pwsink->pool, b);
pw_thread_loop_signal (pwsink->main_loop, FALSE);
pw_thread_loop_signal (pwsink->loop, FALSE);
}
static void
@ -455,7 +451,7 @@ do_send_buffer (GstPipeWireSink *pwsink)
if ((res = pw_stream_queue_buffer (pwsink->stream, data->b)) < 0) {
g_warning ("can't send buffer %s", spa_strerror(res));
pw_thread_loop_signal (pwsink->main_loop, FALSE);
pw_thread_loop_signal (pwsink->loop, FALSE);
} else
pwsink->need_ready--;
}
@ -496,7 +492,7 @@ on_state_changed (void *data, enum pw_stream_state old, enum pw_stream_state sta
("stream error: %s", error), (NULL));
break;
}
pw_thread_loop_signal (pwsink->main_loop, FALSE);
pw_thread_loop_signal (pwsink->loop, FALSE);
}
static void
@ -524,7 +520,7 @@ gst_pipewire_sink_setcaps (GstBaseSink * bsink, GstCaps * caps)
possible = gst_caps_to_format_all (caps, SPA_PARAM_EnumFormat);
pw_thread_loop_lock (pwsink->main_loop);
pw_thread_loop_lock (pwsink->loop);
state = pw_stream_get_state (pwsink->stream, &error);
if (state == PW_STREAM_STATE_ERROR)
@ -554,12 +550,12 @@ gst_pipewire_sink_setcaps (GstBaseSink * bsink, GstCaps * caps)
if (state == PW_STREAM_STATE_ERROR)
goto start_error;
pw_thread_loop_wait (pwsink->main_loop);
pw_thread_loop_wait (pwsink->loop);
}
}
res = TRUE;
pw_thread_loop_unlock (pwsink->main_loop);
pw_thread_loop_unlock (pwsink->loop);
pwsink->negotiated = res;
@ -568,7 +564,7 @@ gst_pipewire_sink_setcaps (GstBaseSink * bsink, GstCaps * caps)
start_error:
{
GST_ERROR ("could not start stream: %s", error);
pw_thread_loop_unlock (pwsink->main_loop);
pw_thread_loop_unlock (pwsink->loop);
g_ptr_array_unref (possible);
return FALSE;
}
@ -586,7 +582,7 @@ gst_pipewire_sink_render (GstBaseSink * bsink, GstBuffer * buffer)
if (!pwsink->negotiated)
goto not_negotiated;
pw_thread_loop_lock (pwsink->main_loop);
pw_thread_loop_lock (pwsink->loop);
if (pw_stream_get_state (pwsink->stream, &error) != PW_STREAM_STATE_STREAMING)
goto done;
@ -616,7 +612,7 @@ gst_pipewire_sink_render (GstBaseSink * bsink, GstBuffer * buffer)
do_send_buffer (pwsink);
done:
pw_thread_loop_unlock (pwsink->main_loop);
pw_thread_loop_unlock (pwsink->loop);
return res;
@ -664,7 +660,7 @@ gst_pipewire_sink_start (GstBaseSink * basesink)
props = NULL;
}
pw_thread_loop_lock (pwsink->main_loop);
pw_thread_loop_lock (pwsink->loop);
pwsink->stream = pw_stream_new (pwsink->core, pwsink->client_name, props);
pwsink->pool->stream = pwsink->stream;
@ -673,7 +669,7 @@ gst_pipewire_sink_start (GstBaseSink * basesink)
&stream_events,
pwsink);
pw_thread_loop_unlock (pwsink->main_loop);
pw_thread_loop_unlock (pwsink->loop);
return TRUE;
}
@ -683,14 +679,14 @@ gst_pipewire_sink_stop (GstBaseSink * basesink)
{
GstPipeWireSink *pwsink = GST_PIPEWIRE_SINK (basesink);
pw_thread_loop_lock (pwsink->main_loop);
pw_thread_loop_lock (pwsink->loop);
if (pwsink->stream) {
pw_stream_disconnect (pwsink->stream);
pw_stream_destroy (pwsink->stream);
pwsink->stream = NULL;
pwsink->pool->stream = NULL;
}
pw_thread_loop_unlock (pwsink->main_loop);
pw_thread_loop_unlock (pwsink->loop);
pwsink->negotiated = FALSE;
@ -700,10 +696,10 @@ gst_pipewire_sink_stop (GstBaseSink * basesink)
static gboolean
gst_pipewire_sink_open (GstPipeWireSink * pwsink)
{
if (pw_thread_loop_start (pwsink->main_loop) < 0)
if (pw_thread_loop_start (pwsink->loop) < 0)
goto mainloop_error;
pw_thread_loop_lock (pwsink->main_loop);
pw_thread_loop_lock (pwsink->loop);
if (pwsink->fd == -1)
pwsink->core = pw_context_connect (pwsink->context, NULL, 0);
@ -713,7 +709,7 @@ gst_pipewire_sink_open (GstPipeWireSink * pwsink)
if (pwsink->core == NULL)
goto connect_error;
pw_thread_loop_unlock (pwsink->main_loop);
pw_thread_loop_unlock (pwsink->loop);
return TRUE;
@ -728,7 +724,7 @@ connect_error:
{
GST_ELEMENT_ERROR (pwsink, RESOURCE, FAILED,
("Failed to connect"), (NULL));
pw_thread_loop_unlock (pwsink->main_loop);
pw_thread_loop_unlock (pwsink->loop);
return FALSE;
}
}
@ -736,7 +732,7 @@ connect_error:
static gboolean
gst_pipewire_sink_close (GstPipeWireSink * pwsink)
{
pw_thread_loop_lock (pwsink->main_loop);
pw_thread_loop_lock (pwsink->loop);
if (pwsink->stream) {
pw_stream_disconnect (pwsink->stream);
}
@ -744,9 +740,9 @@ gst_pipewire_sink_close (GstPipeWireSink * pwsink)
pw_core_disconnect (pwsink->core);
pwsink->core = NULL;
}
pw_thread_loop_unlock (pwsink->main_loop);
pw_thread_loop_unlock (pwsink->loop);
pw_thread_loop_stop (pwsink->main_loop);
pw_thread_loop_stop (pwsink->loop);
if (pwsink->stream) {
pw_stream_destroy (pwsink->stream);

View file

@ -83,8 +83,7 @@ struct _GstPipeWireSink {
/* video state */
gboolean negotiated;
struct pw_loop *loop;
struct pw_thread_loop *main_loop;
struct pw_thread_loop *loop;
struct pw_context *context;
struct pw_core *core;

View file

@ -210,9 +210,7 @@ gst_pipewire_src_finalize (GObject * object)
pw_context_destroy (pwsrc->context);
pwsrc->context = NULL;
pw_thread_loop_destroy (pwsrc->main_loop);
pwsrc->main_loop = NULL;
pw_loop_destroy (pwsrc->loop);
pw_thread_loop_destroy (pwsrc->loop);
pwsrc->loop = NULL;
if (pwsrc->properties)
@ -329,10 +327,9 @@ gst_pipewire_src_init (GstPipeWireSrc * src)
src->client_name = g_strdup(pw_get_client_name ());
src->pool = gst_pipewire_pool_new ();
src->loop = pw_loop_new (NULL);
src->main_loop = pw_thread_loop_new (src->loop, "pipewire-main-loop");
src->context = pw_context_new (src->loop, NULL, 0);
GST_DEBUG ("loop %p, mainloop %p", src->loop, src->main_loop);
src->loop = pw_thread_loop_new ("pipewire-main-loop", NULL);
src->context = pw_context_new (pw_thread_loop_get_loop(src->loop), NULL, 0);
GST_DEBUG ("loop %p context %p", src->loop, src->context);
}
@ -349,9 +346,9 @@ buffer_recycle (GstMiniObject *obj)
src = data->owner;
GST_LOG_OBJECT (obj, "recycle buffer");
pw_thread_loop_lock (src->main_loop);
pw_thread_loop_lock (src->loop);
pw_stream_queue_buffer (src->stream, data->b);
pw_thread_loop_unlock (src->main_loop);
pw_thread_loop_unlock (src->loop);
return FALSE;
}
@ -438,7 +435,7 @@ on_process (void *_data)
gst_buffer_ref (buf);
g_queue_push_tail (&pwsrc->queue, buf);
pw_thread_loop_signal (pwsrc->main_loop, FALSE);
pw_thread_loop_signal (pwsrc->loop, FALSE);
return;
}
@ -462,7 +459,7 @@ on_state_changed (void *data,
("stream error: %s", error), (NULL));
break;
}
pw_thread_loop_signal (pwsrc->main_loop, FALSE);
pw_thread_loop_signal (pwsrc->loop, FALSE);
}
static void
@ -492,7 +489,7 @@ static gboolean
gst_pipewire_src_stream_start (GstPipeWireSrc *pwsrc)
{
const char *error = NULL;
pw_thread_loop_lock (pwsrc->main_loop);
pw_thread_loop_lock (pwsrc->loop);
GST_DEBUG_OBJECT (pwsrc, "doing stream start");
while (TRUE) {
enum pw_stream_state state = pw_stream_get_state (pwsrc->stream, &error);
@ -504,21 +501,21 @@ gst_pipewire_src_stream_start (GstPipeWireSrc *pwsrc)
if (state == PW_STREAM_STATE_ERROR)
goto start_error;
pw_thread_loop_wait (pwsrc->main_loop);
pw_thread_loop_wait (pwsrc->loop);
}
parse_stream_properties (pwsrc, pw_stream_get_properties (pwsrc->stream));
GST_DEBUG_OBJECT (pwsrc, "signal started");
pwsrc->started = TRUE;
pw_thread_loop_signal (pwsrc->main_loop, FALSE);
pw_thread_loop_unlock (pwsrc->main_loop);
pw_thread_loop_signal (pwsrc->loop, FALSE);
pw_thread_loop_unlock (pwsrc->loop);
return TRUE;
start_error:
{
GST_DEBUG_OBJECT (pwsrc, "error starting stream: %s", error);
pw_thread_loop_unlock (pwsrc->main_loop);
pw_thread_loop_unlock (pwsrc->loop);
return FALSE;
}
}
@ -529,7 +526,7 @@ wait_negotiated (GstPipeWireSrc *this)
enum pw_stream_state state;
const char *error = NULL;
pw_thread_loop_lock (this->main_loop);
pw_thread_loop_lock (this->loop);
while (TRUE) {
state = pw_stream_get_state (this->stream, &error);
@ -542,10 +539,10 @@ wait_negotiated (GstPipeWireSrc *this)
if (this->started)
break;
pw_thread_loop_wait (this->main_loop);
pw_thread_loop_wait (this->loop);
}
GST_DEBUG_OBJECT (this, "got started signal");
pw_thread_loop_unlock (this->main_loop);
pw_thread_loop_unlock (this->loop);
return state;
}
@ -592,7 +589,7 @@ gst_pipewire_src_negotiate (GstBaseSrc * basesrc)
gst_caps_unref (caps);
/* first disconnect */
pw_thread_loop_lock (pwsrc->main_loop);
pw_thread_loop_lock (pwsrc->loop);
if (pw_stream_get_state(pwsrc->stream, &error) != PW_STREAM_STATE_UNCONNECTED) {
GST_DEBUG_OBJECT (basesrc, "disconnect capture");
pw_stream_disconnect (pwsrc->stream);
@ -608,7 +605,7 @@ gst_pipewire_src_negotiate (GstBaseSrc * basesrc)
goto connect_error;
}
pw_thread_loop_wait (pwsrc->main_loop);
pw_thread_loop_wait (pwsrc->loop);
}
}
@ -632,9 +629,9 @@ gst_pipewire_src_negotiate (GstBaseSrc * basesrc)
if (state == PW_STREAM_STATE_ERROR)
goto connect_error;
pw_thread_loop_wait (pwsrc->main_loop);
pw_thread_loop_wait (pwsrc->loop);
}
pw_thread_loop_unlock (pwsrc->main_loop);
pw_thread_loop_unlock (pwsrc->loop);
result = gst_pipewire_src_stream_start (pwsrc);
@ -669,7 +666,7 @@ no_common_caps:
}
connect_error:
{
pw_thread_loop_unlock (pwsrc->main_loop);
pw_thread_loop_unlock (pwsrc->loop);
return FALSE;
}
}
@ -726,11 +723,11 @@ gst_pipewire_src_unlock (GstBaseSrc * basesrc)
{
GstPipeWireSrc *pwsrc = GST_PIPEWIRE_SRC (basesrc);
pw_thread_loop_lock (pwsrc->main_loop);
pw_thread_loop_lock (pwsrc->loop);
GST_DEBUG_OBJECT (pwsrc, "setting flushing");
pwsrc->flushing = TRUE;
pw_thread_loop_signal (pwsrc->main_loop, FALSE);
pw_thread_loop_unlock (pwsrc->main_loop);
pw_thread_loop_signal (pwsrc->loop, FALSE);
pw_thread_loop_unlock (pwsrc->loop);
return TRUE;
}
@ -740,10 +737,10 @@ gst_pipewire_src_unlock_stop (GstBaseSrc * basesrc)
{
GstPipeWireSrc *pwsrc = GST_PIPEWIRE_SRC (basesrc);
pw_thread_loop_lock (pwsrc->main_loop);
pw_thread_loop_lock (pwsrc->loop);
GST_DEBUG_OBJECT (pwsrc, "unsetting flushing");
pwsrc->flushing = FALSE;
pw_thread_loop_unlock (pwsrc->main_loop);
pw_thread_loop_unlock (pwsrc->loop);
return TRUE;
}
@ -812,7 +809,7 @@ gst_pipewire_src_create (GstPushSrc * psrc, GstBuffer ** buffer)
if (!pwsrc->negotiated)
goto not_negotiated;
pw_thread_loop_lock (pwsrc->main_loop);
pw_thread_loop_lock (pwsrc->loop);
while (TRUE) {
enum pw_stream_state state;
@ -834,9 +831,9 @@ gst_pipewire_src_create (GstPushSrc * psrc, GstBuffer ** buffer)
if (buf != NULL)
break;
pw_thread_loop_wait (pwsrc->main_loop);
pw_thread_loop_wait (pwsrc->loop);
}
pw_thread_loop_unlock (pwsrc->main_loop);
pw_thread_loop_unlock (pwsrc->loop);
gst_buffer_unref (buf);
@ -876,12 +873,12 @@ not_negotiated:
}
streaming_error:
{
pw_thread_loop_unlock (pwsrc->main_loop);
pw_thread_loop_unlock (pwsrc->loop);
return GST_FLOW_ERROR;
}
streaming_stopped:
{
pw_thread_loop_unlock (pwsrc->main_loop);
pw_thread_loop_unlock (pwsrc->loop);
return GST_FLOW_FLUSHING;
}
}
@ -899,9 +896,9 @@ gst_pipewire_src_stop (GstBaseSrc * basesrc)
pwsrc = GST_PIPEWIRE_SRC (basesrc);
pw_thread_loop_lock (pwsrc->main_loop);
pw_thread_loop_lock (pwsrc->loop);
clear_queue (pwsrc);
pw_thread_loop_unlock (pwsrc->main_loop);
pw_thread_loop_unlock (pwsrc->loop);
return TRUE;
}
@ -934,10 +931,10 @@ gst_pipewire_src_open (GstPipeWireSrc * pwsrc)
{
struct pw_properties *props;
if (pw_thread_loop_start (pwsrc->main_loop) < 0)
if (pw_thread_loop_start (pwsrc->loop) < 0)
goto mainloop_failed;
pw_thread_loop_lock (pwsrc->main_loop);
pw_thread_loop_lock (pwsrc->loop);
if (pwsrc->fd == -1)
pwsrc->core = pw_context_connect (pwsrc->context, NULL, 0);
@ -966,7 +963,7 @@ gst_pipewire_src_open (GstPipeWireSrc * pwsrc)
pwsrc->clock = gst_pipewire_clock_new (pwsrc->stream, pwsrc->last_time);
pw_thread_loop_unlock (pwsrc->main_loop);
pw_thread_loop_unlock (pwsrc->loop);
return TRUE;
@ -979,13 +976,13 @@ mainloop_failed:
connect_error:
{
GST_ELEMENT_ERROR (pwsrc, RESOURCE, FAILED, ("can't connect"), (NULL));
pw_thread_loop_unlock (pwsrc->main_loop);
pw_thread_loop_unlock (pwsrc->loop);
return FALSE;
}
no_stream:
{
GST_ELEMENT_ERROR (pwsrc, RESOURCE, FAILED, ("can't create stream"), (NULL));
pw_thread_loop_unlock (pwsrc->main_loop);
pw_thread_loop_unlock (pwsrc->loop);
return FALSE;
}
}
@ -995,7 +992,7 @@ gst_pipewire_src_close (GstPipeWireSrc * pwsrc)
{
clear_queue (pwsrc);
pw_thread_loop_stop (pwsrc->main_loop);
pw_thread_loop_stop (pwsrc->loop);
pwsrc->last_time = gst_clock_get_time (pwsrc->clock);

View file

@ -71,8 +71,7 @@ struct _GstPipeWireSrc {
GstClockTime min_latency;
GstClockTime max_latency;
struct pw_loop *loop;
struct pw_thread_loop *main_loop;
struct pw_thread_loop *loop;
struct pw_context *context;
struct pw_core *core;