media-session: also create EndpointLink objects

Create an endpoint link object when linking endpoints. Keep track
of the links in the endpoint_link and cleanup when they are all
gone.

Improve properties on session objects.
This commit is contained in:
Wim Taymans 2019-11-15 17:13:45 +01:00
parent 4ccbce9932
commit 709a52e286
7 changed files with 277 additions and 60 deletions

View file

@ -84,10 +84,37 @@ struct impl {
struct pw_client_session_proxy *client_session; struct pw_client_session_proxy *client_session;
struct spa_hook client_session_listener; struct spa_hook client_session_listener;
struct spa_list endpoint_link_list; /** list of struct endpoint_link */
struct pw_map endpoint_links; /** map of endpoint_link */
int rescan_seq; int rescan_seq;
int last_seq; int last_seq;
}; };
struct endpoint_link {
uint32_t id;
struct pw_endpoint_link_info info;
struct impl *impl;
struct spa_list link; /**< link in struct impl endpoint_link_list */
struct spa_list link_list; /**< list of struct link */
};
struct link {
struct pw_proxy *proxy; /**< proxy for link */
struct spa_hook listener; /**< proxy listener */
uint32_t output_node;
uint32_t output_port;
uint32_t input_node;
uint32_t input_port;
struct endpoint_link *endpoint_link;
struct spa_list link; /**< link in struct endpoint_link link_list */
};
static void add_object(struct impl *impl, struct sm_object *obj) static void add_object(struct impl *impl, struct sm_object *obj)
{ {
size_t size = pw_map_get_size(&impl->globals); size_t size = pw_map_get_size(&impl->globals);
@ -340,7 +367,7 @@ static void endpoint_stream_event_info(void *object, const struct pw_endpoint_st
} }
static const struct pw_endpoint_stream_proxy_events endpoint_stream_events = { static const struct pw_endpoint_stream_proxy_events endpoint_stream_events = {
PW_VERSION_ENDPOINT_PROXY_EVENTS, PW_VERSION_ENDPOINT_STREAM_PROXY_EVENTS,
.info = endpoint_stream_event_info, .info = endpoint_stream_event_info,
}; };
@ -357,6 +384,56 @@ static void endpoint_stream_destroy(void *object)
spa_list_remove(&stream->link); spa_list_remove(&stream->link);
} }
} }
/**
* Endpoint Link
*/
static void endpoint_link_event_info(void *object, const struct pw_endpoint_link_info *info)
{
struct sm_endpoint_link *link = object;
struct impl *impl = SPA_CONTAINER_OF(link->obj.session, struct impl, this);
pw_log_debug(NAME" %p: endpoint link %d info", impl, link->obj.id);
if (link->info == NULL && info) {
link->info = calloc(1, sizeof(struct pw_endpoint_link_info));
link->info->version = PW_VERSION_ENDPOINT_LINK_INFO;
link->info->id = info->id;
link->info->session_id = info->session_id;
link->info->output_endpoint_id = info->output_endpoint_id;
link->info->output_stream_id = info->output_stream_id;
link->info->input_endpoint_id = info->input_endpoint_id;
link->info->input_stream_id = info->input_stream_id;
}
link->info->change_mask = info->change_mask;
link->avail |= SM_ENDPOINT_LINK_CHANGE_MASK_INFO;
link->changed |= SM_ENDPOINT_LINK_CHANGE_MASK_INFO;
sm_media_session_emit_update(impl, &link->obj);
link->changed = 0;
}
static const struct pw_endpoint_link_proxy_events endpoint_link_events = {
PW_VERSION_ENDPOINT_LINK_PROXY_EVENTS,
.info = endpoint_link_event_info,
};
static void endpoint_link_destroy(void *object)
{
struct sm_endpoint_link *link = object;
if (link->info) {
free(link->info->error);
free(link->info);
}
if (link->output) {
link->output = NULL;
spa_list_remove(&link->output_link);
}
if (link->input) {
link->input = NULL;
spa_list_remove(&link->input_link);
}
}
/** /**
* Proxy * Proxy
*/ */
@ -430,6 +507,13 @@ registry_global(void *data,uint32_t id,
user_data_size = sizeof(struct sm_endpoint_stream); user_data_size = sizeof(struct sm_endpoint_stream);
break; break;
case PW_TYPE_INTERFACE_EndpointLink:
events = &endpoint_link_events;
client_version = PW_VERSION_ENDPOINT_LINK_PROXY;
destroy = (pw_destroy_t) endpoint_link_destroy;
user_data_size = sizeof(struct sm_endpoint_link);
break;
default: default:
return; return;
} }
@ -501,6 +585,7 @@ registry_global(void *data,uint32_t id,
stream->endpoint->changed |= SM_ENDPOINT_CHANGE_MASK_STREAMS; stream->endpoint->changed |= SM_ENDPOINT_CHANGE_MASK_STREAMS;
} }
} }
spa_list_init(&stream->link_list);
break; break;
} }
default: default:
@ -605,46 +690,46 @@ struct pw_proxy *sm_media_session_create_object(struct sm_media_session *sess,
factory_name, type, version, props, user_data_size); factory_name, type, version, props, user_data_size);
} }
int sm_media_session_create_links(struct sm_media_session *sess, static void check_endpoint_link(struct endpoint_link *link)
const struct spa_dict *dict) {
if (!spa_list_is_empty(&link->link_list))
return;
if (link->impl) {
spa_list_remove(&link->link);
pw_map_remove(&link->impl->endpoint_links, link->id);
pw_client_session_proxy_link_update(link->impl->client_session,
link->id,
PW_CLIENT_SESSION_LINK_UPDATE_DESTROYED,
0, NULL, NULL);
link->impl = NULL;
free(link);
}
}
static void link_proxy_destroy(void *data)
{
struct link *l = data;
if (l->endpoint_link) {
spa_list_remove(&l->link);
check_endpoint_link(l->endpoint_link);
l->endpoint_link = NULL;
}
}
static const struct pw_proxy_events link_proxy_events = {
PW_VERSION_PROXY_EVENTS,
.destroy = link_proxy_destroy
};
static int link_nodes(struct impl *impl, struct endpoint_link *link,
struct sm_node *outnode, struct sm_node *innode)
{ {
struct impl *impl = SPA_CONTAINER_OF(sess, struct impl, this);
struct sm_object *obj;
struct sm_node *outnode, *innode;
struct sm_port *outport, *inport;
struct pw_properties *props; struct pw_properties *props;
const char *str; struct sm_port *outport, *inport;
int res;
sm_media_session_roundtrip(sess);
/* find output node */
if ((str = spa_dict_lookup(dict, PW_KEY_LINK_OUTPUT_NODE)) == NULL) {
res = -EINVAL;
pw_log_error(NAME" %p: no output node given", impl);
goto exit;
}
if ((obj = find_object(impl, atoi(str))) == NULL ||
obj->type != PW_TYPE_INTERFACE_Node) {
res = -EINVAL;
pw_log_error(NAME" %p: can find output node %s", impl, str);
goto exit;
}
outnode = (struct sm_node*)obj;
/* find input node */
if ((str = spa_dict_lookup(dict, PW_KEY_LINK_INPUT_NODE)) == NULL) {
res = -EINVAL;
pw_log_error(NAME" %p: no input node given", impl);
goto exit;
}
if ((obj = find_object(impl, atoi(str))) == NULL ||
obj->type != PW_TYPE_INTERFACE_Node) {
res = -EINVAL;
pw_log_error(NAME" %p: can find input node %s", impl, str);
goto exit;
}
innode = (struct sm_node*)obj;
pw_log_debug(NAME" %p: linking %d -> %d", impl, outnode->obj.id, innode->obj.id); pw_log_debug(NAME" %p: linking %d -> %d", impl, outnode->obj.id, innode->obj.id);
@ -663,14 +748,32 @@ int sm_media_session_create_links(struct sm_media_session *sess,
if (outport->direction == PW_DIRECTION_OUTPUT && if (outport->direction == PW_DIRECTION_OUTPUT &&
inport->direction == PW_DIRECTION_INPUT) { inport->direction == PW_DIRECTION_INPUT) {
struct link *l;
struct pw_proxy *p;
pw_properties_setf(props, PW_KEY_LINK_OUTPUT_PORT, "%d", outport->obj.id); pw_properties_setf(props, PW_KEY_LINK_OUTPUT_PORT, "%d", outport->obj.id);
pw_properties_setf(props, PW_KEY_LINK_INPUT_PORT, "%d", inport->obj.id); pw_properties_setf(props, PW_KEY_LINK_INPUT_PORT, "%d", inport->obj.id);
sm_media_session_create_object(sess, "link-factory", p = pw_core_proxy_create_object(impl->core_proxy,
"link-factory",
PW_TYPE_INTERFACE_Link, PW_TYPE_INTERFACE_Link,
PW_VERSION_LINK_PROXY, PW_VERSION_LINK_PROXY,
&props->dict, 0); &props->dict, sizeof(struct link));
if (p == NULL)
return -errno;
l = pw_proxy_get_user_data(p);
l->proxy = p;
l->output_node = outnode->obj.id;
l->output_port = outport->obj.id;
l->input_node = innode->obj.id;
l->input_port = inport->obj.id;
pw_proxy_add_listener(p, &l->listener, &link_proxy_events, l);
if (link) {
l->endpoint_link = link;
spa_list_append(&link->link_list, &l->link);
}
outport = spa_list_next(outport, link); outport = spa_list_next(outport, link);
inport = spa_list_next(inport, link); inport = spa_list_next(inport, link);
@ -683,8 +786,98 @@ int sm_media_session_create_links(struct sm_media_session *sess,
} }
pw_properties_free(props); pw_properties_free(props);
return 0;
}
int sm_media_session_create_links(struct sm_media_session *sess,
const struct spa_dict *dict)
{
struct impl *impl = SPA_CONTAINER_OF(sess, struct impl, this);
struct sm_object *obj;
struct sm_node *outnode = NULL, *innode = NULL;
struct sm_endpoint *outendpoint = NULL, *inendpoint = NULL;
struct sm_endpoint_stream *outstream = NULL, *instream = NULL;
struct endpoint_link *link = NULL;
const char *str;
int res;
sm_media_session_roundtrip(sess);
/* find output node */
if ((str = spa_dict_lookup(dict, PW_KEY_LINK_OUTPUT_NODE)) != NULL &&
(obj = find_object(impl, atoi(str))) != NULL &&
obj->type == PW_TYPE_INTERFACE_Node) {
outnode = (struct sm_node*)obj;
}
/* find input node */
if ((str = spa_dict_lookup(dict, PW_KEY_LINK_INPUT_NODE)) != NULL &&
(obj = find_object(impl, atoi(str))) != NULL &&
obj->type == PW_TYPE_INTERFACE_Node) {
innode = (struct sm_node*)obj;
}
/* find endpoints and streams */
if ((str = spa_dict_lookup(dict, PW_KEY_ENDPOINT_LINK_OUTPUT_ENDPOINT)) != NULL &&
(obj = find_object(impl, atoi(str))) != NULL &&
obj->type == PW_TYPE_INTERFACE_Endpoint) {
outendpoint = (struct sm_endpoint*)obj;
}
if ((str = spa_dict_lookup(dict, PW_KEY_ENDPOINT_LINK_OUTPUT_STREAM)) != NULL &&
(obj = find_object(impl, atoi(str))) != NULL &&
obj->type == PW_TYPE_INTERFACE_EndpointStream) {
outstream = (struct sm_endpoint_stream*)obj;
}
if ((str = spa_dict_lookup(dict, PW_KEY_ENDPOINT_LINK_INPUT_ENDPOINT)) != NULL &&
(obj = find_object(impl, atoi(str))) != NULL &&
obj->type == PW_TYPE_INTERFACE_Endpoint) {
inendpoint = (struct sm_endpoint*)obj;
}
if ((str = spa_dict_lookup(dict, PW_KEY_ENDPOINT_LINK_INPUT_STREAM)) != NULL &&
(obj = find_object(impl, atoi(str))) != NULL &&
obj->type == PW_TYPE_INTERFACE_EndpointStream) {
instream = (struct sm_endpoint_stream*)obj;
}
if (outendpoint != NULL && inendpoint != NULL) {
link = calloc(1, sizeof(struct endpoint_link));
if (link == NULL)
return -errno;
link->id = pw_map_insert_new(&impl->endpoint_links, link);
link->impl = impl;
spa_list_init(&link->link_list);
spa_list_append(&impl->endpoint_link_list, &link->link);
link->info.version = PW_VERSION_ENDPOINT_LINK_INFO;
link->info.id = link->id;
link->info.session_id = impl->this.info.id;
link->info.output_endpoint_id = outendpoint->info->id;
link->info.output_stream_id = outstream ? outstream->info->id : SPA_ID_INVALID;
link->info.input_endpoint_id = inendpoint->info->id;
link->info.input_stream_id = instream ? instream->info->id : SPA_ID_INVALID;
link->info.change_mask =
PW_ENDPOINT_LINK_CHANGE_MASK_STATE |
PW_ENDPOINT_LINK_CHANGE_MASK_PROPS;
link->info.state = PW_ENDPOINT_LINK_STATE_ACTIVE;
link->info.props = (struct spa_dict*) dict;
}
/* link the nodes, record the link proxies in the endpoint_link */
if (outnode != NULL && innode != NULL)
res = link_nodes(impl, link, outnode, innode);
else
res = 0; res = 0;
exit:
if (link != NULL) {
/* now create the endpoint link */
pw_client_session_proxy_link_update(impl->client_session,
link->id,
PW_CLIENT_SESSION_UPDATE_INFO,
0, NULL,
&link->info);
}
return res; return res;
} }
@ -875,6 +1068,8 @@ int main(int argc, char *argv[])
pw_module_load(impl.this.core, "libpipewire-module-session-manager", NULL, NULL); pw_module_load(impl.this.core, "libpipewire-module-session-manager", NULL, NULL);
pw_map_init(&impl.globals, 64, 64); pw_map_init(&impl.globals, 64, 64);
pw_map_init(&impl.endpoint_links, 64, 64);
spa_list_init(&impl.endpoint_link_list);
spa_hook_list_init(&impl.hooks); spa_hook_list_init(&impl.hooks);
if ((res = pw_remote_connect(impl.monitor_remote)) < 0) if ((res = pw_remote_connect(impl.monitor_remote)) < 0)

View file

@ -116,7 +116,9 @@ struct sm_endpoint_stream {
struct sm_endpoint *endpoint; struct sm_endpoint *endpoint;
struct spa_list link; /**< link in endpoint stream_list */ struct spa_list link; /**< link in endpoint stream_list */
#define SM_STREAM_CHANGE_MASK_INFO (1<<0) struct spa_list link_list; /**< list of links */
#define SM_ENDPOINT_STREAM_CHANGE_MASK_INFO (1<<0)
uint32_t mask; /**< monitored info */ uint32_t mask; /**< monitored info */
uint32_t avail; /**< available info */ uint32_t avail; /**< available info */
uint32_t changed; /**< changed since last update */ uint32_t changed; /**< changed since last update */
@ -126,8 +128,14 @@ struct sm_endpoint_stream {
struct sm_endpoint_link { struct sm_endpoint_link {
struct sm_object obj; struct sm_object obj;
struct spa_list link; struct spa_list link; /**< link in session link_list */
struct spa_list output_link;
struct sm_endpoint_stream *output;
struct spa_list input_link;
struct sm_endpoint_stream *input;
#define SM_ENDPOINT_LINK_CHANGE_MASK_INFO (1<<0)
uint32_t mask; /**< monitored info */ uint32_t mask; /**< monitored info */
uint32_t avail; /**< available info */ uint32_t avail; /**< available info */
uint32_t changed; /**< changed since last update */ uint32_t changed; /**< changed since last update */

View file

@ -83,6 +83,10 @@ static int client_session_link_update(void *object,
PW_KEY_FACTORY_ID, PW_KEY_FACTORY_ID,
PW_KEY_CLIENT_ID, PW_KEY_CLIENT_ID,
PW_KEY_SESSION_ID, PW_KEY_SESSION_ID,
PW_KEY_ENDPOINT_LINK_OUTPUT_ENDPOINT,
PW_KEY_ENDPOINT_LINK_OUTPUT_STREAM,
PW_KEY_ENDPOINT_LINK_INPUT_ENDPOINT,
PW_KEY_ENDPOINT_LINK_INPUT_STREAM,
NULL NULL
}; };
@ -94,6 +98,8 @@ static int client_session_link_update(void *object,
if (!props) if (!props)
goto no_mem; goto no_mem;
pw_properties_update_keys(props, &session->props->dict, keys); pw_properties_update_keys(props, &session->props->dict, keys);
if (info && info->props)
pw_properties_update_keys(props, info->props, keys);
if (endpoint_link_init(link, link_id, session->info.id, if (endpoint_link_init(link, link_id, session->info.id,
this, core, props) < 0) this, core, props) < 0)

View file

@ -303,6 +303,8 @@ int endpoint_link_init(struct endpoint_link *this,
this->id = id; this->id = id;
this->props = properties; this->props = properties;
pw_properties_setf(properties, PW_KEY_SESSION_ID, "%u", session_id);
properties = pw_properties_copy(properties); properties = pw_properties_copy(properties);
if (!properties) if (!properties)
goto no_mem; goto no_mem;
@ -314,6 +316,8 @@ int endpoint_link_init(struct endpoint_link *this,
if (!this->global) if (!this->global)
goto no_mem; goto no_mem;
pw_properties_setf(this->props, PW_KEY_OBJECT_ID, "%u", this->global->id);
this->info.version = PW_VERSION_ENDPOINT_LINK_INFO; this->info.version = PW_VERSION_ENDPOINT_LINK_INFO;
this->info.id = this->global->id; this->info.id = this->global->id;
this->info.session_id = session_id; this->info.session_id = session_id;

View file

@ -285,6 +285,8 @@ int endpoint_stream_init(struct endpoint_stream *this,
this->id = id; this->id = id;
this->props = properties; this->props = properties;
pw_properties_setf(properties, PW_KEY_ENDPOINT_ID, "%u", endpoint_id);
properties = pw_properties_copy(properties); properties = pw_properties_copy(properties);
if (!properties) if (!properties)
goto no_mem; goto no_mem;
@ -296,6 +298,8 @@ int endpoint_stream_init(struct endpoint_stream *this,
if (!this->global) if (!this->global)
goto no_mem; goto no_mem;
pw_properties_setf(this->props, PW_KEY_OBJECT_ID, "%u", this->global->id);
this->info.version = PW_VERSION_ENDPOINT_STREAM_INFO; this->info.version = PW_VERSION_ENDPOINT_STREAM_INFO;
this->info.id = this->global->id; this->info.id = this->global->id;
this->info.endpoint_id = endpoint_id; this->info.endpoint_id = endpoint_id;

View file

@ -327,7 +327,7 @@ int endpoint_init(struct endpoint *this,
if (!this->global) if (!this->global)
goto no_mem; goto no_mem;
pw_properties_setf(this->props, PW_KEY_ENDPOINT_ID, "%u", this->global->id); pw_properties_setf(this->props, PW_KEY_OBJECT_ID, "%u", this->global->id);
this->info.version = PW_VERSION_ENDPOINT_INFO; this->info.version = PW_VERSION_ENDPOINT_INFO;
this->info.id = this->global->id; this->info.id = this->global->id;

View file

@ -294,7 +294,7 @@ int session_init(struct session *this,
if (!this->global) if (!this->global)
goto no_mem; goto no_mem;
pw_properties_setf(this->props, PW_KEY_SESSION_ID, "%u", this->global->id); pw_properties_setf(this->props, PW_KEY_OBJECT_ID, "%u", this->global->id);
this->info.version = PW_VERSION_SESSION_INFO; this->info.version = PW_VERSION_SESSION_INFO;
this->info.id = this->global->id; this->info.id = this->global->id;