From 124b1221a60d966711a5300a8757cfaf2aaec295 Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Tue, 27 Oct 2020 14:57:15 +0100 Subject: [PATCH] pulse-server: add more introspection Emit new/change/remove events Handle suspended state of peer Handle direct_on_input record streams. Place the tag in error messages --- src/modules/module-protocol-pulse/manager.c | 70 +++++- src/modules/module-protocol-pulse/manager.h | 5 +- .../module-protocol-pulse/pulse-server.c | 224 ++++++++++++------ 3 files changed, 219 insertions(+), 80 deletions(-) diff --git a/src/modules/module-protocol-pulse/manager.c b/src/modules/module-protocol-pulse/manager.c index b77c9b8c9..ce809c0af 100644 --- a/src/modules/module-protocol-pulse/manager.c +++ b/src/modules/module-protocol-pulse/manager.c @@ -134,9 +134,20 @@ static void object_destroy(struct object *o) /* client */ static void client_event_info(void *object, const struct pw_client_info *info) { - struct object *o = object; - pw_log_debug("object %p: id:%d change-mask:%"PRIu64, o, o->this.id, info->change_mask); + struct object *o = object; + int changed = 0; + + pw_log_debug("object %p: id:%d change-mask:%08"PRIx64, o, o->this.id, info->change_mask); + info = o->this.info = pw_client_info_update(o->this.info, info); + + if (info->change_mask & PW_CLIENT_CHANGE_MASK_PROPS) + changed++; + + if (changed) { + o->this.changed += changed; + core_sync(o->manager); + } } static const struct pw_client_events client_events = { @@ -162,8 +173,19 @@ static const struct object_info client_info = { static void module_event_info(void *object, const struct pw_module_info *info) { struct object *o = object; - pw_log_debug("object %p: id:%d change-mask:%"PRIu64, o, o->this.id, info->change_mask); + int changed = 0; + + pw_log_debug("object %p: id:%d change-mask:%08"PRIx64, o, o->this.id, info->change_mask); + info = o->this.info = pw_module_info_update(o->this.info, info); + + if (info->change_mask & PW_MODULE_CHANGE_MASK_PROPS) + changed++; + + if (changed) { + o->this.changed += changed; + core_sync(o->manager); + } } static const struct pw_module_events module_events = { @@ -191,9 +213,13 @@ static void device_event_info(void *object, const struct pw_device_info *info) struct object *o = object; uint32_t i, changed = 0; - pw_log_debug("object %p: id:%d change-mask:%"PRIu64, o, o->this.id, info->change_mask); + pw_log_debug("object %p: id:%d change-mask:%08"PRIx64, o, o->this.id, info->change_mask); + info = o->this.info = pw_device_info_update(o->this.info, info); + if (info->change_mask & PW_DEVICE_CHANGE_MASK_PROPS) + changed++; + if (info->change_mask & PW_DEVICE_CHANGE_MASK_PARAMS) { for (i = 0; i < info->n_params; i++) { uint32_t id = info->params[i].id; @@ -202,17 +228,19 @@ static void device_event_info(void *object, const struct pw_device_info *info) continue; info->params[i].user = 0; + changed++; clear_params(&o->this.param_list, id); if (!(info->params[i].flags & SPA_PARAM_INFO_READ)) continue; pw_device_enum_params((struct pw_device*)o->this.proxy, 0, id, 0, -1, NULL); - changed++; } } - if (changed) + if (changed) { + o->this.changed += changed; core_sync(o->manager); + } } static void device_event_param(void *object, int seq, @@ -248,9 +276,17 @@ static void node_event_info(void *object, const struct pw_node_info *info) { struct object *o = object; uint32_t i, changed = 0; - pw_log_debug("object %p: id:%d change-mask:%"PRIu64, o, o->this.id, info->change_mask); + + pw_log_debug("object %p: id:%d change-mask:%08"PRIx64, o, o->this.id, info->change_mask); + info = o->this.info = pw_node_info_update(o->this.info, info); + if (info->change_mask & PW_NODE_CHANGE_MASK_STATE) + changed++; + + if (info->change_mask & PW_NODE_CHANGE_MASK_PROPS) + changed++; + if (info->change_mask & PW_NODE_CHANGE_MASK_PARAMS) { for (i = 0; i < info->n_params; i++) { uint32_t id = info->params[i].id; @@ -259,17 +295,19 @@ static void node_event_info(void *object, const struct pw_node_info *info) continue; info->params[i].user = 0; + changed++; clear_params(&o->this.param_list, id); if (!(info->params[i].flags & SPA_PARAM_INFO_READ)) continue; pw_node_enum_params((struct pw_node*)o->this.proxy, 0, id, 0, -1, NULL); - changed++; } } - if (changed) + if (changed) { + o->this.changed += changed; core_sync(o->manager); + } } static void node_event_param(void *object, int seq, @@ -405,11 +443,11 @@ static void registry_event_global(void *data, uint32_t id, o->this.version = version; o->this.props = props ? pw_properties_new_dict(props) : NULL; o->this.proxy = proxy; + o->new = true; spa_list_init(&o->this.param_list); o->manager = m; o->info = info; - o->new = true; spa_list_append(&m->this.object_list, &o->this.link); m->this.n_objects++; @@ -446,9 +484,21 @@ static const struct pw_registry_events registry_events = { static void on_core_done(void *data, uint32_t id, int seq) { struct manager *m = data; + struct object *o; + if (id == PW_ID_CORE) { if (m->sync_seq == seq) manager_emit_sync(m); + + spa_list_for_each(o, &m->this.object_list, this.link) { + if (o->new) { + o->new = false; + manager_emit_added(m, &o->this); + } else if (o->this.changed > 0) { + manager_emit_updated(m, &o->this); + o->this.changed = 0; + } + } } } diff --git a/src/modules/module-protocol-pulse/manager.h b/src/modules/module-protocol-pulse/manager.h index 290bd82fc..7d6eb8dfe 100644 --- a/src/modules/module-protocol-pulse/manager.h +++ b/src/modules/module-protocol-pulse/manager.h @@ -75,9 +75,10 @@ struct pw_manager_object { char *type; uint32_t version; struct pw_properties *props; - - void *info; struct pw_proxy *proxy; + + int changed; + void *info; struct spa_list param_list; }; diff --git a/src/modules/module-protocol-pulse/pulse-server.c b/src/modules/module-protocol-pulse/pulse-server.c index bfbd325ea..df65052e9 100644 --- a/src/modules/module-protocol-pulse/pulse-server.c +++ b/src/modules/module-protocol-pulse/pulse-server.c @@ -492,57 +492,6 @@ static void manager_sync(void *data) } } -static void manager_added(void *data, struct pw_manager_object *o) -{ - struct client *client = data; - const char *str; - - if (strcmp(o->type, PW_TYPE_INTERFACE_Core) == 0 && o->info != NULL) { - struct pw_core_info *info = o->info; - - if (info->props && - (str = spa_dict_lookup(info->props, "default.clock.rate")) != NULL) - client->default_rate = atoi(str); - client->cookie = info->cookie; - } -} - -static void manager_metadata(void *data, uint32_t subject, const char *key, - const char *type, const char *value) -{ - struct client *client = data; - uint32_t val; - bool changed = false; - - pw_log_debug("meta %d %s %s %s", subject, key, type, value); - if (subject == PW_ID_CORE) { - val = (key && value) ? (uint32_t)atoi(value) : SPA_ID_INVALID; - if (key == NULL || strcmp(key, "default.audio.sink") == 0) { - changed = client->default_sink != val; - client->default_sink = val; - } - if (key == NULL || strcmp(key, "default.audio.source") == 0) { - changed = client->default_source != val; - client->default_source = val; - } - } - if (changed) { - if (client->subscribed & SUBSCRIPTION_MASK_SERVER) { - send_subscribe_event(client, - SUBSCRIPTION_EVENT_CHANGE | - SUBSCRIPTION_EVENT_SERVER, - -1); - } - } -} - -static const struct pw_manager_events manager_events = { - PW_VERSION_MANAGER_EVENTS, - .sync = manager_sync, - .added = manager_added, - .metadata = manager_metadata, -}; - static bool is_client(struct pw_manager_object *o) { return strcmp(o->type, PW_TYPE_INTERFACE_Client) == 0; @@ -608,11 +557,138 @@ static bool is_source_output(struct pw_manager_object *o) strcmp(str, "Stream/Input/Audio") == 0; } +static bool is_recordable(struct pw_manager_object *o) +{ + return is_source(o) || is_sink(o) || is_sink_input(o); +} + static bool is_link(struct pw_manager_object *o) { return strcmp(o->type, PW_TYPE_INTERFACE_Link) == 0; } +static uint32_t get_event_and_id(struct client *client, struct pw_manager_object *o, uint32_t *id) +{ + uint32_t event = 0, res_id = o->id; + + if (client->subscribed & SUBSCRIPTION_MASK_SINK && + is_sink(o)) { + event = SUBSCRIPTION_EVENT_SINK; + } + else if (client->subscribed & SUBSCRIPTION_MASK_SOURCE && + is_source_or_monitor(o)) { + if (!is_source(o)) + res_id |= 0x10000U; + event = SUBSCRIPTION_EVENT_SOURCE; + } + else if (client->subscribed & SUBSCRIPTION_MASK_SINK_INPUT && + is_sink_input(o)) { + event = SUBSCRIPTION_EVENT_SINK_INPUT; + } + else if (client->subscribed & SUBSCRIPTION_MASK_SOURCE_OUTPUT && + is_source_output(o)) { + event = SUBSCRIPTION_EVENT_SOURCE_OUTPUT; + } + else if (client->subscribed & SUBSCRIPTION_MASK_MODULE && + is_module(o)) { + event = SUBSCRIPTION_EVENT_MODULE; + } + else if (client->subscribed & SUBSCRIPTION_MASK_CLIENT && + is_client(o)) { + event = SUBSCRIPTION_EVENT_CLIENT; + } + else if (client->subscribed & SUBSCRIPTION_MASK_CARD && + is_card(o)) { + event = SUBSCRIPTION_EVENT_CARD; + } else + event = SPA_ID_INVALID; + if (id) + *id = res_id; + return event; +} + +static void manager_added(void *data, struct pw_manager_object *o) +{ + struct client *client = data; + const char *str; + uint32_t event, id; + + if (strcmp(o->type, PW_TYPE_INTERFACE_Core) == 0 && o->info != NULL) { + struct pw_core_info *info = o->info; + + if (info->props && + (str = spa_dict_lookup(info->props, "default.clock.rate")) != NULL) + client->default_rate = atoi(str); + client->cookie = info->cookie; + return; + } + + if ((event = get_event_and_id(client, o, &id)) != SPA_ID_INVALID) + send_subscribe_event(client, + event | SUBSCRIPTION_EVENT_NEW, + id); +} + +static void manager_updated(void *data, struct pw_manager_object *o) +{ + struct client *client = data; + uint32_t event, id; + + if ((event = get_event_and_id(client, o, &id)) != SPA_ID_INVALID) + send_subscribe_event(client, + event | SUBSCRIPTION_EVENT_CHANGE, + id); +} + +static void manager_removed(void *data, struct pw_manager_object *o) +{ + struct client *client = data; + uint32_t event, id; + + if ((event = get_event_and_id(client, o, &id)) != SPA_ID_INVALID) + send_subscribe_event(client, + event | SUBSCRIPTION_EVENT_REMOVE, + id); +} + +static void manager_metadata(void *data, uint32_t subject, const char *key, + const char *type, const char *value) +{ + struct client *client = data; + uint32_t val; + bool changed = false; + + pw_log_debug("meta %d %s %s %s", subject, key, type, value); + if (subject == PW_ID_CORE) { + val = (key && value) ? (uint32_t)atoi(value) : SPA_ID_INVALID; + if (key == NULL || strcmp(key, "default.audio.sink") == 0) { + changed = client->default_sink != val; + client->default_sink = val; + } + if (key == NULL || strcmp(key, "default.audio.source") == 0) { + changed = client->default_source != val; + client->default_source = val; + } + } + if (changed) { + if (client->subscribed & SUBSCRIPTION_MASK_SERVER) { + send_subscribe_event(client, + SUBSCRIPTION_EVENT_CHANGE | + SUBSCRIPTION_EVENT_SERVER, + -1); + } + } +} + +static const struct pw_manager_events manager_events = { + PW_VERSION_MANAGER_EVENTS, + .sync = manager_sync, + .added = manager_added, + .updated = manager_updated, + .removed = manager_removed, + .metadata = manager_metadata, +}; + struct selector { bool (*type) (struct pw_manager_object *o); uint32_t id; @@ -668,7 +744,7 @@ static struct pw_manager_object *find_linked(struct client *client, uint32_t obj return p; } if (direction == PW_DIRECTION_INPUT && obj_id == in_node) { - struct selector sel = { .id = out_node, .type = is_source_or_monitor, }; + struct selector sel = { .id = out_node, .type = is_recordable, }; if ((p = select_object(m, &sel)) != NULL) return p; } @@ -957,6 +1033,7 @@ static int reply_create_playback_stream(struct stream *stream) char latency[32]; struct pw_manager_object *peer; const char *peer_name; + bool peer_suspended; fix_playback_buffer_attr(stream, &stream->attr); @@ -980,13 +1057,17 @@ static int reply_create_playback_stream(struct stream *stream) stream->pending = size; peer = find_linked(client, stream->id, stream->direction); - if (peer) { + if (peer && is_sink(peer)) { + struct pw_node_info *info = peer->info; peer_id = peer->id; peer_name = pw_properties_get(peer->props, PW_KEY_NODE_NAME); + peer_suspended = info->state == PW_NODE_STATE_SUSPENDED; } else { peer_id = SPA_ID_INVALID; peer_name = NULL; + peer_suspended = false; } + pw_log_info("peer:%p id:%d name:%s", peer, peer_id, peer_name); if (client->version >= 9) { message_put(reply, @@ -1002,7 +1083,7 @@ static int reply_create_playback_stream(struct stream *stream) TAG_CHANNEL_MAP, &stream->map, TAG_U32, peer_id, /* sink index */ TAG_STRING, peer_name, /* sink name */ - TAG_BOOLEAN, false, /* sink suspended state */ + TAG_BOOLEAN, peer_suspended, /* sink suspended state */ TAG_INVALID); } if (client->version >= 13) { @@ -1059,6 +1140,7 @@ static int reply_create_record_stream(struct stream *stream) struct pw_manager_object *peer; const char *peer_name, *name; uint32_t peer_id; + bool peer_suspended; fix_record_buffer_attr(stream, &stream->attr); @@ -1077,9 +1159,12 @@ static int reply_create_record_stream(struct stream *stream) TAG_INVALID); peer = find_linked(client, stream->id, stream->direction); - if (peer) { + if (peer && is_sink_input(peer)) + peer = find_linked(client, peer->id, PW_DIRECTION_OUTPUT); + if (peer && is_source_or_monitor(peer)) { + struct pw_node_info *info = peer->info; name = pw_properties_get(peer->props, PW_KEY_NODE_NAME); - if (is_sink(peer)) { + if (!is_source(peer)) { size_t len = (name ? strlen(name) : 5) + 10; peer_id = peer->id | 0x10000u; peer_name = tmp = alloca(len); @@ -1088,10 +1173,13 @@ static int reply_create_record_stream(struct stream *stream) peer_id = peer->id; peer_name = name; } + peer_suspended = info->state == PW_NODE_STATE_SUSPENDED; } else { peer_id = SPA_ID_INVALID; peer_name = NULL; + peer_suspended = false; } + pw_log_info("peer:%p id:%d name:%s", peer, peer_id, peer_name); if (client->version >= 9) { message_put(reply, @@ -1105,7 +1193,7 @@ static int reply_create_record_stream(struct stream *stream) TAG_CHANNEL_MAP, &stream->map, TAG_U32, peer_id, /* source index */ TAG_STRING, peer_name, /* source name */ - TAG_BOOLEAN, false, /* source suspended state */ + TAG_BOOLEAN, peer_suspended, /* source suspended state */ TAG_INVALID); } if (client->version >= 13) { @@ -1840,7 +1928,11 @@ static int do_create_record_stream(struct client *client, uint32_t command, uint if (no_move) flags |= PW_STREAM_FLAG_DONT_RECONNECT; - if (source_name != NULL) { + pw_log_info("direct %u", direct_on_input_idx); + + if (direct_on_input_idx != SPA_ID_INVALID) { + source_index = direct_on_input_idx; + } else if (source_name != NULL) { if ((id = atoi(source_name)) != 0) source_index = id; } @@ -2151,7 +2243,7 @@ error_protocol: res = ERR_PROTOCOL; goto error; error: - return reply_error(client, -1, res); + return reply_error(client, tag, res); } static int do_set_stream_mute(struct client *client, uint32_t command, uint32_t tag, struct message *m) @@ -2215,7 +2307,7 @@ error_protocol: res = ERR_PROTOCOL; goto error; error: - return reply_error(client, -1, res); + return reply_error(client, tag, res); } static int do_set_stream_name(struct client *client, uint32_t command, uint32_t tag, struct message *m) @@ -2509,7 +2601,7 @@ error_noentity: res = ERR_INVALID; goto error; error: - return reply_error(client, -1, res); + return reply_error(client, tag, res); } static int do_drain_stream(struct client *client, uint32_t command, uint32_t tag, struct message *m) @@ -2830,8 +2922,6 @@ static int fill_sink_info(struct client *client, struct message *m, break; } } - if (volume_info.volume.channels != map.channels) - volume_info.volume.channels = map.channels; message_put(m, TAG_U32, o->id, /* sink index */ @@ -2925,8 +3015,6 @@ static int fill_source_info(struct client *client, struct message *m, break; } } - if (volume_info.volume.channels != map.channels) - volume_info.volume.channels = map.channels; message_put(m, TAG_U32, is_monitor ? o->id | 0x10000 : o->id, /* source index */ @@ -3086,9 +3174,9 @@ static int fill_source_output_info(struct client *client, struct message *m, } peer = find_linked(client, o->id, PW_DIRECTION_INPUT); - if (peer) { + if (peer && is_source_or_monitor(peer)) { peer_id = peer->id; - if (is_sink(peer)) + if (!is_source(peer)) peer_id |= 0x10000u; } else { peer_id = SPA_ID_INVALID; @@ -3229,7 +3317,7 @@ error_invalid: error: if (reply) message_free(client, reply, false, false); - return reply_error(client, -1, err); + return reply_error(client, tag, err); } struct info_list_data {