pulse-server: add more introspection

Emit new/change/remove events
Handle suspended state of peer
Handle direct_on_input record streams.
Place the tag in error messages
This commit is contained in:
Wim Taymans 2020-10-27 14:57:15 +01:00
parent 2bf5cfa2f7
commit 124b1221a6
3 changed files with 219 additions and 80 deletions

View file

@ -135,8 +135,19 @@ static void object_destroy(struct object *o)
static void client_event_info(void *object, const struct pw_client_info *info)
{
struct object *o = object;
pw_log_debug("object %p: id:%d change-mask:%"PRIu64, o, o->this.id, info->change_mask);
int changed = 0;
pw_log_debug("object %p: id:%d change-mask:%08"PRIx64, o, o->this.id, info->change_mask);
info = o->this.info = pw_client_info_update(o->this.info, info);
if (info->change_mask & PW_CLIENT_CHANGE_MASK_PROPS)
changed++;
if (changed) {
o->this.changed += changed;
core_sync(o->manager);
}
}
static const struct pw_client_events client_events = {
@ -162,8 +173,19 @@ static const struct object_info client_info = {
static void module_event_info(void *object, const struct pw_module_info *info)
{
struct object *o = object;
pw_log_debug("object %p: id:%d change-mask:%"PRIu64, o, o->this.id, info->change_mask);
int changed = 0;
pw_log_debug("object %p: id:%d change-mask:%08"PRIx64, o, o->this.id, info->change_mask);
info = o->this.info = pw_module_info_update(o->this.info, info);
if (info->change_mask & PW_MODULE_CHANGE_MASK_PROPS)
changed++;
if (changed) {
o->this.changed += changed;
core_sync(o->manager);
}
}
static const struct pw_module_events module_events = {
@ -191,9 +213,13 @@ static void device_event_info(void *object, const struct pw_device_info *info)
struct object *o = object;
uint32_t i, changed = 0;
pw_log_debug("object %p: id:%d change-mask:%"PRIu64, o, o->this.id, info->change_mask);
pw_log_debug("object %p: id:%d change-mask:%08"PRIx64, o, o->this.id, info->change_mask);
info = o->this.info = pw_device_info_update(o->this.info, info);
if (info->change_mask & PW_DEVICE_CHANGE_MASK_PROPS)
changed++;
if (info->change_mask & PW_DEVICE_CHANGE_MASK_PARAMS) {
for (i = 0; i < info->n_params; i++) {
uint32_t id = info->params[i].id;
@ -202,18 +228,20 @@ static void device_event_info(void *object, const struct pw_device_info *info)
continue;
info->params[i].user = 0;
changed++;
clear_params(&o->this.param_list, id);
if (!(info->params[i].flags & SPA_PARAM_INFO_READ))
continue;
pw_device_enum_params((struct pw_device*)o->this.proxy,
0, id, 0, -1, NULL);
changed++;
}
}
if (changed)
if (changed) {
o->this.changed += changed;
core_sync(o->manager);
}
}
static void device_event_param(void *object, int seq,
uint32_t id, uint32_t index, uint32_t next,
@ -248,9 +276,17 @@ static void node_event_info(void *object, const struct pw_node_info *info)
{
struct object *o = object;
uint32_t i, changed = 0;
pw_log_debug("object %p: id:%d change-mask:%"PRIu64, o, o->this.id, info->change_mask);
pw_log_debug("object %p: id:%d change-mask:%08"PRIx64, o, o->this.id, info->change_mask);
info = o->this.info = pw_node_info_update(o->this.info, info);
if (info->change_mask & PW_NODE_CHANGE_MASK_STATE)
changed++;
if (info->change_mask & PW_NODE_CHANGE_MASK_PROPS)
changed++;
if (info->change_mask & PW_NODE_CHANGE_MASK_PARAMS) {
for (i = 0; i < info->n_params; i++) {
uint32_t id = info->params[i].id;
@ -259,18 +295,20 @@ static void node_event_info(void *object, const struct pw_node_info *info)
continue;
info->params[i].user = 0;
changed++;
clear_params(&o->this.param_list, id);
if (!(info->params[i].flags & SPA_PARAM_INFO_READ))
continue;
pw_node_enum_params((struct pw_node*)o->this.proxy,
0, id, 0, -1, NULL);
changed++;
}
}
if (changed)
if (changed) {
o->this.changed += changed;
core_sync(o->manager);
}
}
static void node_event_param(void *object, int seq,
uint32_t id, uint32_t index, uint32_t next,
@ -405,11 +443,11 @@ static void registry_event_global(void *data, uint32_t id,
o->this.version = version;
o->this.props = props ? pw_properties_new_dict(props) : NULL;
o->this.proxy = proxy;
o->new = true;
spa_list_init(&o->this.param_list);
o->manager = m;
o->info = info;
o->new = true;
spa_list_append(&m->this.object_list, &o->this.link);
m->this.n_objects++;
@ -446,9 +484,21 @@ static const struct pw_registry_events registry_events = {
static void on_core_done(void *data, uint32_t id, int seq)
{
struct manager *m = data;
struct object *o;
if (id == PW_ID_CORE) {
if (m->sync_seq == seq)
manager_emit_sync(m);
spa_list_for_each(o, &m->this.object_list, this.link) {
if (o->new) {
o->new = false;
manager_emit_added(m, &o->this);
} else if (o->this.changed > 0) {
manager_emit_updated(m, &o->this);
o->this.changed = 0;
}
}
}
}

View file

@ -75,9 +75,10 @@ struct pw_manager_object {
char *type;
uint32_t version;
struct pw_properties *props;
void *info;
struct pw_proxy *proxy;
int changed;
void *info;
struct spa_list param_list;
};

View file

@ -492,57 +492,6 @@ static void manager_sync(void *data)
}
}
static void manager_added(void *data, struct pw_manager_object *o)
{
struct client *client = data;
const char *str;
if (strcmp(o->type, PW_TYPE_INTERFACE_Core) == 0 && o->info != NULL) {
struct pw_core_info *info = o->info;
if (info->props &&
(str = spa_dict_lookup(info->props, "default.clock.rate")) != NULL)
client->default_rate = atoi(str);
client->cookie = info->cookie;
}
}
static void manager_metadata(void *data, uint32_t subject, const char *key,
const char *type, const char *value)
{
struct client *client = data;
uint32_t val;
bool changed = false;
pw_log_debug("meta %d %s %s %s", subject, key, type, value);
if (subject == PW_ID_CORE) {
val = (key && value) ? (uint32_t)atoi(value) : SPA_ID_INVALID;
if (key == NULL || strcmp(key, "default.audio.sink") == 0) {
changed = client->default_sink != val;
client->default_sink = val;
}
if (key == NULL || strcmp(key, "default.audio.source") == 0) {
changed = client->default_source != val;
client->default_source = val;
}
}
if (changed) {
if (client->subscribed & SUBSCRIPTION_MASK_SERVER) {
send_subscribe_event(client,
SUBSCRIPTION_EVENT_CHANGE |
SUBSCRIPTION_EVENT_SERVER,
-1);
}
}
}
static const struct pw_manager_events manager_events = {
PW_VERSION_MANAGER_EVENTS,
.sync = manager_sync,
.added = manager_added,
.metadata = manager_metadata,
};
static bool is_client(struct pw_manager_object *o)
{
return strcmp(o->type, PW_TYPE_INTERFACE_Client) == 0;
@ -608,11 +557,138 @@ static bool is_source_output(struct pw_manager_object *o)
strcmp(str, "Stream/Input/Audio") == 0;
}
static bool is_recordable(struct pw_manager_object *o)
{
return is_source(o) || is_sink(o) || is_sink_input(o);
}
static bool is_link(struct pw_manager_object *o)
{
return strcmp(o->type, PW_TYPE_INTERFACE_Link) == 0;
}
static uint32_t get_event_and_id(struct client *client, struct pw_manager_object *o, uint32_t *id)
{
uint32_t event = 0, res_id = o->id;
if (client->subscribed & SUBSCRIPTION_MASK_SINK &&
is_sink(o)) {
event = SUBSCRIPTION_EVENT_SINK;
}
else if (client->subscribed & SUBSCRIPTION_MASK_SOURCE &&
is_source_or_monitor(o)) {
if (!is_source(o))
res_id |= 0x10000U;
event = SUBSCRIPTION_EVENT_SOURCE;
}
else if (client->subscribed & SUBSCRIPTION_MASK_SINK_INPUT &&
is_sink_input(o)) {
event = SUBSCRIPTION_EVENT_SINK_INPUT;
}
else if (client->subscribed & SUBSCRIPTION_MASK_SOURCE_OUTPUT &&
is_source_output(o)) {
event = SUBSCRIPTION_EVENT_SOURCE_OUTPUT;
}
else if (client->subscribed & SUBSCRIPTION_MASK_MODULE &&
is_module(o)) {
event = SUBSCRIPTION_EVENT_MODULE;
}
else if (client->subscribed & SUBSCRIPTION_MASK_CLIENT &&
is_client(o)) {
event = SUBSCRIPTION_EVENT_CLIENT;
}
else if (client->subscribed & SUBSCRIPTION_MASK_CARD &&
is_card(o)) {
event = SUBSCRIPTION_EVENT_CARD;
} else
event = SPA_ID_INVALID;
if (id)
*id = res_id;
return event;
}
static void manager_added(void *data, struct pw_manager_object *o)
{
struct client *client = data;
const char *str;
uint32_t event, id;
if (strcmp(o->type, PW_TYPE_INTERFACE_Core) == 0 && o->info != NULL) {
struct pw_core_info *info = o->info;
if (info->props &&
(str = spa_dict_lookup(info->props, "default.clock.rate")) != NULL)
client->default_rate = atoi(str);
client->cookie = info->cookie;
return;
}
if ((event = get_event_and_id(client, o, &id)) != SPA_ID_INVALID)
send_subscribe_event(client,
event | SUBSCRIPTION_EVENT_NEW,
id);
}
static void manager_updated(void *data, struct pw_manager_object *o)
{
struct client *client = data;
uint32_t event, id;
if ((event = get_event_and_id(client, o, &id)) != SPA_ID_INVALID)
send_subscribe_event(client,
event | SUBSCRIPTION_EVENT_CHANGE,
id);
}
static void manager_removed(void *data, struct pw_manager_object *o)
{
struct client *client = data;
uint32_t event, id;
if ((event = get_event_and_id(client, o, &id)) != SPA_ID_INVALID)
send_subscribe_event(client,
event | SUBSCRIPTION_EVENT_REMOVE,
id);
}
static void manager_metadata(void *data, uint32_t subject, const char *key,
const char *type, const char *value)
{
struct client *client = data;
uint32_t val;
bool changed = false;
pw_log_debug("meta %d %s %s %s", subject, key, type, value);
if (subject == PW_ID_CORE) {
val = (key && value) ? (uint32_t)atoi(value) : SPA_ID_INVALID;
if (key == NULL || strcmp(key, "default.audio.sink") == 0) {
changed = client->default_sink != val;
client->default_sink = val;
}
if (key == NULL || strcmp(key, "default.audio.source") == 0) {
changed = client->default_source != val;
client->default_source = val;
}
}
if (changed) {
if (client->subscribed & SUBSCRIPTION_MASK_SERVER) {
send_subscribe_event(client,
SUBSCRIPTION_EVENT_CHANGE |
SUBSCRIPTION_EVENT_SERVER,
-1);
}
}
}
static const struct pw_manager_events manager_events = {
PW_VERSION_MANAGER_EVENTS,
.sync = manager_sync,
.added = manager_added,
.updated = manager_updated,
.removed = manager_removed,
.metadata = manager_metadata,
};
struct selector {
bool (*type) (struct pw_manager_object *o);
uint32_t id;
@ -668,7 +744,7 @@ static struct pw_manager_object *find_linked(struct client *client, uint32_t obj
return p;
}
if (direction == PW_DIRECTION_INPUT && obj_id == in_node) {
struct selector sel = { .id = out_node, .type = is_source_or_monitor, };
struct selector sel = { .id = out_node, .type = is_recordable, };
if ((p = select_object(m, &sel)) != NULL)
return p;
}
@ -957,6 +1033,7 @@ static int reply_create_playback_stream(struct stream *stream)
char latency[32];
struct pw_manager_object *peer;
const char *peer_name;
bool peer_suspended;
fix_playback_buffer_attr(stream, &stream->attr);
@ -980,13 +1057,17 @@ static int reply_create_playback_stream(struct stream *stream)
stream->pending = size;
peer = find_linked(client, stream->id, stream->direction);
if (peer) {
if (peer && is_sink(peer)) {
struct pw_node_info *info = peer->info;
peer_id = peer->id;
peer_name = pw_properties_get(peer->props, PW_KEY_NODE_NAME);
peer_suspended = info->state == PW_NODE_STATE_SUSPENDED;
} else {
peer_id = SPA_ID_INVALID;
peer_name = NULL;
peer_suspended = false;
}
pw_log_info("peer:%p id:%d name:%s", peer, peer_id, peer_name);
if (client->version >= 9) {
message_put(reply,
@ -1002,7 +1083,7 @@ static int reply_create_playback_stream(struct stream *stream)
TAG_CHANNEL_MAP, &stream->map,
TAG_U32, peer_id, /* sink index */
TAG_STRING, peer_name, /* sink name */
TAG_BOOLEAN, false, /* sink suspended state */
TAG_BOOLEAN, peer_suspended, /* sink suspended state */
TAG_INVALID);
}
if (client->version >= 13) {
@ -1059,6 +1140,7 @@ static int reply_create_record_stream(struct stream *stream)
struct pw_manager_object *peer;
const char *peer_name, *name;
uint32_t peer_id;
bool peer_suspended;
fix_record_buffer_attr(stream, &stream->attr);
@ -1077,9 +1159,12 @@ static int reply_create_record_stream(struct stream *stream)
TAG_INVALID);
peer = find_linked(client, stream->id, stream->direction);
if (peer) {
if (peer && is_sink_input(peer))
peer = find_linked(client, peer->id, PW_DIRECTION_OUTPUT);
if (peer && is_source_or_monitor(peer)) {
struct pw_node_info *info = peer->info;
name = pw_properties_get(peer->props, PW_KEY_NODE_NAME);
if (is_sink(peer)) {
if (!is_source(peer)) {
size_t len = (name ? strlen(name) : 5) + 10;
peer_id = peer->id | 0x10000u;
peer_name = tmp = alloca(len);
@ -1088,10 +1173,13 @@ static int reply_create_record_stream(struct stream *stream)
peer_id = peer->id;
peer_name = name;
}
peer_suspended = info->state == PW_NODE_STATE_SUSPENDED;
} else {
peer_id = SPA_ID_INVALID;
peer_name = NULL;
peer_suspended = false;
}
pw_log_info("peer:%p id:%d name:%s", peer, peer_id, peer_name);
if (client->version >= 9) {
message_put(reply,
@ -1105,7 +1193,7 @@ static int reply_create_record_stream(struct stream *stream)
TAG_CHANNEL_MAP, &stream->map,
TAG_U32, peer_id, /* source index */
TAG_STRING, peer_name, /* source name */
TAG_BOOLEAN, false, /* source suspended state */
TAG_BOOLEAN, peer_suspended, /* source suspended state */
TAG_INVALID);
}
if (client->version >= 13) {
@ -1840,7 +1928,11 @@ static int do_create_record_stream(struct client *client, uint32_t command, uint
if (no_move)
flags |= PW_STREAM_FLAG_DONT_RECONNECT;
if (source_name != NULL) {
pw_log_info("direct %u", direct_on_input_idx);
if (direct_on_input_idx != SPA_ID_INVALID) {
source_index = direct_on_input_idx;
} else if (source_name != NULL) {
if ((id = atoi(source_name)) != 0)
source_index = id;
}
@ -2151,7 +2243,7 @@ error_protocol:
res = ERR_PROTOCOL;
goto error;
error:
return reply_error(client, -1, res);
return reply_error(client, tag, res);
}
static int do_set_stream_mute(struct client *client, uint32_t command, uint32_t tag, struct message *m)
@ -2215,7 +2307,7 @@ error_protocol:
res = ERR_PROTOCOL;
goto error;
error:
return reply_error(client, -1, res);
return reply_error(client, tag, res);
}
static int do_set_stream_name(struct client *client, uint32_t command, uint32_t tag, struct message *m)
@ -2509,7 +2601,7 @@ error_noentity:
res = ERR_INVALID;
goto error;
error:
return reply_error(client, -1, res);
return reply_error(client, tag, res);
}
static int do_drain_stream(struct client *client, uint32_t command, uint32_t tag, struct message *m)
@ -2830,8 +2922,6 @@ static int fill_sink_info(struct client *client, struct message *m,
break;
}
}
if (volume_info.volume.channels != map.channels)
volume_info.volume.channels = map.channels;
message_put(m,
TAG_U32, o->id, /* sink index */
@ -2925,8 +3015,6 @@ static int fill_source_info(struct client *client, struct message *m,
break;
}
}
if (volume_info.volume.channels != map.channels)
volume_info.volume.channels = map.channels;
message_put(m,
TAG_U32, is_monitor ? o->id | 0x10000 : o->id, /* source index */
@ -3086,9 +3174,9 @@ static int fill_source_output_info(struct client *client, struct message *m,
}
peer = find_linked(client, o->id, PW_DIRECTION_INPUT);
if (peer) {
if (peer && is_source_or_monitor(peer)) {
peer_id = peer->id;
if (is_sink(peer))
if (!is_source(peer))
peer_id |= 0x10000u;
} else {
peer_id = SPA_ID_INVALID;
@ -3229,7 +3317,7 @@ error_invalid:
error:
if (reply)
message_free(client, reply, false, false);
return reply_error(client, -1, err);
return reply_error(client, tag, err);
}
struct info_list_data {