From ed9614077cfbc1a9917d4e6707a78e2337c8296e Mon Sep 17 00:00:00 2001 From: Pauli Virtanen Date: Tue, 16 Feb 2021 22:03:08 +0200 Subject: [PATCH] pulse-server: implement node latency offset Also add facility for storing user data on objects. --- src/modules/module-protocol-pulse/manager.c | 47 +++++ src/modules/module-protocol-pulse/manager.h | 2 + .../module-protocol-pulse/pulse-server.c | 197 +++++++++++++++++- 3 files changed, 237 insertions(+), 9 deletions(-) diff --git a/src/modules/module-protocol-pulse/manager.c b/src/modules/module-protocol-pulse/manager.c index 97a05e14b..5c1372190 100644 --- a/src/modules/module-protocol-pulse/manager.c +++ b/src/modules/module-protocol-pulse/manager.c @@ -54,6 +54,12 @@ struct object_info { void (*destroy) (struct object *object); }; +struct object_data { + struct spa_list link; + const char *id; + size_t size; +}; + struct object { struct pw_manager_object this; @@ -65,6 +71,8 @@ struct object { struct spa_hook proxy_listener; struct spa_hook object_listener; + + struct spa_list data_list; }; static void core_sync(struct manager *m) @@ -160,6 +168,7 @@ static void object_update_params(struct object *o) static void object_destroy(struct object *o) { struct manager *m = o->manager; + struct object_data *d; spa_list_remove(&o->this.link); m->this.n_objects--; if (o->this.proxy) @@ -168,6 +177,10 @@ static void object_destroy(struct object *o) pw_properties_free(o->this.props); clear_params(&o->this.param_list, SPA_ID_INVALID); clear_params(&o->pending_list, SPA_ID_INVALID); + spa_list_consume(d, &o->data_list, link) { + spa_list_remove(&d->link); + free(d); + } free(o); } @@ -555,6 +568,7 @@ static void registry_event_global(void *data, uint32_t id, o->this.creating = true; spa_list_init(&o->this.param_list); spa_list_init(&o->pending_list); + spa_list_init(&o->data_list); o->manager = m; o->info = info; @@ -747,3 +761,36 @@ void pw_manager_destroy(struct pw_manager *manager) free(m); } + +static struct object_data *object_find_data(struct object *o, const char *id) +{ + struct object_data *d; + spa_list_for_each(d, &o->data_list, link) { + if (strcmp(d->id, id) == 0) + return d; + } + return NULL; +} + +void *pw_manager_object_add_data(struct pw_manager_object *obj, const char *id, size_t size) +{ + struct object *o = SPA_CONTAINER_OF(obj, struct object, this); + struct object_data *d; + + d = object_find_data(o, id); + if (d != NULL) { + if (d->size == size) + goto done; + spa_list_remove(&d->link); + free(d); + } + + d = calloc(1, sizeof(struct object_data) + size); + d->id = id; + d->size = size; + + spa_list_append(&o->data_list, &d->link); + +done: + return SPA_MEMBER(d, sizeof(struct object_data), void); +} diff --git a/src/modules/module-protocol-pulse/manager.h b/src/modules/module-protocol-pulse/manager.h index c568eb12b..5d6ec75ba 100644 --- a/src/modules/module-protocol-pulse/manager.h +++ b/src/modules/module-protocol-pulse/manager.h @@ -104,6 +104,8 @@ int pw_manager_for_each_object(struct pw_manager *manager, int (*callback) (void *data, struct pw_manager_object *object), void *data); +void *pw_manager_object_add_data(struct pw_manager_object *o, const char *id, size_t size); + #ifdef __cplusplus } /* extern "C" */ #endif diff --git a/src/modules/module-protocol-pulse/pulse-server.c b/src/modules/module-protocol-pulse/pulse-server.c index 6d405832b..1cce96203 100644 --- a/src/modules/module-protocol-pulse/pulse-server.c +++ b/src/modules/module-protocol-pulse/pulse-server.c @@ -146,6 +146,11 @@ struct client { struct pw_manager_object *prev_default_source; }; +struct latency_offset_data { + int64_t prev_latency_offset; + unsigned int initialized:1; +}; + struct buffer_attr { uint32_t maxlength; uint32_t tlength; @@ -724,6 +729,61 @@ static uint32_t get_event_and_id(struct client *client, struct pw_manager_object static struct pw_manager_object *find_device(struct client *client, uint32_t id, const char *name, bool sink); +static int64_t get_node_latency_offset(struct pw_manager_object *o) +{ + int64_t latency_offset = 0LL; + struct pw_manager_param *p; + + spa_list_for_each(p, &o->param_list, link) { + if (p->id != SPA_PARAM_Props) + continue; + if (spa_pod_parse_object(p->param, + SPA_TYPE_OBJECT_Props, NULL, + SPA_PROP_latencyOffsetNsec, SPA_POD_Long(&latency_offset)) == 1) + break; + } + return latency_offset; +} + +static void send_latency_offset_subscribe_event(struct client *client, struct pw_manager_object *o) +{ + struct latency_offset_data *d; + struct pw_node_info *info; + const char *str; + uint32_t card_id = SPA_ID_INVALID; + int64_t latency_offset = 0LL; + bool changed = false; + + if (!object_is_sink(o) && !object_is_source_or_monitor(o)) + return; + + /* + * Pulseaudio sends card change events on latency offset change. + */ + + if ((info = o->info) == NULL || info->props == NULL) + return; + if ((str = spa_dict_lookup(info->props, PW_KEY_DEVICE_ID)) != NULL) + card_id = (uint32_t)atoi(str); + if (card_id == SPA_ID_INVALID) + return; + + d = pw_manager_object_add_data(o, "latency_offset_data", sizeof(struct latency_offset_data)); + if (d == NULL) + return; + + latency_offset = get_node_latency_offset(o); + changed = (!d->initialized || latency_offset != d->prev_latency_offset); + + d->prev_latency_offset = latency_offset; + d->initialized = true; + + if (changed) + send_subscribe_event(client, + SUBSCRIPTION_EVENT_CARD | SUBSCRIPTION_EVENT_CHANGE, + card_id); +} + static void send_default_change_subscribe_event(struct client *client, bool sink, bool source) { struct pw_manager_object *def; @@ -800,6 +860,7 @@ static void manager_updated(void *data, struct pw_manager_object *o) event | SUBSCRIPTION_EVENT_CHANGE, id); + send_latency_offset_subscribe_event(client, o); send_default_change_subscribe_event(client, object_is_sink(o), object_is_source_or_monitor(o)); } @@ -2815,8 +2876,8 @@ static int set_node_volume_mute(struct pw_manager_object *o, return 0; } -static int set_card_volume_mute(struct pw_manager_object *o, uint32_t id, - uint32_t device_id, struct volume *vol, bool *mute) +static int set_card_volume_mute_delay(struct pw_manager_object *o, uint32_t id, + uint32_t device_id, struct volume *vol, bool *mute, int64_t *latency_offset) { char buf[1024]; struct spa_pod_builder b = SPA_POD_BUILDER_INIT(buf, sizeof(buf)); @@ -2844,6 +2905,9 @@ static int set_card_volume_mute(struct pw_manager_object *o, uint32_t id, if (mute) spa_pod_builder_add(&b, SPA_PROP_mute, SPA_POD_Bool(*mute), 0); + if (latency_offset) + spa_pod_builder_add(&b, + SPA_PROP_latencyOffsetNsec, SPA_POD_Long(*latency_offset), 0); spa_pod_builder_pop(&b, &f[1]); param = spa_pod_builder_pop(&b, &f[0]); @@ -3024,8 +3088,8 @@ static int do_set_volume(struct client *client, uint32_t command, uint32_t tag, goto done; if (card != NULL && dev_info.active_port != SPA_ID_INVALID) - res = set_card_volume_mute(card, dev_info.active_port, - dev_info.device, &volume, NULL); + res = set_card_volume_mute_delay(card, dev_info.active_port, + dev_info.device, &volume, NULL, NULL); else res = set_node_volume_mute(o, &volume, NULL); @@ -3089,8 +3153,8 @@ static int do_set_mute(struct client *client, uint32_t command, uint32_t tag, st goto done; if (card != NULL && dev_info.active_port != SPA_ID_INVALID) - res = set_card_volume_mute(card, dev_info.active_port, - dev_info.device, NULL, &mute); + res = set_card_volume_mute_delay(card, dev_info.active_port, + dev_info.device, NULL, &mute, NULL); else res = set_node_volume_mute(o, NULL, &mute); @@ -3156,6 +3220,77 @@ static int do_set_port(struct client *client, uint32_t command, uint32_t tag, st return reply_simple_ack(client, tag); } +static int do_set_port_latency_offset(struct client *client, uint32_t command, uint32_t tag, struct message *m) +{ + struct impl *impl = client->impl; + struct pw_manager *manager = client->manager; + const char *port_name = NULL; + struct pw_manager_object *card; + struct selector sel; + struct card_info card_info = CARD_INFO_INIT; + struct port_info *port_info; + int64_t offset; + int64_t value; + int res; + uint32_t n_ports; + size_t i; + + spa_zero(sel); + sel.key = PW_KEY_DEVICE_NAME; + sel.type = object_is_card; + + if ((res = message_get(m, + TAG_U32, &sel.id, + TAG_STRING, &sel.value, + TAG_STRING, &port_name, + TAG_S64, &offset, + TAG_INVALID)) < 0) + return -EPROTO; + + pw_log_info(NAME" %p: [%s] %s tag:%u index:%u card_name:%s port_name:%s offset:%"PRIi64, impl, + client->name, commands[command].name, tag, sel.id, sel.value, port_name, offset); + + if ((sel.id == SPA_ID_INVALID && sel.value == NULL) || + (sel.id != SPA_ID_INVALID && sel.value != NULL)) + return -EINVAL; + if (port_name == NULL) + return -EINVAL; + + value = offset * 1000; /* to nsec */ + + if ((card = select_object(manager, &sel)) == NULL) + return -ENOENT; + + collect_card_info(card, &card_info); + port_info = alloca(card_info.n_ports * sizeof(*port_info)); + card_info.active_profile = SPA_ID_INVALID; + n_ports = collect_port_info(card, &card_info, NULL, port_info); + + /* Set offset on all devices of the port */ + res = -ENOENT; + for (i = 0; i < n_ports; i++) { + struct port_info *pi = &port_info[i]; + size_t j; + + if (strcmp(pi->name, port_name) != 0) + continue; + + res = 0; + for (j = 0; j < pi->n_devices; ++j) { + res = set_card_volume_mute_delay(card, pi->id, pi->devices[j], NULL, NULL, &value); + if (res < 0) + break; + } + + if (res < 0) + break; + + return reply_simple_ack(client, tag); + } + + return res; +} + static int do_set_stream_name(struct client *client, uint32_t command, uint32_t tag, struct message *m) { struct impl *impl = client->impl; @@ -3484,6 +3619,49 @@ static int fill_module_info(struct client *client, struct message *m, return 0; } +static int64_t get_port_latency_offset(struct client *client, struct pw_manager_object *card, struct port_info *pi) +{ + struct pw_manager *m = client->manager; + struct pw_manager_object *o; + size_t j; + + /* + * The latency offset is a property of nodes in Pipewire, so we look it up on the + * nodes. We'll return the latency offset of the first node in the port. + * + * This is also because we need to be consistent with + * send_latency_offset_subscribe_event, which sends events on node changes. The + * route data might not be updated yet when these events arrive. + */ + for (j = 0; j < pi->n_devices; ++j) { + spa_list_for_each(o, &m->object_list, link) { + const char *str; + uint32_t card_id = SPA_ID_INVALID; + uint32_t device_id = SPA_ID_INVALID; + struct pw_node_info *info; + + if (o->creating || o->removing) + continue; + if (!object_is_sink(o) && !object_is_source_or_monitor(o)) + continue; + if ((info = o->info) == NULL || info->props == NULL) + continue; + if ((str = spa_dict_lookup(info->props, PW_KEY_DEVICE_ID)) != NULL) + card_id = (uint32_t)atoi(str); + if (card_id != card->id) + continue; + + if ((str = spa_dict_lookup(info->props, "card.profile.device")) != NULL) + device_id = (uint32_t)atoi(str); + + if (device_id == pi->devices[j]) + return get_node_latency_offset(o); + } + } + + return 0LL; +} + static int fill_card_info(struct client *client, struct message *m, struct pw_manager_object *o) { @@ -3583,8 +3761,9 @@ static int fill_card_info(struct client *client, struct message *m, TAG_INVALID); } if (client->version >= 27) { + int64_t latency_offset = get_port_latency_offset(client, o, pi); message_put(m, - TAG_S64, 0LL, /* port latency */ + TAG_S64, latency_offset / 1000, /* port latency offset */ TAG_INVALID); } if (client->version >= 34) { @@ -4758,7 +4937,7 @@ static int do_error_access(struct client *client, uint32_t command, uint32_t tag return -EACCES; } -static int do_error_not_implemented(struct client *client, uint32_t command, uint32_t tag, struct message *m) +static SPA_UNUSED int do_error_not_implemented(struct client *client, uint32_t command, uint32_t tag, struct message *m) { return -ENOSYS; } @@ -4911,7 +5090,7 @@ static const struct command commands[COMMAND_MAX] = [COMMAND_SET_SOURCE_OUTPUT_MUTE] = { "SET_SOURCE_OUTPUT_MUTE", do_set_stream_mute, }, /* Supported since protocol v27 (3.0) */ - [COMMAND_SET_PORT_LATENCY_OFFSET] = { "SET_PORT_LATENCY_OFFSET", do_error_not_implemented, }, + [COMMAND_SET_PORT_LATENCY_OFFSET] = { "SET_PORT_LATENCY_OFFSET", do_set_port_latency_offset, }, /* Supported since protocol v30 (6.0) */ /* BOTH DIRECTIONS */