pulse: rework sync

Use a global sync that both syncs the globals and completes the
operations.

In the case of a card profile change, first the nodes are removed
and added and then the Profile and Route info updated. We need to
be able to bind to the new node and get the device.profile.id
before we can find the active port of the node.

See #279
This commit is contained in:
Wim Taymans 2020-09-07 14:36:25 +02:00
parent 185a3d4c36
commit 71441565fd
3 changed files with 98 additions and 75 deletions

View file

@ -270,16 +270,6 @@ static const char *str_efac(pa_subscription_event_type_t event)
return "invalid"; return "invalid";
} }
static void global_sync(struct global *g)
{
pa_operation *o;
pa_context *c = g->context;
g->pending_seq = pw_proxy_sync(g->proxy, 0);
spa_list_for_each(o, &c->operations, link)
o->seq = g->pending_seq;
}
static void emit_event(pa_context *c, struct global *g, pa_subscription_event_type_t event) static void emit_event(pa_context *c, struct global *g, pa_subscription_event_type_t event)
{ {
if (c->subscribe_callback && (c->subscribe_mask & g->mask)) { if (c->subscribe_callback && (c->subscribe_mask & g->mask)) {
@ -301,6 +291,35 @@ static void emit_event(pa_context *c, struct global *g, pa_subscription_event_ty
} }
} }
static void do_global_sync(struct global *g)
{
pa_subscription_event_type_t event;
if (g->ginfo && g->ginfo->sync)
g->ginfo->sync(g);
if (g->init) {
if ((g->mask & (PA_SUBSCRIPTION_MASK_SINK_INPUT | PA_SUBSCRIPTION_MASK_SOURCE_OUTPUT))) {
if (g->node_info.device_index == SPA_ID_INVALID ||
(g->stream && g->stream->state != PA_STREAM_READY))
return;
}
g->init = false;
event = PA_SUBSCRIPTION_EVENT_NEW;
} else {
event = PA_SUBSCRIPTION_EVENT_CHANGE;
}
pw_log_debug("emit because of pending");
emit_event(g->context, g, event);
}
static void global_sync(struct global *g)
{
pa_context *c = g->context;
c->pending_seq = pw_core_sync(c->core, PW_ID_CORE, c->pending_seq);
g->sync = true;
}
static struct param *add_param(struct spa_list *params, uint32_t id, const struct spa_pod *param) static struct param *add_param(struct spa_list *params, uint32_t id, const struct spa_pod *param)
{ {
struct param *p; struct param *p;
@ -394,8 +413,11 @@ static void device_event_info(void *object, const struct pw_device_info *info)
remove_params(&g->card_info.ports, id); remove_params(&g->card_info.ports, id);
g->card_info.n_ports = 0; g->card_info.n_ports = 0;
break; break;
case SPA_PARAM_Profile:
case SPA_PARAM_Route: case SPA_PARAM_Route:
remove_params(&g->card_info.routes, id);
g->card_info.n_routes = 0;
break;
case SPA_PARAM_Profile:
break; break;
default: default:
do_enum = false; do_enum = false;
@ -500,7 +522,6 @@ static void device_event_param(void *object, int seq,
const struct spa_pod *param) const struct spa_pod *param)
{ {
struct global *g = object; struct global *g = object;
pa_context *c = g->context;
switch (id) { switch (id) {
case SPA_PARAM_EnumProfile: case SPA_PARAM_EnumProfile:
@ -557,29 +578,21 @@ static void device_event_param(void *object, int seq,
{ {
uint32_t index, device; uint32_t index, device;
enum spa_direction direction; enum spa_direction direction;
struct spa_pod *props = NULL;
struct global *ng;
if (spa_pod_parse_object(param, if (spa_pod_parse_object(param,
SPA_TYPE_OBJECT_ParamRoute, NULL, SPA_TYPE_OBJECT_ParamRoute, NULL,
SPA_PARAM_ROUTE_index, SPA_POD_Int(&index), SPA_PARAM_ROUTE_index, SPA_POD_Int(&index),
SPA_PARAM_ROUTE_direction, SPA_POD_Id(&direction), SPA_PARAM_ROUTE_direction, SPA_POD_Id(&direction),
SPA_PARAM_ROUTE_device, SPA_POD_Int(&device), SPA_PARAM_ROUTE_device, SPA_POD_Int(&device)) < 0) {
SPA_PARAM_ROUTE_props, SPA_POD_OPT_Pod(&props)) < 0) {
pw_log_warn("device %d: can't parse route", g->id); pw_log_warn("device %d: can't parse route", g->id);
return; return;
} }
if (add_param(&g->card_info.routes, id, param))
g->card_info.n_routes++;
pw_log_debug("device %d: active %s route %d", g->id, pw_log_debug("device %d: active %s route %d device %d", g->id,
direction == SPA_DIRECTION_OUTPUT ? "output" : "input", direction == SPA_DIRECTION_OUTPUT ? "output" : "input",
index); index, device);
ng = find_node_for_route(c, g, device);
if (props && ng && ng->node_info.active_port != index) {
ng->node_info.active_port = index;
parse_props(ng, props, true);
emit_event(c, ng, PA_SUBSCRIPTION_EVENT_CHANGE);
}
break; break;
} }
default: default:
@ -702,6 +715,7 @@ static void device_clear_ports(struct global *g)
static void device_sync_ports(struct global *g) static void device_sync_ports(struct global *g)
{ {
pa_card_info *i = &g->card_info.info; pa_card_info *i = &g->card_info.info;
pa_context *c = g->context;
uint32_t n_ports, j; uint32_t n_ports, j;
struct param *p; struct param *p;
@ -741,6 +755,8 @@ static void device_sync_ports(struct global *g)
continue; continue;
} }
pw_log_debug("port %d: name:%s", j, name);
pi = i->ports[j] = &g->card_info.card_ports[j]; pi = i->ports[j] = &g->card_info.card_ports[j];
spa_zero(*pi); spa_zero(*pi);
pi->name = name; pi->name = name;
@ -800,6 +816,30 @@ static void device_sync_ports(struct global *g)
i->n_ports = j; i->n_ports = j;
if (i->n_ports == 0) if (i->n_ports == 0)
i->ports = NULL; i->ports = NULL;
spa_list_for_each(p, &g->card_info.routes, link) {
struct global *ng;
uint32_t index, device;
enum spa_direction direction;
struct spa_pod *props = NULL;
if (spa_pod_parse_object(p->param,
SPA_TYPE_OBJECT_ParamRoute, NULL,
SPA_PARAM_ROUTE_index, SPA_POD_Int(&index),
SPA_PARAM_ROUTE_direction, SPA_POD_Id(&direction),
SPA_PARAM_ROUTE_device, SPA_POD_Int(&device),
SPA_PARAM_ROUTE_props, SPA_POD_OPT_Pod(&props)) < 0) {
pw_log_warn("device %d: can't parse route", g->id);
continue;
}
ng = find_node_for_route(c, g, device);
if (props && ng && ng->node_info.active_port != index) {
ng->node_info.active_port = index;
parse_props(ng, props, true);
emit_event(c, ng, PA_SUBSCRIPTION_EVENT_CHANGE);
}
}
} }
static void device_sync(struct global *g) static void device_sync(struct global *g)
@ -831,6 +871,7 @@ static void device_destroy(void *data)
device_clear_ports(global); device_clear_ports(global);
device_clear_profiles(global); device_clear_profiles(global);
remove_params(&global->card_info.routes, SPA_ID_INVALID);
remove_params(&global->card_info.ports, SPA_ID_INVALID); remove_params(&global->card_info.ports, SPA_ID_INVALID);
remove_params(&global->card_info.profiles, SPA_ID_INVALID); remove_params(&global->card_info.profiles, SPA_ID_INVALID);
@ -1078,35 +1119,10 @@ static void proxy_destroy(void *data)
g->proxy = NULL; g->proxy = NULL;
} }
static void proxy_done(void *data, int seq)
{
struct global *g = data;
pa_subscription_event_type_t event;
if (g->pending_seq == seq) {
if (g->ginfo && g->ginfo->sync)
g->ginfo->sync(g);
if (g->init) {
if ((g->mask & (PA_SUBSCRIPTION_MASK_SINK_INPUT | PA_SUBSCRIPTION_MASK_SOURCE_OUTPUT))) {
if (g->node_info.device_index == SPA_ID_INVALID ||
(g->stream && g->stream->state != PA_STREAM_READY))
return;
}
g->init = false;
event = PA_SUBSCRIPTION_EVENT_NEW;
} else {
event = PA_SUBSCRIPTION_EVENT_CHANGE;
}
pw_log_debug("emit because of pending");
emit_event(g->context, g, event);
}
}
static const struct pw_proxy_events proxy_events = { static const struct pw_proxy_events proxy_events = {
PW_VERSION_PROXY_EVENTS, PW_VERSION_PROXY_EVENTS,
.removed = proxy_removed, .removed = proxy_removed,
.destroy = proxy_destroy, .destroy = proxy_destroy,
.done = proxy_done,
}; };
static void update_link(pa_context *c, uint32_t src_node_id, uint32_t dst_node_id) static void update_link(pa_context *c, uint32_t src_node_id, uint32_t dst_node_id)
@ -1153,6 +1169,7 @@ static int set_mask(pa_context *c, struct global *g)
ginfo = &device_info; ginfo = &device_info;
spa_list_init(&g->card_info.profiles); spa_list_init(&g->card_info.profiles);
spa_list_init(&g->card_info.ports); spa_list_init(&g->card_info.ports);
spa_list_init(&g->card_info.routes);
} else if (strcmp(g->type, PW_TYPE_INTERFACE_Node) == 0) { } else if (strcmp(g->type, PW_TYPE_INTERFACE_Node) == 0) {
if (g->props == NULL) if (g->props == NULL)
return 0; return 0;
@ -1347,19 +1364,6 @@ static const struct pw_registry_events registry_events =
.global_remove = registry_event_global_remove, .global_remove = registry_event_global_remove,
}; };
static void complete_operations(pa_context *c, int seq)
{
pa_operation *o, *t;
spa_list_for_each_safe(o, t, &c->operations, link) {
if (o->seq != seq)
continue;
pa_operation_ref(o);
if (o->callback)
o->callback(o, o->userdata);
pa_operation_unref(o);
}
}
static void core_info(void *data, const struct pw_core_info *info) static void core_info(void *data, const struct pw_core_info *info)
{ {
pa_context *c = data; pa_context *c = data;
@ -1394,8 +1398,25 @@ static void core_error(void *data, uint32_t id, int seq, int res, const char *me
static void core_done(void *data, uint32_t id, int seq) static void core_done(void *data, uint32_t id, int seq)
{ {
pa_context *c = data; pa_context *c = data;
pw_log_debug("done id:%u seq:%d", id, seq); pa_operation *o, *t;
complete_operations(c, seq); struct global *g;
pw_log_debug("done id:%u seq:%d/%d", id, seq, c->pending_seq);
if (c->pending_seq != seq)
return;
spa_list_for_each(g, &c->globals, link) {
if (g->sync) {
do_global_sync(g);
g->sync = false;
}
}
spa_list_for_each_safe(o, t, &c->operations, link) {
pa_operation_ref(o);
if (o->callback)
o->callback(o, o->userdata);
pa_operation_unref(o);
}
} }
static const struct pw_core_events core_events = { static const struct pw_core_events core_events = {

View file

@ -256,8 +256,8 @@ struct global {
pa_subscription_event_type_t event; pa_subscription_event_type_t event;
int priority_driver; int priority_driver;
int pending_seq;
int init:1; int init:1;
int sync:1;
void *info; void *info;
struct global_info *ginfo; struct global_info *ginfo;
@ -306,6 +306,8 @@ struct global {
uint32_t active_profile; uint32_t active_profile;
struct spa_list ports; struct spa_list ports;
uint32_t n_ports; uint32_t n_ports;
struct spa_list routes;
uint32_t n_routes;
pa_card_info info; pa_card_info info;
pa_card_profile_info2 *card_profiles; pa_card_profile_info2 *card_profiles;
unsigned int pending_profiles:1; unsigned int pending_profiles:1;
@ -371,6 +373,8 @@ struct pa_context {
int no_fail:1; int no_fail:1;
int disconnect:1; int disconnect:1;
int pending_seq;
struct global *metadata; struct global *metadata;
uint32_t default_sink; uint32_t default_sink;
uint32_t default_source; uint32_t default_source;
@ -485,7 +489,6 @@ struct pa_operation
pa_context *context; pa_context *context;
pa_stream *stream; pa_stream *stream;
int seq;
pa_operation_state_t state; pa_operation_state_t state;
pa_operation_cb_t callback; pa_operation_cb_t callback;

View file

@ -36,7 +36,6 @@ pa_operation *pa_operation_new(pa_context *c, pa_stream *s, pa_operation_cb_t cb
o->refcount = 1; o->refcount = 1;
o->context = c; o->context = c;
o->stream = s ? pa_stream_ref(s) : NULL; o->stream = s ? pa_stream_ref(s) : NULL;
o->seq = SPA_ID_INVALID;
o->state = PA_OPERATION_RUNNING; o->state = PA_OPERATION_RUNNING;
o->callback = cb; o->callback = cb;
@ -52,8 +51,8 @@ pa_operation *pa_operation_new(pa_context *c, pa_stream *s, pa_operation_cb_t cb
int pa_operation_sync(pa_operation *o) int pa_operation_sync(pa_operation *o)
{ {
pa_context *c = o->context; pa_context *c = o->context;
o->seq = pw_core_sync(c->core, PW_ID_CORE, 0); c->pending_seq = pw_core_sync(c->core, PW_ID_CORE, 0);
pw_log_debug("operation %p: sync seq:%d", o, o->seq); pw_log_debug("operation %p: sync seq:%d", o, c->pending_seq);
return 0; return 0;
} }
@ -70,14 +69,14 @@ static void operation_free(pa_operation *o)
{ {
pa_assert(!o->context); pa_assert(!o->context);
pa_assert(!o->stream); pa_assert(!o->stream);
pw_log_debug("%p seq:%d", o, o->seq); pw_log_debug("%p", o);
free(o); free(o);
} }
static void operation_unlink(pa_operation *o) { static void operation_unlink(pa_operation *o) {
pa_assert(o); pa_assert(o);
pw_log_debug("%p seq:%d", o, o->seq); pw_log_debug("%p", o);
if (o->context) { if (o->context) {
pa_assert(o->refcount >= 2); pa_assert(o->refcount >= 2);
@ -100,7 +99,7 @@ void pa_operation_unref(pa_operation *o)
{ {
pa_assert(o); pa_assert(o);
pa_assert(o->refcount >= 1); pa_assert(o->refcount >= 1);
pw_log_debug("%p seq:%d ref:%d", o, o->seq, o->refcount); pw_log_debug("%p ref:%d", o, o->refcount);
if (--o->refcount == 0) if (--o->refcount == 0)
operation_free(o); operation_free(o);
} }
@ -114,7 +113,7 @@ static void operation_set_state(pa_operation *o, pa_operation_state_t st) {
pa_operation_ref(o); pa_operation_ref(o);
pw_log_debug("new state %p seq:%d state:%d", o, o->seq, st); pw_log_debug("new state %p state:%d", o, st);
o->state = st; o->state = st;
if (o->state_callback) if (o->state_callback)
@ -132,7 +131,7 @@ void pa_operation_cancel(pa_operation *o)
{ {
pa_assert(o); pa_assert(o);
pa_assert(o->refcount >= 1); pa_assert(o->refcount >= 1);
pw_log_debug("%p seq:%d", o, o->seq); pw_log_debug("%p", o);
operation_set_state(o, PA_OPERATION_CANCELED); operation_set_state(o, PA_OPERATION_CANCELED);
} }