core_proxy: prepare to rename pw_remote -> pw_core_proxy

The pw_remote object is really a wrapper around the pw_core_proxy.
The events it emits are also available in the core proxy and are
generally awkward to use.

With some clever new pw_core_proxy_* methods and a pw_core_connect
to create the core_proxy, we can convert all code away from pw_remote.

This is a first step in this conversion, using the pw_remote behind
the scenes. It leaks into some places because it really needs to become
its own struct in a next step.
This commit is contained in:
Wim Taymans 2019-12-06 11:48:40 +01:00
parent f8aabe69fe
commit 8a959ea7a1
37 changed files with 919 additions and 1185 deletions

View file

@ -28,6 +28,8 @@
#include <string.h>
#include <spa/utils/result.h>
#include <gst/gst.h>
#include "gstpipewireformat.h"
@ -167,7 +169,7 @@ struct pending {
void *data;
};
struct remote_data {
struct core_data {
int seq;
GstPipeWireDeviceProvider *self;
struct spa_hook core_listener;
@ -200,7 +202,7 @@ struct port_data {
struct pending pending;
};
static struct node_data *find_node_data(struct remote_data *rd, uint32_t id)
static struct node_data *find_node_data(struct core_data *rd, uint32_t id)
{
struct node_data *n;
spa_list_for_each(n, &rd->nodes, link) {
@ -343,36 +345,28 @@ on_core_done (void *data, uint32_t id, int seq)
}
}
static void
on_core_error(void *data, uint32_t id, int seq, int res, const char *message)
{
GstPipeWireDeviceProvider *self = data;
pw_log_error("error id:%u seq:%d res:%d (%s): %s",
id, seq, res, spa_strerror(res), message);
if (id == 0) {
self->error = res;
}
pw_thread_loop_signal(self->main_loop, FALSE);
}
static const struct pw_core_proxy_events core_events = {
PW_VERSION_CORE_EVENTS,
.info = on_core_info,
.done = on_core_done,
.error = on_core_error,
};
static void
on_state_changed (void *data, enum pw_remote_state old, enum pw_remote_state state, const char *error)
{
struct remote_data *rd = data;
GstPipeWireDeviceProvider *self = rd->self;
GST_DEBUG ("got remote state %d", state);
switch (state) {
case PW_REMOTE_STATE_CONNECTING:
case PW_REMOTE_STATE_UNCONNECTED:
break;
case PW_REMOTE_STATE_CONNECTED:
self->core_proxy = pw_remote_get_core_proxy(self->remote);
pw_core_proxy_add_listener(self->core_proxy, &rd->core_listener, &core_events, self);
break;
case PW_REMOTE_STATE_ERROR:
GST_ERROR_OBJECT (self, "remote error: %s", error);
break;
}
if (self->main_loop)
pw_thread_loop_signal (self->main_loop, FALSE);
}
static void port_event_info(void *data, const struct pw_port_info *info)
{
struct port_data *port_data = data;
@ -456,7 +450,7 @@ static void registry_event_global(void *data, uint32_t id, uint32_t permissions,
uint32_t type, uint32_t version,
const struct spa_dict *props)
{
struct remote_data *rd = data;
struct core_data *rd = data;
GstPipeWireDeviceProvider *self = rd->self;
struct node_data *nd;
@ -525,62 +519,34 @@ static const struct pw_registry_proxy_events registry_events = {
.global_remove = registry_event_global_remove,
};
static const struct pw_remote_events remote_events = {
PW_VERSION_REMOTE_EVENTS,
.state_changed = on_state_changed,
};
static GList *
gst_pipewire_device_provider_probe (GstDeviceProvider * provider)
{
GstPipeWireDeviceProvider *self = GST_PIPEWIRE_DEVICE_PROVIDER (provider);
struct pw_loop *l = NULL;
struct pw_core *c = NULL;
struct pw_remote *r = NULL;
struct remote_data *data;
struct spa_hook listener;
struct core_data *data;
GST_DEBUG_OBJECT (self, "starting probe");
if (!(l = pw_loop_new (NULL)))
return NULL;
if (!(c = pw_core_new (l, NULL, 0)))
if (!(c = pw_core_new (l, NULL, sizeof(*data))))
return NULL;
if (!(r = pw_remote_new (c, NULL, sizeof(*data))))
goto failed;
data = pw_remote_get_user_data(r);
data = pw_core_get_user_data(c);
data->self = self;
spa_list_init(&data->nodes);
spa_list_init(&data->ports);
spa_list_init(&self->pending);
pw_remote_add_listener(r, &listener, &remote_events, data);
if (pw_remote_connect (r) < 0)
self->core_proxy = pw_core_connect (c, NULL, 0);
if (self->core_proxy == NULL)
goto failed;
for (;;) {
enum pw_remote_state state;
const char *error = NULL;
state = pw_remote_get_state(r, &error);
if (state <= 0) {
GST_ERROR_OBJECT (self, "Failed to connect: %s", error);
goto failed;
}
if (state == PW_REMOTE_STATE_CONNECTED)
break;
/* Wait until something happens */
pw_loop_iterate (l, -1);
}
GST_DEBUG_OBJECT (self, "connected");
pw_core_proxy_add_listener(self->core_proxy, &data->core_listener, &core_events, self);
self->end = FALSE;
self->list_only = TRUE;
@ -588,18 +554,19 @@ gst_pipewire_device_provider_probe (GstDeviceProvider * provider)
data->registry = pw_core_proxy_get_registry(self->core_proxy, PW_VERSION_REGISTRY_PROXY, 0);
pw_registry_proxy_add_listener(data->registry, &data->registry_listener, &registry_events, data);
pw_core_proxy_sync(self->core_proxy, 0, self->seq++);
for (;;) {
if (pw_remote_get_state(r, NULL) <= 0)
if (self->error < 0)
break;
if (self->end)
break;
pw_loop_iterate (l, -1);
}
pw_remote_disconnect (r);
pw_remote_destroy (r);
GST_DEBUG_OBJECT (self, "disconnect");
pw_core_proxy_disconnect (self->core_proxy);
pw_core_destroy (c);
pw_loop_destroy (l);
@ -614,7 +581,7 @@ static gboolean
gst_pipewire_device_provider_start (GstDeviceProvider * provider)
{
GstPipeWireDeviceProvider *self = GST_PIPEWIRE_DEVICE_PROVIDER (provider);
struct remote_data *data;
struct core_data *data;
GST_DEBUG_OBJECT (self, "starting provider");
@ -627,7 +594,7 @@ gst_pipewire_device_provider_start (GstDeviceProvider * provider)
goto failed_main_loop;
}
if (!(self->core = pw_core_new (self->loop, NULL, 0))) {
if (!(self->core = pw_core_new (self->loop, NULL, sizeof(*data)))) {
GST_ERROR_OBJECT (self, "Could not create PipeWire core");
goto failed_core;
}
@ -639,60 +606,41 @@ gst_pipewire_device_provider_start (GstDeviceProvider * provider)
pw_thread_loop_lock (self->main_loop);
if (!(self->remote = pw_remote_new (self->core, NULL, sizeof(*data)))) {
GST_ERROR_OBJECT (self, "Failed to create remote");
goto failed_remote;
if ((self->core_proxy = pw_core_connect (self->core, NULL, 0)) == NULL) {
GST_ERROR_OBJECT (self, "Failed to connect");
goto failed_connect;
}
data = pw_remote_get_user_data(self->remote);
GST_DEBUG_OBJECT (self, "connected");
data = pw_core_get_user_data(self->core);
data->self = self;
spa_list_init(&data->nodes);
spa_list_init(&data->ports);
pw_remote_add_listener (self->remote, &self->remote_listener, &remote_events, data);
if (pw_remote_connect (self->remote) < 0)
goto not_running;
for (;;) {
enum pw_remote_state state;
const char *error = NULL;
state = pw_remote_get_state(self->remote, &error);
if (state <= 0) {
GST_WARNING_OBJECT (self, "Failed to connect: %s", error);
goto not_running;
}
if (state == PW_REMOTE_STATE_CONNECTED)
break;
/* Wait until something happens */
pw_thread_loop_wait (self->main_loop);
}
GST_DEBUG_OBJECT (self, "connected");
pw_core_proxy_add_listener(self->core_proxy, &data->core_listener, &core_events, self);
self->registry = pw_core_proxy_get_registry(self->core_proxy, PW_VERSION_REGISTRY_PROXY, 0);
data->registry = self->registry;
pw_registry_proxy_add_listener(self->registry, &data->registry_listener, &registry_events, data);
pw_core_proxy_sync(self->core_proxy, 0, self->seq++);
for (;;) {
if (self->error < 0)
break;
if (self->end)
break;
pw_thread_loop_wait (self->main_loop);
}
GST_DEBUG_OBJECT (self, "started");
pw_thread_loop_unlock (self->main_loop);
return TRUE;
not_running:
pw_remote_destroy (self->remote);
self->remote = NULL;
failed_remote:
failed_connect:
pw_thread_loop_unlock (self->main_loop);
failed_start:
pw_core_destroy (self->core);
@ -713,10 +661,9 @@ gst_pipewire_device_provider_stop (GstDeviceProvider * provider)
GST_DEBUG_OBJECT (self, "stopping provider");
if (self->remote) {
pw_remote_disconnect (self->remote);
pw_remote_destroy (self->remote);
self->remote = NULL;
if (self->core_proxy) {
pw_core_proxy_disconnect (self->core_proxy);
self->core_proxy = NULL;
}
if (self->core) {
pw_core_destroy (self->core);

View file

@ -88,15 +88,13 @@ struct _GstPipeWireDeviceProvider {
struct pw_core *core;
struct pw_remote *remote;
struct spa_hook remote_listener;
struct pw_core_proxy *core_proxy;
struct spa_list pending;
int seq;
struct pw_registry_proxy *registry;
int error;
gboolean end;
gboolean list_only;
GList **devices;

View file

@ -665,7 +665,7 @@ gst_pipewire_sink_start (GstBaseSink * basesink)
}
pw_thread_loop_lock (pwsink->main_loop);
pwsink->stream = pw_stream_new (pwsink->remote, pwsink->client_name, props);
pwsink->stream = pw_stream_new (pwsink->core_proxy, pwsink->client_name, props);
pwsink->pool->stream = pwsink->stream;
pw_stream_add_listener(pwsink->stream,
@ -697,62 +697,22 @@ gst_pipewire_sink_stop (GstBaseSink * basesink)
return TRUE;
}
static void
on_remote_state_changed (void *data, enum pw_remote_state old, enum pw_remote_state state, const char *error)
{
GstPipeWireSink *pwsink = data;
GST_DEBUG ("got remote state %d", state);
switch (state) {
case PW_REMOTE_STATE_UNCONNECTED:
case PW_REMOTE_STATE_CONNECTING:
case PW_REMOTE_STATE_CONNECTED:
break;
case PW_REMOTE_STATE_ERROR:
GST_ELEMENT_ERROR (pwsink, RESOURCE, FAILED,
("remote error: %s", error), (NULL));
break;
}
pw_thread_loop_signal (pwsink->main_loop, FALSE);
}
static const struct pw_remote_events remote_events = {
PW_VERSION_REMOTE_EVENTS,
.state_changed = on_remote_state_changed,
};
static gboolean
gst_pipewire_sink_open (GstPipeWireSink * pwsink)
{
const char *error = NULL;
if (pw_thread_loop_start (pwsink->main_loop) < 0)
goto mainloop_error;
pw_thread_loop_lock (pwsink->main_loop);
pwsink->remote = pw_remote_new (pwsink->core, NULL, 0);
pw_remote_add_listener (pwsink->remote,
&pwsink->remote_listener,
&remote_events, pwsink);
if (pwsink->fd == -1)
pw_remote_connect (pwsink->remote);
pwsink->core_proxy = pw_core_connect (pwsink->core, NULL, 0);
else
pw_remote_connect_fd (pwsink->remote, dup(pwsink->fd));
pwsink->core_proxy = pw_core_connect_fd (pwsink->core, dup(pwsink->fd), NULL, 0);
while (TRUE) {
enum pw_remote_state state = pw_remote_get_state (pwsink->remote, &error);
if (pwsink->core_proxy == NULL)
goto connect_error;
if (state == PW_REMOTE_STATE_CONNECTED)
break;
if (state == PW_REMOTE_STATE_ERROR)
goto connect_error;
pw_thread_loop_wait (pwsink->main_loop);
}
pw_thread_loop_unlock (pwsink->main_loop);
return TRUE;
@ -766,6 +726,8 @@ mainloop_error:
}
connect_error:
{
GST_ELEMENT_ERROR (pwsink, RESOURCE, FAILED,
("Failed to connect"), (NULL));
pw_thread_loop_unlock (pwsink->main_loop);
return FALSE;
}
@ -774,26 +736,13 @@ connect_error:
static gboolean
gst_pipewire_sink_close (GstPipeWireSink * pwsink)
{
const char *error = NULL;
pw_thread_loop_lock (pwsink->main_loop);
if (pwsink->stream) {
pw_stream_disconnect (pwsink->stream);
}
if (pwsink->remote) {
pw_remote_disconnect (pwsink->remote);
while (TRUE) {
enum pw_remote_state state = pw_remote_get_state (pwsink->remote, &error);
if (state == PW_REMOTE_STATE_UNCONNECTED)
break;
if (state == PW_REMOTE_STATE_ERROR)
break;
pw_thread_loop_wait (pwsink->main_loop);
}
if (pwsink->core_proxy) {
pw_core_proxy_disconnect (pwsink->core_proxy);
pwsink->core_proxy = NULL;
}
pw_thread_loop_unlock (pwsink->main_loop);
@ -803,12 +752,6 @@ gst_pipewire_sink_close (GstPipeWireSink * pwsink)
pw_stream_destroy (pwsink->stream);
pwsink->stream = NULL;
}
if (pwsink->remote) {
pw_remote_destroy (pwsink->remote);
pwsink->remote = NULL;
}
return TRUE;
}

View file

@ -87,8 +87,7 @@ struct _GstPipeWireSink {
struct pw_thread_loop *main_loop;
struct pw_core *core;
struct pw_remote *remote;
struct spa_hook remote_listener;
struct pw_core_proxy *core_proxy;
struct pw_stream *stream;
struct spa_hook stream_listener;

View file

@ -504,9 +504,6 @@ gst_pipewire_src_stream_start (GstPipeWireSrc *pwsrc)
if (state == PW_STREAM_STATE_ERROR)
goto start_error;
if (pw_remote_get_state(pwsrc->remote, &error) == PW_REMOTE_STATE_ERROR)
goto start_error;
pw_thread_loop_wait (pwsrc->main_loop);
}
@ -542,9 +539,6 @@ wait_negotiated (GstPipeWireSrc *this)
if (state == PW_STREAM_STATE_ERROR)
break;
if (pw_remote_get_state(this->remote, &error) == PW_REMOTE_STATE_ERROR)
break;
if (this->started)
break;
@ -638,9 +632,6 @@ gst_pipewire_src_negotiate (GstBaseSrc * basesrc)
if (state == PW_STREAM_STATE_ERROR)
goto connect_error;
if (pw_remote_get_state(pwsrc->remote, &error) == PW_REMOTE_STATE_ERROR)
goto connect_error;
pw_thread_loop_wait (pwsrc->main_loop);
}
pw_thread_loop_unlock (pwsrc->main_loop);
@ -915,26 +906,6 @@ gst_pipewire_src_stop (GstBaseSrc * basesrc)
return TRUE;
}
static void
on_remote_state_changed (void *data, enum pw_remote_state old, enum pw_remote_state state, const char *error)
{
GstPipeWireSrc *pwsrc = data;
GST_DEBUG ("got remote state %s", pw_remote_state_as_string (state));
switch (state) {
case PW_REMOTE_STATE_UNCONNECTED:
case PW_REMOTE_STATE_CONNECTING:
case PW_REMOTE_STATE_CONNECTED:
break;
case PW_REMOTE_STATE_ERROR:
GST_ELEMENT_ERROR (pwsrc, RESOURCE, FAILED,
("remote error: %s", error), (NULL));
break;
}
pw_thread_loop_signal (pwsrc->main_loop, FALSE);
}
static gboolean
copy_properties (GQuark field_id,
const GValue *value,
@ -949,11 +920,6 @@ copy_properties (GQuark field_id,
return TRUE;
}
static const struct pw_remote_events remote_events = {
PW_VERSION_REMOTE_EVENTS,
.state_changed = on_remote_state_changed,
};
static const struct pw_stream_events stream_events = {
PW_VERSION_STREAM_EVENTS,
.state_changed = on_state_changed,
@ -967,37 +933,20 @@ static gboolean
gst_pipewire_src_open (GstPipeWireSrc * pwsrc)
{
struct pw_properties *props;
const char *error = NULL;
if (pw_thread_loop_start (pwsrc->main_loop) < 0)
goto mainloop_failed;
pw_thread_loop_lock (pwsrc->main_loop);
if ((pwsrc->remote = pw_remote_new (pwsrc->core, NULL, 0)) == NULL)
goto no_remote;
pw_remote_add_listener (pwsrc->remote,
&pwsrc->remote_listener,
&remote_events, pwsrc);
if (pwsrc->fd == -1)
pw_remote_connect (pwsrc->remote);
pwsrc->core_proxy = pw_core_connect (pwsrc->core, NULL, 0);
else
pw_remote_connect_fd (pwsrc->remote, dup(pwsrc->fd));
pwsrc->core_proxy = pw_core_connect_fd (pwsrc->core, dup(pwsrc->fd), NULL, 0);
while (TRUE) {
enum pw_remote_state state = pw_remote_get_state(pwsrc->remote, &error);
GST_DEBUG ("waiting for CONNECTED, now %s", pw_remote_state_as_string (state));
if (state == PW_REMOTE_STATE_CONNECTED)
break;
if (state == PW_REMOTE_STATE_ERROR)
if (pwsrc->core_proxy == NULL)
goto connect_error;
pw_thread_loop_wait (pwsrc->main_loop);
}
if (pwsrc->properties) {
props = pw_properties_new (NULL, NULL);
gst_structure_foreach (pwsrc->properties, copy_properties, props);
@ -1005,7 +954,8 @@ gst_pipewire_src_open (GstPipeWireSrc * pwsrc)
props = NULL;
}
if ((pwsrc->stream = pw_stream_new (pwsrc->remote, pwsrc->client_name, props)) == NULL)
if ((pwsrc->stream = pw_stream_new (pwsrc->core_proxy,
pwsrc->client_name, props)) == NULL)
goto no_stream;
@ -1026,14 +976,9 @@ mainloop_failed:
GST_ELEMENT_ERROR (pwsrc, RESOURCE, FAILED, ("error starting mainloop"), (NULL));
return FALSE;
}
no_remote:
{
GST_ELEMENT_ERROR (pwsrc, RESOURCE, FAILED, ("can't create remote"), (NULL));
pw_thread_loop_unlock (pwsrc->main_loop);
return FALSE;
}
connect_error:
{
GST_ELEMENT_ERROR (pwsrc, RESOURCE, FAILED, ("can't connect"), (NULL));
pw_thread_loop_unlock (pwsrc->main_loop);
return FALSE;
}
@ -1065,8 +1010,8 @@ gst_pipewire_src_close (GstPipeWireSrc * pwsrc)
pw_stream_destroy (pwsrc->stream);
pwsrc->stream = NULL;
pw_remote_destroy (pwsrc->remote);
pwsrc->remote = NULL;
pw_core_proxy_disconnect (pwsrc->core_proxy);
pwsrc->core_proxy = NULL;
}
static GstStateChangeReturn

View file

@ -75,8 +75,7 @@ struct _GstPipeWireSrc {
struct pw_thread_loop *main_loop;
struct pw_core *core;
struct pw_remote *remote;
struct spa_hook remote_listener;
struct pw_core_proxy *core_proxy;
struct pw_stream *stream;
struct spa_hook stream_listener;