pulse-server: calculate event mask from facility and type

Make `client_queue_subscribe_event()` take the facility and type
separately, and calculate the mask itself, so that the caller
does not need to be concerned with that.
This commit is contained in:
Barnabás Pőcze 2024-06-28 01:39:25 +02:00
parent e3a7035e8f
commit 27b76ae686
6 changed files with 115 additions and 65 deletions

View file

@ -301,23 +301,23 @@ static bool drop_from_out_queue(struct client *client, struct message *m)
}
/* returns true if an event with the (mask, event, index) triplet should be dropped because it is redundant */
static bool client_prune_subscribe_events(struct client *client, uint32_t event, uint32_t index)
static bool client_prune_subscribe_events(struct client *client, uint32_t facility, uint32_t type, uint32_t index)
{
struct message *m, *t;
if ((event & SUBSCRIPTION_EVENT_TYPE_MASK) == SUBSCRIPTION_EVENT_NEW)
if (type == SUBSCRIPTION_EVENT_NEW)
return false;
/* NOTE: reverse iteration */
spa_list_for_each_safe_reverse(m, t, &client->out_messages, link) {
if (m->type != MESSAGE_TYPE_SUBSCRIPTION_EVENT)
continue;
if ((m->u.subscription_event.event ^ event) & SUBSCRIPTION_EVENT_FACILITY_MASK)
if ((m->u.subscription_event.event & SUBSCRIPTION_EVENT_FACILITY_MASK) != facility)
continue;
if (m->u.subscription_event.index != index)
continue;
if ((event & SUBSCRIPTION_EVENT_TYPE_MASK) == SUBSCRIPTION_EVENT_REMOVE) {
if (type == SUBSCRIPTION_EVENT_REMOVE) {
/* This object is being removed, hence there is
* point in keeping the old events regarding
* entry in the queue. */
@ -338,8 +338,7 @@ static bool client_prune_subscribe_events(struct client *client, uint32_t event,
if (is_new)
break;
}
if ((event & SUBSCRIPTION_EVENT_TYPE_MASK) == SUBSCRIPTION_EVENT_CHANGE) {
else if (type == SUBSCRIPTION_EVENT_CHANGE) {
/* This object has changed. If a "new" or "change" event for
* this object is still in the queue we can exit. */
goto drop;
@ -349,28 +348,46 @@ static bool client_prune_subscribe_events(struct client *client, uint32_t event,
return false;
drop:
pw_log_debug("client %p: dropped redundant event for object %u", client, index);
pw_log_debug("client %p: dropped redundant event '%s' on %s #%u",
client,
subscription_event_type_to_string(type), subscription_event_facility_to_string(facility),
index);
return true;
}
int client_queue_subscribe_event(struct client *client, uint32_t mask, uint32_t event, uint32_t index)
int client_queue_subscribe_event(struct client *client, uint32_t facility, uint32_t type, uint32_t index)
{
spa_assert(
type == SUBSCRIPTION_EVENT_NEW ||
type == SUBSCRIPTION_EVENT_CHANGE ||
type == SUBSCRIPTION_EVENT_REMOVE
);
const uint32_t mask = 1u << facility;
spa_assert(SUBSCRIPTION_MASK_ALL & mask);
if (client->disconnect)
return -ENOTCONN;
if (!(client->subscribed & mask))
return 0;
pw_log_debug("client %p: SUBSCRIBE event:%08x index:%u", client, event, index);
pw_log_debug("client %p: SUBSCRIBE facility:%s (%u) type:%s (0x%02x) index:%u",
client,
subscription_event_facility_to_string(facility), facility,
subscription_event_type_to_string(type), type,
index);
if (client_prune_subscribe_events(client, event, index))
if (client_prune_subscribe_events(client, facility, type, index))
return 0;
struct message *reply = message_alloc(client->impl, -1, 0);
if (!reply)
return -errno;
const uint32_t event = facility | type;
reply->type = MESSAGE_TYPE_SUBSCRIPTION_EVENT;
reply->u.subscription_event.event = event;
reply->u.subscription_event.index = index;

View file

@ -99,7 +99,7 @@ void client_disconnect(struct client *client);
void client_free(struct client *client);
int client_queue_message(struct client *client, struct message *msg);
int client_flush_messages(struct client *client);
int client_queue_subscribe_event(struct client *client, uint32_t mask, uint32_t event, uint32_t id);
int client_queue_subscribe_event(struct client *client, uint32_t facility, uint32_t type, uint32_t index);
void client_update_routes(struct client *client, const char *key, const char *value);

View file

@ -153,21 +153,6 @@ static inline int err_to_res(int err)
return -EIO;
}
enum {
SUBSCRIPTION_MASK_NULL = 0x0000U,
SUBSCRIPTION_MASK_SINK = 0x0001U,
SUBSCRIPTION_MASK_SOURCE = 0x0002U,
SUBSCRIPTION_MASK_SINK_INPUT = 0x0004U,
SUBSCRIPTION_MASK_SOURCE_OUTPUT = 0x0008U,
SUBSCRIPTION_MASK_MODULE = 0x0010U,
SUBSCRIPTION_MASK_CLIENT = 0x0020U,
SUBSCRIPTION_MASK_SAMPLE_CACHE = 0x0040U,
SUBSCRIPTION_MASK_SERVER = 0x0080U,
SUBSCRIPTION_MASK_AUTOLOAD = 0x0100U,
SUBSCRIPTION_MASK_CARD = 0x0200U,
SUBSCRIPTION_MASK_ALL = 0x02ffU
};
enum {
SUBSCRIPTION_EVENT_SINK = 0x0000U,
SUBSCRIPTION_EVENT_SOURCE = 0x0001U,
@ -177,7 +162,7 @@ enum {
SUBSCRIPTION_EVENT_CLIENT = 0x0005U,
SUBSCRIPTION_EVENT_SAMPLE_CACHE = 0x0006U,
SUBSCRIPTION_EVENT_SERVER = 0x0007U,
SUBSCRIPTION_EVENT_AUTOLOAD = 0x0008U,
// SUBSCRIPTION_EVENT_AUTOLOAD = 0x0008U,
SUBSCRIPTION_EVENT_CARD = 0x0009U,
SUBSCRIPTION_EVENT_FACILITY_MASK = 0x000FU,
@ -187,6 +172,31 @@ enum {
SUBSCRIPTION_EVENT_TYPE_MASK = 0x0030U
};
enum {
SUBSCRIPTION_MASK_NULL = 0,
SUBSCRIPTION_MASK_SINK = 1u << SUBSCRIPTION_EVENT_SINK,
SUBSCRIPTION_MASK_SOURCE = 1u << SUBSCRIPTION_EVENT_SOURCE,
SUBSCRIPTION_MASK_SINK_INPUT = 1u << SUBSCRIPTION_EVENT_SINK_INPUT,
SUBSCRIPTION_MASK_SOURCE_OUTPUT = 1u << SUBSCRIPTION_EVENT_SOURCE_OUTPUT,
SUBSCRIPTION_MASK_MODULE = 1u << SUBSCRIPTION_EVENT_MODULE,
SUBSCRIPTION_MASK_CLIENT = 1u << SUBSCRIPTION_EVENT_CLIENT,
SUBSCRIPTION_MASK_SAMPLE_CACHE = 1u << SUBSCRIPTION_EVENT_SAMPLE_CACHE,
SUBSCRIPTION_MASK_SERVER = 1u << SUBSCRIPTION_EVENT_SERVER,
// SUBSCRIPTION_MASK_AUTOLOAD = 1u << SUBSCRIPTION_EVENT_AUTOLOAD,
SUBSCRIPTION_MASK_CARD = 1u << SUBSCRIPTION_EVENT_CARD,
SUBSCRIPTION_MASK_ALL =
SUBSCRIPTION_MASK_SINK
| SUBSCRIPTION_MASK_SOURCE
| SUBSCRIPTION_MASK_SINK_INPUT
| SUBSCRIPTION_MASK_SOURCE_OUTPUT
| SUBSCRIPTION_MASK_MODULE
| SUBSCRIPTION_MASK_CLIENT
| SUBSCRIPTION_MASK_SAMPLE_CACHE
| SUBSCRIPTION_MASK_SERVER
// | SUBSCRIPTION_MASK_AUTOLOAD
| SUBSCRIPTION_MASK_CARD
};
enum {
STATE_INVALID = -1,
STATE_RUNNING = 0,
@ -236,6 +246,41 @@ enum {
SOURCE_FLAT_VOLUME = 0x0080U,
};
static inline const char *subscription_event_type_to_string(uint32_t type)
{
switch (type) {
case SUBSCRIPTION_EVENT_NEW:
return "new";
case SUBSCRIPTION_EVENT_CHANGE:
return "change";
case SUBSCRIPTION_EVENT_REMOVE:
return "remove";
}
return NULL;
}
static inline const char *subscription_event_facility_to_string(uint32_t facility)
{
static const char * const strings[] = {
[SUBSCRIPTION_EVENT_SINK] = "sink",
[SUBSCRIPTION_EVENT_SOURCE] = "source",
[SUBSCRIPTION_EVENT_SINK_INPUT] = "sink-input",
[SUBSCRIPTION_EVENT_SOURCE_OUTPUT] = "source-output",
[SUBSCRIPTION_EVENT_MODULE] = "module",
[SUBSCRIPTION_EVENT_CLIENT] = "client",
[SUBSCRIPTION_EVENT_SAMPLE_CACHE] = "sample-cache",
[SUBSCRIPTION_EVENT_SERVER] = "server",
// [SUBSCRIPTION_EVENT_AUTOLOAD] = "autoload",
[SUBSCRIPTION_EVENT_CARD] = "card",
};
if (facility >= SPA_N_ELEMENTS(strings))
return NULL;
return strings[facility];
}
static const char * const port_types[] = {
"unknown",
"aux",

View file

@ -83,6 +83,6 @@ void impl_add_listener(struct impl *impl,
struct spa_hook *listener,
const struct impl_events *events, void *data);
void broadcast_subscribe_event(struct impl *impl, uint32_t mask, uint32_t event, uint32_t id);
void broadcast_subscribe_event(struct impl *impl, uint32_t facility, uint32_t type, uint32_t id);
#endif

View file

@ -106,8 +106,8 @@ int module_unload(struct module *module)
if (module->loaded)
broadcast_subscribe_event(impl,
SUBSCRIPTION_MASK_MODULE,
SUBSCRIPTION_EVENT_REMOVE | SUBSCRIPTION_EVENT_MODULE,
SUBSCRIPTION_EVENT_MODULE,
SUBSCRIPTION_EVENT_REMOVE,
module->index);
module_free(module);

View file

@ -100,13 +100,13 @@ static struct sample *find_sample(struct impl *impl, uint32_t index, const char
return NULL;
}
void broadcast_subscribe_event(struct impl *impl, uint32_t mask, uint32_t event, uint32_t index)
void broadcast_subscribe_event(struct impl *impl, uint32_t facility, uint32_t type, uint32_t index)
{
struct server *s;
spa_list_for_each(s, &impl->servers, link) {
struct client *c;
spa_list_for_each(c, &s->clients, link)
client_queue_subscribe_event(c, mask, event, index);
client_queue_subscribe_event(c, facility, type, index);
}
}
@ -203,46 +203,36 @@ static struct stream *find_stream(struct client *client, uint32_t index)
static int send_object_event(struct client *client, struct pw_manager_object *o,
uint32_t type)
{
uint32_t event = 0, mask = 0, res_index = o->index;
uint32_t event = 0, res_index = o->index;
pw_log_debug("index:%d id:%d %08" PRIx64 " type:%u", o->index, o->id, o->change_mask, type);
if (pw_manager_object_is_sink(o) && o->change_mask & PW_MANAGER_OBJECT_FLAG_SINK) {
client_queue_subscribe_event(client,
SUBSCRIPTION_MASK_SINK,
SUBSCRIPTION_EVENT_SINK | type,
SUBSCRIPTION_EVENT_SINK,
type,
res_index);
}
if (pw_manager_object_is_source_or_monitor(o) && o->change_mask & PW_MANAGER_OBJECT_FLAG_SOURCE) {
mask = SUBSCRIPTION_MASK_SOURCE;
if (pw_manager_object_is_source_or_monitor(o) && o->change_mask & PW_MANAGER_OBJECT_FLAG_SOURCE)
event = SUBSCRIPTION_EVENT_SOURCE;
}
else if (pw_manager_object_is_sink_input(o)) {
mask = SUBSCRIPTION_MASK_SINK_INPUT;
else if (pw_manager_object_is_sink_input(o))
event = SUBSCRIPTION_EVENT_SINK_INPUT;
}
else if (pw_manager_object_is_source_output(o)) {
mask = SUBSCRIPTION_MASK_SOURCE_OUTPUT;
else if (pw_manager_object_is_source_output(o))
event = SUBSCRIPTION_EVENT_SOURCE_OUTPUT;
}
else if (pw_manager_object_is_module(o)) {
mask = SUBSCRIPTION_MASK_MODULE;
else if (pw_manager_object_is_module(o))
event = SUBSCRIPTION_EVENT_MODULE;
}
else if (pw_manager_object_is_client(o)) {
mask = SUBSCRIPTION_MASK_CLIENT;
else if (pw_manager_object_is_client(o))
event = SUBSCRIPTION_EVENT_CLIENT;
}
else if (pw_manager_object_is_card(o)) {
mask = SUBSCRIPTION_MASK_CARD;
else if (pw_manager_object_is_card(o))
event = SUBSCRIPTION_EVENT_CARD;
} else
else
event = SPA_ID_INVALID;
if (event != SPA_ID_INVALID)
client_queue_subscribe_event(client,
mask,
event | type,
event,
type,
res_index);
return 0;
}
@ -370,8 +360,8 @@ static void send_latency_offset_subscribe_event(struct client *client, struct pw
if (changed)
client_queue_subscribe_event(client,
SUBSCRIPTION_MASK_CARD,
SUBSCRIPTION_EVENT_CARD | SUBSCRIPTION_EVENT_CHANGE,
SUBSCRIPTION_EVENT_CARD,
SUBSCRIPTION_EVENT_CHANGE,
id_to_index(manager, card_id));
}
@ -398,9 +388,8 @@ static void send_default_change_subscribe_event(struct client *client, bool sink
if (changed)
client_queue_subscribe_event(client,
SUBSCRIPTION_MASK_SERVER,
SUBSCRIPTION_EVENT_CHANGE |
SUBSCRIPTION_EVENT_SERVER,
SUBSCRIPTION_EVENT_CHANGE,
-1);
}
@ -2381,8 +2370,8 @@ static int do_finish_upload_stream(struct client *client, uint32_t command, uint
stream_free(stream);
broadcast_subscribe_event(impl,
SUBSCRIPTION_MASK_SAMPLE_CACHE,
event | SUBSCRIPTION_EVENT_SAMPLE_CACHE,
SUBSCRIPTION_EVENT_SAMPLE_CACHE,
event,
sample->index);
return reply_simple_ack(client, tag);
@ -2591,9 +2580,8 @@ static int do_remove_sample(struct client *client, uint32_t command, uint32_t ta
return -ENOENT;
broadcast_subscribe_event(impl,
SUBSCRIPTION_MASK_SAMPLE_CACHE,
SUBSCRIPTION_EVENT_REMOVE |
SUBSCRIPTION_EVENT_SAMPLE_CACHE,
SUBSCRIPTION_EVENT_REMOVE,
sample->index);
pw_map_remove(&impl->samples, sample->index);
@ -4848,8 +4836,8 @@ static void handle_module_loaded(struct module *module, struct client *client, u
module->loaded = true;
broadcast_subscribe_event(impl,
SUBSCRIPTION_MASK_MODULE,
SUBSCRIPTION_EVENT_NEW | SUBSCRIPTION_EVENT_MODULE,
SUBSCRIPTION_EVENT_MODULE,
SUBSCRIPTION_EVENT_NEW,
module->index);
if (client != NULL) {