media-session: add media session helpers

Move all the media-session object monitoring into one place and
provide an API to get to the session objects.

Make API to add module specific info to objects.

Add methods to export and create objects in the session. This should
make it possible to link proxy to implementation and avoid a server
roundtrip in some cases.
This commit is contained in:
Wim Taymans 2019-11-14 18:35:29 +01:00
parent 3f3dfbc67e
commit 161cf46898
9 changed files with 1140 additions and 1054 deletions

View file

@ -39,6 +39,7 @@
#include "pipewire/pipewire.h"
#include "pipewire/private.h"
#include "extensions/session-manager.h"
#include "media-session.h"
#define NAME "stream-monitor"
@ -48,76 +49,31 @@
struct client_endpoint;
struct impl {
struct timespec now;
struct sm_media_session *session;
struct spa_hook listener;
struct pw_core *core;
struct pw_remote *remote;
struct spa_hook remote_listener;
uint32_t session_id;
struct pw_core_proxy *core_proxy;
struct spa_hook core_listener;
struct pw_registry_proxy *registry_proxy;
struct spa_hook registry_listener;
struct pw_map globals;
struct spa_list client_list;
struct spa_list node_list;
int seq;
};
struct object {
struct impl *impl;
uint32_t id;
uint32_t type;
struct pw_proxy *proxy;
struct spa_hook listener;
};
struct node {
struct object obj;
struct sm_node *obj;
struct spa_list l;
struct client_endpoint *endpoint;
struct impl *impl;
struct spa_hook proxy_listener;
struct spa_hook listener;
struct pw_node_info *info;
struct spa_list port_list;
uint32_t id;
enum pw_direction direction;
#define NODE_TYPE_UNKNOWN 0
#define NODE_TYPE_STREAM 1
uint32_t type;
char *media;
struct client_endpoint *endpoint;
uint32_t media_type;
uint32_t media_subtype;
struct spa_audio_info_raw format;
};
struct endpoint {
struct object obj;
};
struct port {
struct object obj;
struct spa_list l;
enum pw_direction direction;
struct pw_port_info *info;
struct node *node;
#define PORT_FLAG_NONE 0
#define PORT_FLAG_DSP (1<<0)
#define PORT_FLAG_SKIP (1<<1)
uint32_t flags;
struct spa_hook listener;
};
struct stream {
struct pw_properties *props;
struct pw_endpoint_stream_info info;
@ -138,29 +94,9 @@ struct client_endpoint {
struct pw_endpoint_info info;
struct stream stream;
uint32_t pending_config;
};
static void add_object(struct impl *impl, struct object *obj)
{
size_t size = pw_map_get_size(&impl->globals);
while (obj->id > size)
pw_map_insert_at(&impl->globals, size++, NULL);
pw_map_insert_at(&impl->globals, obj->id, obj);
}
static void remove_object(struct impl *impl, struct object *obj)
{
pw_map_insert_at(&impl->globals, obj->id, NULL);
}
static void *find_object(struct impl *impl, uint32_t id)
{
void *obj;
if ((obj = pw_map_lookup(&impl->globals, id)) != NULL)
return obj;
return NULL;
}
static int client_endpoint_set_id(void *object, uint32_t id)
{
struct client_endpoint *endpoint = object;
@ -199,7 +135,7 @@ static int client_endpoint_create_link(void *object, const struct spa_dict *prop
struct client_endpoint *endpoint = object;
struct impl *impl = endpoint->impl;
const char *str;
struct endpoint *ep;
struct sm_object *obj;
struct node *node = endpoint->node;
struct pw_properties *p;
int res;
@ -214,14 +150,25 @@ static int client_endpoint_create_link(void *object, const struct spa_dict *prop
return -errno;
if (endpoint->info.direction == PW_DIRECTION_OUTPUT) {
pw_properties_setf(p, PW_KEY_LINK_OUTPUT_NODE, "%d", endpoint->node->info->id);
pw_properties_setf(p, PW_KEY_LINK_OUTPUT_NODE, "%d", endpoint->node->id);
pw_properties_setf(p, PW_KEY_LINK_OUTPUT_PORT, "-1");
str = spa_dict_lookup(props, PW_KEY_LINK_INPUT_NODE);
} else {
pw_properties_setf(p, PW_KEY_LINK_INPUT_NODE, "%d", endpoint->node->info->id);
pw_properties_setf(p, PW_KEY_LINK_INPUT_NODE, "%d", endpoint->node->id);
pw_properties_setf(p, PW_KEY_LINK_INPUT_PORT, "-1");
str = spa_dict_lookup(props, PW_KEY_LINK_OUTPUT_NODE);
}
if (str == NULL) {
pw_log_warn(NAME" %p: no target endpoint given", impl);
res = -EINVAL;
goto exit;
}
obj = sm_media_session_find_object(impl->session, atoi(str));
if (obj == NULL || obj->type != PW_TYPE_INTERFACE_Endpoint) {
pw_log_warn(NAME" %p: could not find object %s (%p)", impl, str, obj);
res = -EINVAL;
goto exit;
}
if (!endpoint->stream.active) {
char buf[1024];
@ -242,25 +189,15 @@ static int client_endpoint_create_link(void *object, const struct spa_dict *prop
if (pw_log_level_enabled(SPA_LOG_LEVEL_DEBUG))
spa_debug_pod(2, NULL, param);
pw_node_proxy_set_param((struct pw_node_proxy*)node->obj.proxy,
pw_node_proxy_set_param((struct pw_node_proxy*)node->obj->obj.proxy,
SPA_PARAM_PortConfig, 0, param);
endpoint->pending_config = pw_proxy_sync(node->obj->obj.proxy, 0);
endpoint->stream.active = true;
}
str = spa_dict_lookup(props, PW_KEY_LINK_INPUT_NODE);
if (str == NULL) {
res = -EINVAL;
goto exit;
}
ep = find_object(impl, atoi(str));
if (ep == NULL) {
res = -EINVAL;
goto exit;
}
pw_endpoint_proxy_create_link((struct pw_endpoint_proxy*)ep->obj.proxy, &p->dict);
pw_endpoint_proxy_create_link((struct pw_endpoint_proxy*)obj->proxy, &p->dict);
res = 0;
@ -281,19 +218,19 @@ static const struct pw_client_endpoint_proxy_events client_endpoint_events = {
static struct client_endpoint *make_endpoint(struct node *node)
{
struct impl *impl = node->obj.impl;
struct impl *impl = node->impl;
struct pw_properties *props;
struct client_endpoint *endpoint;
struct stream *s;
struct pw_proxy *proxy;
const char *str, *media_class = NULL, *name = NULL;
struct spa_dict *dict = node->info->props;
props = pw_properties_new(NULL, NULL);
if (props == NULL)
return NULL;
if (node->info && node->info->props) {
if (node->obj->info && node->obj->info->props) {
struct spa_dict *dict = node->obj->info->props;
if ((media_class = spa_dict_lookup(dict, PW_KEY_MEDIA_CLASS)) != NULL)
pw_properties_set(props, PW_KEY_MEDIA_CLASS, media_class);
if ((name = spa_dict_lookup(dict, PW_KEY_MEDIA_NAME)) != NULL)
@ -302,7 +239,7 @@ static struct client_endpoint *make_endpoint(struct node *node)
pw_properties_set(props, PW_KEY_ENDPOINT_AUTOCONNECT, str);
}
proxy = pw_core_proxy_create_object(impl->core_proxy,
proxy = sm_media_session_create_object(impl->session,
"client-endpoint",
PW_TYPE_INTERFACE_ClientEndpoint,
PW_VERSION_CLIENT_ENDPOINT_PROXY,
@ -318,9 +255,9 @@ static struct client_endpoint *make_endpoint(struct node *node)
endpoint->props = props;
endpoint->client_endpoint = (struct pw_client_endpoint_proxy *) proxy;
endpoint->info.version = PW_VERSION_ENDPOINT_INFO;
endpoint->info.name = (char*)pw_properties_get(endpoint->props, PW_KEY_ENDPOINT_NAME);
endpoint->info.media_class = (char*)spa_dict_lookup(node->info->props, PW_KEY_MEDIA_CLASS);
endpoint->info.session_id = impl->session_id;
endpoint->info.name = (char*)pw_properties_get(props, PW_KEY_ENDPOINT_NAME);
endpoint->info.media_class = (char*)pw_properties_get(props, PW_KEY_MEDIA_CLASS);
endpoint->info.session_id = impl->session->info.id;
endpoint->info.direction = node->direction;
endpoint->info.flags = 0;
endpoint->info.change_mask =
@ -337,7 +274,7 @@ static struct client_endpoint *make_endpoint(struct node *node)
s = &endpoint->stream;
s->props = pw_properties_new(NULL, NULL);
if ((str = spa_dict_lookup(dict, PW_KEY_MEDIA_CLASS)) != NULL)
if ((str = pw_properties_get(props, PW_KEY_MEDIA_CLASS)) != NULL)
pw_properties_set(s->props, PW_KEY_MEDIA_CLASS, str);
if (node->direction == PW_DIRECTION_OUTPUT)
pw_properties_set(s->props, PW_KEY_STREAM_NAME, "Playback");
@ -351,7 +288,7 @@ static struct client_endpoint *make_endpoint(struct node *node)
s->info.change_mask = PW_ENDPOINT_STREAM_CHANGE_MASK_PROPS;
s->info.props = &s->props->dict;
pw_log_debug("stream %d", node->obj.id);
pw_log_debug("stream %d", node->id);
pw_client_endpoint_proxy_stream_update(endpoint->client_endpoint,
s->info.id,
PW_CLIENT_ENDPOINT_STREAM_UPDATE_INFO,
@ -365,24 +302,15 @@ static void destroy_endpoint(struct client_endpoint *endpoint)
pw_proxy_destroy((struct pw_proxy*)endpoint->client_endpoint);
}
static void node_event_info(void *object, const struct pw_node_info *info)
{
struct node *n = object;
struct impl *impl = n->obj.impl;
pw_log_debug(NAME" %p: info for node %d type %d", impl, n->obj.id, n->type);
n->info = pw_node_info_update(n->info, info);
}
static void node_event_param(void *object, int seq,
uint32_t id, uint32_t index, uint32_t next,
const struct spa_pod *param)
{
struct node *n = object;
struct impl *impl = n->obj.impl;
struct impl *impl = n->impl;
struct spa_audio_info_raw info = { 0, };
pw_log_debug(NAME" %p: param for node %d, %d", impl, n->obj.id, id);
pw_log_debug(NAME" %p: param for node %d, %d", impl, n->id, id);
if (id != SPA_PARAM_EnumFormat)
goto error;
@ -418,47 +346,60 @@ static void node_event_param(void *object, int seq,
static const struct pw_node_proxy_events node_events = {
PW_VERSION_NODE_PROXY_EVENTS,
.info = node_event_info,
.param = node_event_param,
};
static void node_proxy_destroy(void *data)
{
struct node *n = data;
struct impl *impl = n->obj.impl;
struct port *p, *t;
struct impl *impl = n->impl;
pw_log_debug(NAME " %p: proxy destroy node %d", impl, n->obj.id);
pw_log_debug(NAME " %p: proxy destroy node %d", impl, n->id);
spa_list_remove(&n->l);
spa_list_for_each_safe(p, t, &n->port_list, l) {
spa_list_remove(&p->l);
p->node = NULL;
}
if (n->info)
pw_node_info_free(n->info);
if (n->endpoint)
destroy_endpoint(n->endpoint);
free(n->media);
}
static void node_proxy_done(void *data, int seq)
{
struct node *n = data;
struct impl *impl = n->impl;
struct client_endpoint *endpoint = n->endpoint;
if (endpoint == NULL)
return;
if (endpoint->pending_config != 0) {
pw_log_debug(NAME" %p: config complete", impl);
endpoint->pending_config = 0;
}
}
static void node_proxy_error(void *data, int seq, int res, const char *message)
{
struct node *n = data;
struct impl *impl = n->impl;
pw_log_error(NAME " %p: proxy seq:%d got error %d: %s", impl, seq, res, message);
}
static const struct pw_proxy_events node_proxy_events = {
PW_VERSION_PROXY_EVENTS,
.destroy = node_proxy_destroy,
.done = node_proxy_done,
.error = node_proxy_error,
};
static int
handle_node(struct impl *impl, uint32_t id,
uint32_t type, const struct spa_dict *props)
handle_node(struct impl *impl, struct sm_object *obj)
{
const char *media_class;
enum pw_direction direction;
struct pw_proxy *p;
struct node *node;
media_class = props ? spa_dict_lookup(props, PW_KEY_MEDIA_CLASS) : NULL;
if (sm_object_get_data(obj, "stream-monitor") != NULL)
return 0;
media_class = obj->props ? pw_properties_get(obj->props, PW_KEY_MEDIA_CLASS) : NULL;
pw_log_debug(NAME" %p: node "PW_KEY_MEDIA_CLASS" %s", impl, media_class);
@ -481,188 +422,23 @@ handle_node(struct impl *impl, uint32_t id,
else
return 0;
p = pw_registry_proxy_bind(impl->registry_proxy,
id, type, PW_VERSION_NODE_PROXY,
sizeof(struct node));
node = pw_proxy_get_user_data(p);
node->obj.impl = impl;
node->obj.id = id;
node->obj.type = type;
node->obj.proxy = p;
spa_list_init(&node->port_list);
pw_proxy_add_listener(p, &node->obj.listener, &node_proxy_events, node);
pw_proxy_add_object_listener(p, &node->listener, &node_events, node);
add_object(impl, &node->obj);
spa_list_append(&impl->node_list, &node->l);
node->type = NODE_TYPE_UNKNOWN;
node = sm_object_add_data(obj, "stream-monitor", sizeof(struct node));
node->obj = (struct sm_node*)obj;
node->impl = impl;
node->id = obj->id;
node->direction = direction;
node->type = NODE_TYPE_STREAM;
node->media = strdup(media_class);
pw_log_debug(NAME "%p: node %d is stream %s", impl, id, node->media);
pw_log_debug(NAME "%p: node %d is stream %s", impl, node->id, node->media);
pw_node_proxy_enum_params((struct pw_node_proxy*)p,
pw_proxy_add_listener(obj->proxy, &node->proxy_listener, &node_proxy_events, node);
pw_proxy_add_object_listener(obj->proxy, &node->listener, &node_events, node);
pw_node_proxy_enum_params((struct pw_node_proxy*)obj->proxy,
0, SPA_PARAM_EnumFormat,
0, -1, NULL);
return 1;
}
static void port_event_info(void *object, const struct pw_port_info *info)
{
struct port *p = object;
pw_log_debug(NAME" %p: info for port %d", p->obj.impl, p->obj.id);
p->info = pw_port_info_update(p->info, info);
}
static const struct pw_port_proxy_events port_events = {
PW_VERSION_PORT_PROXY_EVENTS,
.info = port_event_info,
};
static void port_proxy_destroy(void *data)
{
struct port *p = data;
pw_log_debug(NAME " %p: proxy destroy port %d", p->obj.impl, p->obj.id);
if (p->node) {
spa_list_remove(&p->l);
p->node = NULL;
}
if (p->info)
pw_port_info_free(p->info);
}
static const struct pw_proxy_events port_proxy_events = {
PW_VERSION_PROXY_EVENTS,
.destroy = port_proxy_destroy,
};
static int
handle_port(struct impl *impl, uint32_t id, uint32_t type,
const struct spa_dict *props)
{
struct port *port;
struct pw_proxy *p;
struct node *node;
const char *str;
uint32_t node_id;
if (props == NULL || (str = spa_dict_lookup(props, PW_KEY_NODE_ID)) == NULL)
return -EINVAL;
node_id = atoi(str);
if ((node = find_object(impl, node_id)) == NULL)
return 0;
if (props == NULL || (str = spa_dict_lookup(props, PW_KEY_PORT_DIRECTION)) == NULL)
return -EINVAL;
p = pw_registry_proxy_bind(impl->registry_proxy,
id, type, PW_VERSION_PORT_PROXY,
sizeof(struct port));
port = pw_proxy_get_user_data(p);
port->obj.impl = impl;
port->obj.id = id;
port->obj.type = type;
port->obj.proxy = p;
port->node = node;
port->direction = strcmp(str, "out") ? PW_DIRECTION_OUTPUT : PW_DIRECTION_INPUT;
if (props != NULL && (str = spa_dict_lookup(props, PW_KEY_FORMAT_DSP)) != NULL)
port->flags |= PORT_FLAG_DSP;
pw_proxy_add_listener(p, &port->obj.listener, &port_proxy_events, port);
pw_proxy_add_object_listener(p, &port->listener, &port_events, port);
add_object(impl, &port->obj);
spa_list_append(&node->port_list, &port->l);
pw_log_debug(NAME" %p: new port %d for node %d type %d %08x", impl, id, node_id,
node->type, port->flags);
return 0;
}
static int
handle_endpoint(struct impl *impl, uint32_t id, uint32_t type,
const struct spa_dict *props)
{
struct endpoint *ep;
struct pw_proxy *p;
p = pw_registry_proxy_bind(impl->registry_proxy,
id, type, PW_VERSION_ENDPOINT_PROXY,
sizeof(struct endpoint));
ep = pw_proxy_get_user_data(p);
ep->obj.impl = impl;
ep->obj.id = id;
ep->obj.type = type;
ep->obj.proxy = p;
add_object(impl, &ep->obj);
pw_log_debug(NAME" %p: new endpoint %d", impl, id);
return 0;
}
static void
registry_global(void *data,uint32_t id,
uint32_t permissions, uint32_t type, uint32_t version,
const struct spa_dict *props)
{
struct impl *impl = data;
int res;
pw_log_debug(NAME " %p: new global '%d' %d", impl, id, type);
switch (type) {
case PW_TYPE_INTERFACE_Node:
res = handle_node(impl, id, type, props);
break;
case PW_TYPE_INTERFACE_Port:
res = handle_port(impl, id, type, props);
break;
case PW_TYPE_INTERFACE_Endpoint:
res = handle_endpoint(impl, id, type, props);
break;
default:
res = 0;
break;
}
if (res < 0) {
pw_log_warn(NAME" %p: can't handle global %d: %s", impl, id, spa_strerror(res));
}
}
static void
registry_global_remove(void *data, uint32_t id)
{
struct impl *impl = data;
struct object *obj;
pw_log_debug(NAME " %p: remove global '%d'", impl, id);
if ((obj = find_object(impl, id)) == NULL)
return;
remove_object(impl, obj);
}
static const struct pw_registry_proxy_events registry_events = {
PW_VERSION_REGISTRY_PROXY_EVENTS,
.global = registry_global,
.global_remove = registry_global_remove,
};
#if 0
static void stream_set_volume(struct impl *impl, struct node *node, float volume, bool mute)
{
@ -723,7 +499,39 @@ static void rescan_session(struct impl *impl, struct session *sess)
}
#endif
void * sm_stream_monitor_start(struct pw_remote *remote, int session_id)
static void session_update(void *data, struct sm_object *object)
{
struct impl *impl = data;
int res;
pw_log_debug(NAME " %p: update object '%d' %d", impl, object->id, object->type);
switch (object->type) {
case PW_TYPE_INTERFACE_Node:
res = handle_node(impl, object);
break;
default:
res = 0;
break;
}
if (res < 0) {
pw_log_warn(NAME" %p: can't handle global %d: %s", impl,
object->id, spa_strerror(res));
}
}
static void session_remove(void *data, struct sm_object *object)
{
}
static const struct sm_media_session_events session_events = {
SM_VERSION_MEDIA_SESSION_EVENTS,
.update = session_update,
.remove = session_remove,
};
void * sm_stream_monitor_start(struct sm_media_session *session)
{
struct impl *impl;
@ -731,26 +539,14 @@ void * sm_stream_monitor_start(struct pw_remote *remote, int session_id)
if (impl == NULL)
return NULL;
impl->core = pw_remote_get_core(remote);
impl->remote = remote;
impl->session_id = session_id;
pw_map_init(&impl->globals, 64, 64);
spa_list_init(&impl->client_list);
spa_list_init(&impl->node_list);
impl->core_proxy = pw_remote_get_core_proxy(impl->remote);
impl->registry_proxy = pw_core_proxy_get_registry(impl->core_proxy,
PW_VERSION_REGISTRY_PROXY, 0);
pw_registry_proxy_add_listener(impl->registry_proxy,
&impl->registry_listener,
&registry_events, impl);
impl->session = session;
sm_media_session_add_listener(session, &impl->listener, &session_events, impl);
return impl;
}
int sm_stream_monitor_stop(struct impl *impl)
{
spa_hook_remove(&impl->listener);
return 0;
}