gst: rework device provider

Use the node and port info to enum formats when available.
Use simple resync after each operation, when nothing is pending,
loop over the results.
This commit is contained in:
Wim Taymans 2020-08-11 17:26:04 +02:00
parent 799fe7f90c
commit 82b0644e23

View file

@ -162,13 +162,6 @@ enum
PROP_LAST PROP_LAST
}; };
struct pending {
struct spa_list link;
int seq;
void (*callback) (void *data);
void *data;
};
struct core_data { struct core_data {
int seq; int seq;
GstPipeWireDeviceProvider *self; GstPipeWireDeviceProvider *self;
@ -176,7 +169,6 @@ struct core_data {
struct pw_registry *registry; struct pw_registry *registry;
struct spa_hook registry_listener; struct spa_hook registry_listener;
struct spa_list nodes; struct spa_list nodes;
struct spa_list ports;
}; };
struct node_data { struct node_data {
@ -189,17 +181,14 @@ struct node_data {
struct pw_node_info *info; struct pw_node_info *info;
GstCaps *caps; GstCaps *caps;
GstDevice *dev; GstDevice *dev;
struct pending pending;
}; };
struct port_data { struct port_data {
struct spa_list link;
struct node_data *node_data; struct node_data *node_data;
struct pw_port *proxy; struct pw_port *proxy;
struct spa_hook proxy_listener; struct spa_hook proxy_listener;
uint32_t id; uint32_t id;
struct spa_hook port_listener; struct spa_hook port_listener;
struct pending pending;
}; };
static struct node_data *find_node_data(struct core_data *rd, uint32_t id) static struct node_data *find_node_data(struct core_data *rd, uint32_t id)
@ -259,15 +248,15 @@ new_node (GstPipeWireDeviceProvider *self, struct node_data *data)
return GST_DEVICE (gstdev); return GST_DEVICE (gstdev);
} }
static void do_add_node(void *data) static void do_add_nodes(struct core_data *rd)
{ {
struct port_data *p = data; GstPipeWireDeviceProvider *self = rd->self;
struct node_data *nd = p->node_data; struct node_data *nd;
GstPipeWireDeviceProvider *self = nd->self;
if (nd->dev)
return;
spa_list_for_each(nd, &rd->nodes, link) {
if (nd->dev != NULL)
continue;
pw_log_info("add node %d", nd->id);
nd->dev = new_node (self, nd); nd->dev = new_node (self, nd);
if (nd->dev) { if (nd->dev) {
if(self->list_only) if(self->list_only)
@ -275,42 +264,24 @@ static void do_add_node(void *data)
else else
gst_device_provider_device_add (GST_DEVICE_PROVIDER (self), nd->dev); gst_device_provider_device_add (GST_DEVICE_PROVIDER (self), nd->dev);
} }
}
static void add_pending(GstPipeWireDeviceProvider *self, struct pending *p,
void (*callback) (void *data), void *data)
{
spa_list_append(&self->pending, &p->link);
p->callback = callback;
p->data = data;
pw_log_debug("add pending %d", p->seq);
self->seq = p->seq = pw_core_sync(self->core, 0, self->seq);
}
static void remove_pending(struct pending *p)
{
if (p->seq != 0) {
pw_log_debug("remove pending %d", p->seq);
spa_list_remove(&p->link);
p->seq = 0;
} }
} }
static void resync(GstPipeWireDeviceProvider *self)
{
self->seq = pw_core_sync(self->core, PW_ID_CORE, self->seq);
pw_log_debug("resync %d", self->seq);
}
static void static void
on_core_done (void *data, uint32_t id, int seq) on_core_done (void *data, uint32_t id, int seq)
{ {
GstPipeWireDeviceProvider *self = data; struct core_data *rd = data;
struct pending *p, *t; GstPipeWireDeviceProvider *self = rd->self;
spa_list_for_each_safe(p, t, &self->pending, link) {
if (p->seq == seq) {
remove_pending(p);
if (p->callback)
p->callback(p->data);
}
}
pw_log_debug("check %d %d", seq, self->seq); pw_log_debug("check %d %d", seq, self->seq);
if (seq == self->seq) { if (id == PW_ID_CORE && seq == self->seq) {
do_add_nodes(rd);
self->end = true; self->end = true;
if (self->loop) if (self->loop)
pw_thread_loop_signal (self->loop, FALSE); pw_thread_loop_signal (self->loop, FALSE);
@ -321,7 +292,8 @@ on_core_done (void *data, uint32_t id, int seq)
static void static void
on_core_error(void *data, uint32_t id, int seq, int res, const char *message) on_core_error(void *data, uint32_t id, int seq, int res, const char *message)
{ {
GstPipeWireDeviceProvider *self = data; struct core_data *rd = data;
GstPipeWireDeviceProvider *self = rd->self;
pw_log_error("error id:%u seq:%d res:%d (%s): %s", pw_log_error("error id:%u seq:%d res:%d (%s): %s",
id, seq, res, spa_strerror(res), message); id, seq, res, spa_strerror(res), message);
@ -341,7 +313,24 @@ static const struct pw_core_events core_events = {
static void port_event_info(void *data, const struct pw_port_info *info) static void port_event_info(void *data, const struct pw_port_info *info)
{ {
struct port_data *port_data = data; struct port_data *port_data = data;
struct node_data *node_data = port_data->node_data;
uint32_t i;
pw_log_debug("%p", port_data); pw_log_debug("%p", port_data);
if (info->change_mask & PW_PORT_CHANGE_MASK_PARAMS) {
for (i = 0; i < info->n_params; i++) {
uint32_t id = info->params[i].id;
if (id == SPA_PARAM_EnumFormat &&
info->params[i].flags & SPA_PARAM_INFO_READ &&
node_data->caps == NULL) {
node_data->caps = gst_caps_new_empty ();
pw_port_enum_params(port_data->proxy, 0, id, 0, UINT32_MAX, NULL);
resync(node_data->self);
}
}
}
} }
static void port_event_param(void *data, int seq, uint32_t id, static void port_event_param(void *data, int seq, uint32_t id,
@ -351,8 +340,6 @@ static void port_event_param(void *data, int seq, uint32_t id,
struct node_data *node_data = port_data->node_data; struct node_data *node_data = port_data->node_data;
GstCaps *c1; GstCaps *c1;
pw_log_debug("%p", port_data);
c1 = gst_caps_from_format (param); c1 = gst_caps_from_format (param);
if (c1 && node_data->caps) if (c1 && node_data->caps)
gst_caps_append (node_data->caps, c1); gst_caps_append (node_data->caps, c1);
@ -367,13 +354,42 @@ static const struct pw_port_events port_events = {
static void node_event_info(void *data, const struct pw_node_info *info) static void node_event_info(void *data, const struct pw_node_info *info)
{ {
struct node_data *node_data = data; struct node_data *node_data = data;
uint32_t i;
pw_log_debug("%p", node_data->proxy); pw_log_debug("%p", node_data->proxy);
node_data->info = pw_node_info_update(node_data->info, info);
info = node_data->info = pw_node_info_update(node_data->info, info);
if (info->change_mask & PW_NODE_CHANGE_MASK_PARAMS) {
for (i = 0; i < info->n_params; i++) {
uint32_t id = info->params[i].id;
if (id == SPA_PARAM_EnumFormat &&
info->params[i].flags & SPA_PARAM_INFO_READ &&
node_data->caps == NULL) {
node_data->caps = gst_caps_new_empty ();
pw_node_enum_params(node_data->proxy, 0, id, 0, UINT32_MAX, NULL);
resync(node_data->self);
}
}
}
}
static void node_event_param(void *data, int seq, uint32_t id,
uint32_t index, uint32_t next, const struct spa_pod *param)
{
struct node_data *node_data = data;
GstCaps *c1;
c1 = gst_caps_from_format (param);
if (c1 && node_data->caps)
gst_caps_append (node_data->caps, c1);
} }
static const struct pw_node_events node_events = { static const struct pw_node_events node_events = {
PW_VERSION_NODE_EVENTS, PW_VERSION_NODE_EVENTS,
.info = node_event_info .info = node_event_info,
.param = node_event_param
}; };
static void static void
@ -392,8 +408,6 @@ destroy_node (void *data)
pw_log_debug("destroy %p", nd); pw_log_debug("destroy %p", nd);
remove_pending(&nd->pending);
if (nd->dev != NULL) { if (nd->dev != NULL) {
gst_device_provider_device_remove (provider, GST_DEVICE (nd->dev)); gst_device_provider_device_remove (provider, GST_DEVICE (nd->dev));
} }
@ -423,8 +437,6 @@ destroy_port (void *data)
{ {
struct port_data *pd = data; struct port_data *pd = data;
pw_log_debug("destroy %p", pd); pw_log_debug("destroy %p", pd);
remove_pending(&pd->pending);
spa_list_remove(&pd->link);
} }
static const struct pw_proxy_events proxy_port_events = { static const struct pw_proxy_events proxy_port_events = {
@ -465,11 +477,10 @@ static void registry_event_global(void *data, uint32_t id, uint32_t permissions,
nd->self = self; nd->self = self;
nd->proxy = node; nd->proxy = node;
nd->id = id; nd->id = id;
nd->caps = gst_caps_new_empty ();
spa_list_append(&rd->nodes, &nd->link); spa_list_append(&rd->nodes, &nd->link);
pw_node_add_listener(node, &nd->node_listener, &node_events, nd); pw_node_add_listener(node, &nd->node_listener, &node_events, nd);
pw_proxy_add_listener((struct pw_proxy*)node, &nd->proxy_listener, &proxy_node_events, nd); pw_proxy_add_listener((struct pw_proxy*)node, &nd->proxy_listener, &proxy_node_events, nd);
add_pending(self, &nd->pending, NULL, NULL); resync(self);
} }
else if (strcmp(type, PW_TYPE_INTERFACE_Port) == 0) { else if (strcmp(type, PW_TYPE_INTERFACE_Port) == 0) {
struct pw_port *port; struct pw_port *port;
@ -490,12 +501,9 @@ static void registry_event_global(void *data, uint32_t id, uint32_t permissions,
pd->node_data = nd; pd->node_data = nd;
pd->proxy = port; pd->proxy = port;
pd->id = id; pd->id = id;
spa_list_append(&rd->ports, &pd->link);
pw_port_add_listener(port, &pd->port_listener, &port_events, pd); pw_port_add_listener(port, &pd->port_listener, &port_events, pd);
pw_proxy_add_listener((struct pw_proxy*)port, &pd->proxy_listener, &proxy_port_events, pd); pw_proxy_add_listener((struct pw_proxy*)port, &pd->proxy_listener, &proxy_port_events, pd);
pw_port_enum_params((struct pw_port*)port, resync(self);
0, SPA_PARAM_EnumFormat, 0, 0, NULL);
add_pending(self, &pd->pending, do_add_node, pd);
} }
return; return;
@ -534,7 +542,6 @@ gst_pipewire_device_provider_probe (GstDeviceProvider * provider)
data = pw_context_get_user_data(c); data = pw_context_get_user_data(c);
data->self = self; data->self = self;
spa_list_init(&data->nodes); spa_list_init(&data->nodes);
spa_list_init(&data->ports);
spa_list_init(&self->pending); spa_list_init(&self->pending);
self->core = pw_context_connect (c, NULL, 0); self->core = pw_context_connect (c, NULL, 0);
@ -542,7 +549,7 @@ gst_pipewire_device_provider_probe (GstDeviceProvider * provider)
goto failed; goto failed;
GST_DEBUG_OBJECT (self, "connected"); GST_DEBUG_OBJECT (self, "connected");
pw_core_add_listener(self->core, &data->core_listener, &core_events, self); pw_core_add_listener(self->core, &data->core_listener, &core_events, data);
self->end = FALSE; self->end = FALSE;
self->list_only = TRUE; self->list_only = TRUE;
@ -551,7 +558,7 @@ gst_pipewire_device_provider_probe (GstDeviceProvider * provider)
data->registry = pw_core_get_registry(self->core, PW_VERSION_REGISTRY, 0); data->registry = pw_core_get_registry(self->core, PW_VERSION_REGISTRY, 0);
pw_registry_add_listener(data->registry, &data->registry_listener, &registry_events, data); pw_registry_add_listener(data->registry, &data->registry_listener, &registry_events, data);
pw_core_sync(self->core, 0, self->seq++); resync(self);
for (;;) { for (;;) {
if (self->error < 0) if (self->error < 0)
@ -612,15 +619,14 @@ gst_pipewire_device_provider_start (GstDeviceProvider * provider)
data = pw_context_get_user_data(self->context); data = pw_context_get_user_data(self->context);
data->self = self; data->self = self;
spa_list_init(&data->nodes); spa_list_init(&data->nodes);
spa_list_init(&data->ports);
pw_core_add_listener(self->core, &data->core_listener, &core_events, self); pw_core_add_listener(self->core, &data->core_listener, &core_events, data);
self->registry = pw_core_get_registry(self->core, PW_VERSION_REGISTRY, 0); self->registry = pw_core_get_registry(self->core, PW_VERSION_REGISTRY, 0);
data->registry = self->registry; data->registry = self->registry;
pw_registry_add_listener(self->registry, &data->registry_listener, &registry_events, data); pw_registry_add_listener(self->registry, &data->registry_listener, &registry_events, data);
pw_core_sync(self->core, 0, self->seq++); resync(self);
for (;;) { for (;;) {
if (self->error < 0) if (self->error < 0)