mirror of
https://gitlab.freedesktop.org/pipewire/pipewire.git
synced 2025-11-02 09:01:50 -05:00
session-manager: endpoint: implement caching info & params
The info structure needs to be cached because there is no way to request it from the implementation, unless we hack the add_listener API to be used for making info requests or add a new method that will be used just in the implementation (both are bad ideas). The params are cached because 1) a client doing enum_params + sync will not work correctly, since the sync call syncs with the server and not the implementation... we could block the client to solve that, but then there is also #2 2) the implementation is not aware of the clients and therefore it cannot keep track of who is subscribed and who is not, this needs to happen in the server. Then if we only keep track of the subscriptions in the server and keep requesting params from the impl, there is no way to know if a param event coming from the impl matches a call to enum_params or to subscribe or if it's just an update that needs to be forwarded to subscribers.
This commit is contained in:
parent
74718d6def
commit
444d180218
1 changed files with 242 additions and 67 deletions
|
|
@ -25,7 +25,13 @@
|
||||||
|
|
||||||
#include <pipewire/impl.h>
|
#include <pipewire/impl.h>
|
||||||
#include <extensions/session-manager.h>
|
#include <extensions/session-manager.h>
|
||||||
|
#include <extensions/session-manager/introspect-funcs.h>
|
||||||
|
|
||||||
#include <spa/utils/result.h>
|
#include <spa/utils/result.h>
|
||||||
|
#include <spa/pod/builder.h>
|
||||||
|
#include <spa/pod/filter.h>
|
||||||
|
|
||||||
|
#define MAX_PARAMS 32
|
||||||
|
|
||||||
#define NAME "endpoint"
|
#define NAME "endpoint"
|
||||||
|
|
||||||
|
|
@ -43,6 +49,20 @@ struct impl
|
||||||
struct pw_resource *resource;
|
struct pw_resource *resource;
|
||||||
};
|
};
|
||||||
struct spa_hook resource_listener;
|
struct spa_hook resource_listener;
|
||||||
|
struct spa_hook endpoint_listener;
|
||||||
|
|
||||||
|
struct pw_endpoint_info *cached_info;
|
||||||
|
struct spa_list cached_params;
|
||||||
|
|
||||||
|
int ping_seq;
|
||||||
|
bool registered;
|
||||||
|
};
|
||||||
|
|
||||||
|
struct param_data
|
||||||
|
{
|
||||||
|
struct spa_list link;
|
||||||
|
uint32_t id;
|
||||||
|
struct pw_array params;
|
||||||
};
|
};
|
||||||
|
|
||||||
struct resource_data
|
struct resource_data
|
||||||
|
|
@ -50,9 +70,10 @@ struct resource_data
|
||||||
struct impl *impl;
|
struct impl *impl;
|
||||||
|
|
||||||
struct pw_resource *resource;
|
struct pw_resource *resource;
|
||||||
struct spa_hook resource_listener;
|
|
||||||
struct spa_hook object_listener;
|
struct spa_hook object_listener;
|
||||||
struct spa_hook endpoint_listener;
|
|
||||||
|
uint32_t n_subscribe_ids;
|
||||||
|
uint32_t subscribe_ids[32];
|
||||||
};
|
};
|
||||||
|
|
||||||
struct factory_data
|
struct factory_data
|
||||||
|
|
@ -65,13 +86,13 @@ struct factory_data
|
||||||
struct pw_export_type export;
|
struct pw_export_type export;
|
||||||
};
|
};
|
||||||
|
|
||||||
static int method_subscribe_params(void *object, uint32_t *ids, uint32_t n_ids)
|
#define pw_endpoint_resource(r,m,v,...) \
|
||||||
{
|
pw_resource_call(r,struct pw_endpoint_events,m,v,__VA_ARGS__)
|
||||||
struct resource_data *d = object;
|
|
||||||
struct impl *impl = d->impl;
|
#define pw_endpoint_resource_info(r,...) \
|
||||||
pw_endpoint_subscribe_params(impl->endpoint, ids, n_ids);
|
pw_endpoint_resource(r,info,0,__VA_ARGS__)
|
||||||
return 0;
|
#define pw_endpoint_resource_param(r,...) \
|
||||||
}
|
pw_endpoint_resource(r,param,0,__VA_ARGS__)
|
||||||
|
|
||||||
static int method_enum_params(void *object, int seq,
|
static int method_enum_params(void *object, int seq,
|
||||||
uint32_t id, uint32_t start, uint32_t num,
|
uint32_t id, uint32_t start, uint32_t num,
|
||||||
|
|
@ -79,7 +100,59 @@ static int method_enum_params(void *object, int seq,
|
||||||
{
|
{
|
||||||
struct resource_data *d = object;
|
struct resource_data *d = object;
|
||||||
struct impl *impl = d->impl;
|
struct impl *impl = d->impl;
|
||||||
pw_endpoint_enum_params(impl->endpoint, seq, id, start, num, filter);
|
struct param_data *pdata;
|
||||||
|
struct spa_pod *result;
|
||||||
|
struct spa_pod *param;
|
||||||
|
uint8_t buffer[1024];
|
||||||
|
struct spa_pod_builder b = { 0 };
|
||||||
|
uint32_t index;
|
||||||
|
uint32_t next = start;
|
||||||
|
uint32_t count = 0;
|
||||||
|
|
||||||
|
pw_log_debug(NAME" %p: param %u %d/%d", impl, id, start, num);
|
||||||
|
|
||||||
|
spa_list_for_each(pdata, &impl->cached_params, link) {
|
||||||
|
if (pdata->id != id)
|
||||||
|
continue;
|
||||||
|
|
||||||
|
while (true) {
|
||||||
|
index = next++;
|
||||||
|
if (index >= pw_array_get_len(&pdata->params, void*))
|
||||||
|
return 0;
|
||||||
|
|
||||||
|
param = *pw_array_get_unchecked(&pdata->params, index, struct spa_pod*);
|
||||||
|
|
||||||
|
spa_pod_builder_init(&b, buffer, sizeof(buffer));
|
||||||
|
if (spa_pod_filter(&b, &result, param, filter) != 0)
|
||||||
|
continue;
|
||||||
|
|
||||||
|
pw_log_debug(NAME" %p: %d param %u", impl, seq, index);
|
||||||
|
|
||||||
|
pw_endpoint_resource_param(d->resource, seq, id, index, next, result);
|
||||||
|
|
||||||
|
if (++count == num)
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int method_subscribe_params(void *object, uint32_t *ids, uint32_t n_ids)
|
||||||
|
{
|
||||||
|
struct resource_data *d = object;
|
||||||
|
struct impl *impl = d->impl;
|
||||||
|
uint32_t i;
|
||||||
|
|
||||||
|
n_ids = SPA_MIN(n_ids, SPA_N_ELEMENTS(d->subscribe_ids));
|
||||||
|
d->n_subscribe_ids = n_ids;
|
||||||
|
|
||||||
|
for (i = 0; i < n_ids; i++) {
|
||||||
|
d->subscribe_ids[i] = ids[i];
|
||||||
|
pw_log_debug(NAME" %p: resource %d subscribe param %u",
|
||||||
|
impl, pw_resource_get_id(d->resource), ids[i]);
|
||||||
|
method_enum_params(object, 1, ids[i], 0, UINT32_MAX, NULL);
|
||||||
|
}
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -88,6 +161,8 @@ static int method_set_param(void *object, uint32_t id, uint32_t flags,
|
||||||
{
|
{
|
||||||
struct resource_data *d = object;
|
struct resource_data *d = object;
|
||||||
struct impl *impl = d->impl;
|
struct impl *impl = d->impl;
|
||||||
|
/* store only on the implementation; our cache will be updated
|
||||||
|
by the param event, since we are subscribed */
|
||||||
pw_endpoint_set_param(impl->endpoint, id, flags, param);
|
pw_endpoint_set_param(impl->endpoint, id, flags, param);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
@ -108,46 +183,6 @@ static const struct pw_endpoint_methods endpoint_methods = {
|
||||||
.create_link = method_create_link,
|
.create_link = method_create_link,
|
||||||
};
|
};
|
||||||
|
|
||||||
#define pw_endpoint_resource(r,m,v,...) \
|
|
||||||
pw_resource_call(r,struct pw_endpoint_events,m,v,__VA_ARGS__)
|
|
||||||
|
|
||||||
#define pw_endpoint_resource_info(r,...) \
|
|
||||||
pw_endpoint_resource(r,info,0,__VA_ARGS__)
|
|
||||||
#define pw_endpoint_resource_param(r,...) \
|
|
||||||
pw_endpoint_resource(r,param,0,__VA_ARGS__)
|
|
||||||
|
|
||||||
static void event_info(void *object, const struct pw_endpoint_info *info)
|
|
||||||
{
|
|
||||||
struct resource_data *d = object;
|
|
||||||
pw_endpoint_resource_info(d->resource, info);
|
|
||||||
}
|
|
||||||
|
|
||||||
static void event_param(void *object, int seq,
|
|
||||||
uint32_t id, uint32_t index, uint32_t next,
|
|
||||||
const struct spa_pod *param)
|
|
||||||
{
|
|
||||||
struct resource_data *d = object;
|
|
||||||
pw_endpoint_resource_param (d->resource, seq, id, index, next, param);
|
|
||||||
}
|
|
||||||
|
|
||||||
static const struct pw_endpoint_events endpoint_events = {
|
|
||||||
PW_VERSION_ENDPOINT_EVENTS,
|
|
||||||
.info = event_info,
|
|
||||||
.param = event_param,
|
|
||||||
};
|
|
||||||
|
|
||||||
static void global_unbind(void *data)
|
|
||||||
{
|
|
||||||
struct resource_data *d = data;
|
|
||||||
if (d->resource)
|
|
||||||
spa_hook_remove(&d->endpoint_listener);
|
|
||||||
}
|
|
||||||
|
|
||||||
static const struct pw_resource_events resource_events = {
|
|
||||||
PW_VERSION_RESOURCE_EVENTS,
|
|
||||||
.destroy = global_unbind,
|
|
||||||
};
|
|
||||||
|
|
||||||
static int global_bind(void *_data, struct pw_impl_client *client,
|
static int global_bind(void *_data, struct pw_impl_client *client,
|
||||||
uint32_t permissions, uint32_t version, uint32_t id)
|
uint32_t permissions, uint32_t version, uint32_t id)
|
||||||
{
|
{
|
||||||
|
|
@ -167,19 +202,12 @@ static int global_bind(void *_data, struct pw_impl_client *client,
|
||||||
|
|
||||||
pw_global_add_resource(impl->global, resource);
|
pw_global_add_resource(impl->global, resource);
|
||||||
|
|
||||||
/* listen for when the resource goes away */
|
|
||||||
pw_resource_add_listener(resource,
|
|
||||||
&data->resource_listener,
|
|
||||||
&resource_events, data);
|
|
||||||
|
|
||||||
/* resource methods -> implemention */
|
/* resource methods -> implemention */
|
||||||
pw_resource_add_object_listener(resource,
|
pw_resource_add_object_listener(resource,
|
||||||
&data->object_listener,
|
&data->object_listener,
|
||||||
&endpoint_methods, data);
|
&endpoint_methods, data);
|
||||||
/* implementation events -> resource */
|
|
||||||
pw_endpoint_add_listener(impl->endpoint,
|
pw_endpoint_resource_info(resource, impl->cached_info);
|
||||||
&data->endpoint_listener,
|
|
||||||
&endpoint_events, data);
|
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
@ -198,18 +226,158 @@ static const struct pw_global_events global_events = {
|
||||||
.destroy = global_destroy,
|
.destroy = global_destroy,
|
||||||
};
|
};
|
||||||
|
|
||||||
static void global_resource_destroy(void *data)
|
static void impl_resource_destroy(void *data)
|
||||||
{
|
{
|
||||||
struct impl *impl = data;
|
struct impl *impl = data;
|
||||||
|
struct param_data *pdata, *tmp;
|
||||||
|
|
||||||
spa_hook_remove(&impl->resource_listener);
|
spa_hook_remove(&impl->resource_listener);
|
||||||
impl->resource = NULL;
|
impl->resource = NULL;
|
||||||
|
|
||||||
|
/* clear cache */
|
||||||
|
if (impl->cached_info)
|
||||||
|
pw_endpoint_info_free(impl->cached_info);
|
||||||
|
spa_list_for_each_safe(pdata, tmp, &impl->cached_params, link) {
|
||||||
|
struct spa_pod **pod;
|
||||||
|
pw_array_for_each(pod, &pdata->params)
|
||||||
|
free(*pod);
|
||||||
|
pw_array_clear(&pdata->params);
|
||||||
|
spa_list_remove(&pdata->link);
|
||||||
|
free(pdata);
|
||||||
|
}
|
||||||
|
|
||||||
if (impl->global)
|
if (impl->global)
|
||||||
pw_global_destroy(impl->global);
|
pw_global_destroy(impl->global);
|
||||||
}
|
}
|
||||||
|
|
||||||
static const struct pw_resource_events global_resource_events = {
|
static void register_global(struct impl *impl)
|
||||||
|
{
|
||||||
|
impl->cached_info->id = pw_global_get_id (impl->global);
|
||||||
|
pw_resource_set_bound_id(impl->resource, impl->cached_info->id);
|
||||||
|
pw_global_register(impl->global);
|
||||||
|
impl->registered = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
static void impl_resource_pong (void *data, int seq)
|
||||||
|
{
|
||||||
|
struct impl *impl = data;
|
||||||
|
|
||||||
|
/* complete registration, if this was the initial sync */
|
||||||
|
if (!impl->registered && seq == impl->ping_seq) {
|
||||||
|
register_global(impl);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static const struct pw_resource_events impl_resource_events = {
|
||||||
PW_VERSION_RESOURCE_EVENTS,
|
PW_VERSION_RESOURCE_EVENTS,
|
||||||
.destroy = global_resource_destroy,
|
.destroy = impl_resource_destroy,
|
||||||
|
.pong = impl_resource_pong,
|
||||||
|
};
|
||||||
|
|
||||||
|
static int emit_info(void *data, struct pw_resource *resource)
|
||||||
|
{
|
||||||
|
const struct pw_endpoint_info *info = data;
|
||||||
|
pw_endpoint_resource_info(resource, info);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static void event_info(void *object, const struct pw_endpoint_info *info)
|
||||||
|
{
|
||||||
|
struct impl *impl = object;
|
||||||
|
uint32_t changed_ids[MAX_PARAMS], n_changed_ids = 0;
|
||||||
|
uint32_t i;
|
||||||
|
|
||||||
|
/* figure out changes to params */
|
||||||
|
if (info->change_mask & PW_ENDPOINT_CHANGE_MASK_PARAMS) {
|
||||||
|
for (i = 0; i < info->n_params; i++) {
|
||||||
|
if ((!impl->cached_info ||
|
||||||
|
info->params[i].flags != impl->cached_info->params[i].flags)
|
||||||
|
&& info->params[i].flags & SPA_PARAM_INFO_READ)
|
||||||
|
changed_ids[n_changed_ids++] = info->params[i].id;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/* cache for new clients */
|
||||||
|
impl->cached_info = pw_endpoint_info_update (impl->cached_info, info);
|
||||||
|
|
||||||
|
/* notify existing clients */
|
||||||
|
pw_global_for_each_resource(impl->global, emit_info, (void*) info);
|
||||||
|
|
||||||
|
/* cache params & register */
|
||||||
|
if (n_changed_ids > 0) {
|
||||||
|
/* prepare params storage */
|
||||||
|
for (i = 0; i < n_changed_ids; i++) {
|
||||||
|
struct param_data *pdata = calloc(1, sizeof(struct param_data));
|
||||||
|
pdata->id = changed_ids[i];
|
||||||
|
pw_array_init(&pdata->params, sizeof(void*));
|
||||||
|
spa_list_append(&impl->cached_params, &pdata->link);
|
||||||
|
}
|
||||||
|
|
||||||
|
/* subscribe to impl */
|
||||||
|
pw_endpoint_subscribe_params(impl->endpoint, changed_ids, n_changed_ids);
|
||||||
|
|
||||||
|
/* register asynchronously on the pong event */
|
||||||
|
impl->ping_seq = pw_resource_ping(impl->resource, 0);
|
||||||
|
}
|
||||||
|
else if (!impl->registered) {
|
||||||
|
register_global(impl);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
struct param_event_args
|
||||||
|
{
|
||||||
|
uint32_t id, index, next;
|
||||||
|
const struct spa_pod *param;
|
||||||
|
};
|
||||||
|
|
||||||
|
static int emit_param(void *_data, struct pw_resource *resource)
|
||||||
|
{
|
||||||
|
struct param_event_args *args = _data;
|
||||||
|
struct resource_data *data;
|
||||||
|
uint32_t i;
|
||||||
|
|
||||||
|
data = pw_resource_get_user_data(resource);
|
||||||
|
for (i = 0; i < data->n_subscribe_ids; i++) {
|
||||||
|
if (data->subscribe_ids[i] == args->id) {
|
||||||
|
pw_endpoint_resource_param(resource, 1,
|
||||||
|
args->id, args->index, args->next, args->param);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static void event_param(void *object, int seq,
|
||||||
|
uint32_t id, uint32_t index, uint32_t next,
|
||||||
|
const struct spa_pod *param)
|
||||||
|
{
|
||||||
|
struct impl *impl = object;
|
||||||
|
struct param_data *pdata;
|
||||||
|
struct spa_pod **pod;
|
||||||
|
struct param_event_args args = { id, index, next, param };
|
||||||
|
|
||||||
|
/* cache for new requests */
|
||||||
|
spa_list_for_each(pdata, &impl->cached_params, link) {
|
||||||
|
if (pdata->id != id)
|
||||||
|
continue;
|
||||||
|
|
||||||
|
if (!pw_array_check_index(&pdata->params, index, void*)) {
|
||||||
|
while (pw_array_get_len(&pdata->params, void*) <= index)
|
||||||
|
pw_array_add_ptr(&pdata->params, NULL);
|
||||||
|
}
|
||||||
|
|
||||||
|
pod = pw_array_get_unchecked(&pdata->params, index, struct spa_pod*);
|
||||||
|
free(*pod);
|
||||||
|
*pod = spa_pod_copy(param);
|
||||||
|
}
|
||||||
|
|
||||||
|
/* notify existing clients */
|
||||||
|
pw_global_for_each_resource(impl->global, emit_param, &args);
|
||||||
|
}
|
||||||
|
|
||||||
|
static const struct pw_endpoint_events endpoint_events = {
|
||||||
|
PW_VERSION_ENDPOINT_EVENTS,
|
||||||
|
.info = event_info,
|
||||||
|
.param = event_param,
|
||||||
};
|
};
|
||||||
|
|
||||||
static void *endpoint_new(struct pw_context *context,
|
static void *endpoint_new(struct pw_context *context,
|
||||||
|
|
@ -235,16 +403,23 @@ static void *endpoint_new(struct pw_context *context,
|
||||||
}
|
}
|
||||||
impl->resource = resource;
|
impl->resource = resource;
|
||||||
|
|
||||||
|
spa_list_init(&impl->cached_params);
|
||||||
|
|
||||||
|
/* handle destroy events */
|
||||||
pw_global_add_listener(impl->global,
|
pw_global_add_listener(impl->global,
|
||||||
&impl->global_listener,
|
&impl->global_listener,
|
||||||
&global_events, impl);
|
&global_events, impl);
|
||||||
|
pw_resource_add_listener(impl->resource,
|
||||||
pw_resource_add_listener(resource,
|
|
||||||
&impl->resource_listener,
|
&impl->resource_listener,
|
||||||
&global_resource_events, impl);
|
&impl_resource_events, impl);
|
||||||
|
|
||||||
pw_resource_set_bound_id(resource, pw_global_get_id (impl->global));
|
/* handle implementation events -> cache + client resources */
|
||||||
pw_global_register(impl->global);
|
pw_endpoint_add_listener(impl->endpoint,
|
||||||
|
&impl->endpoint_listener,
|
||||||
|
&endpoint_events, impl);
|
||||||
|
|
||||||
|
/* global is not registered here on purpose;
|
||||||
|
we first cache info + params and then expose the global */
|
||||||
|
|
||||||
return impl;
|
return impl;
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue