remote: move node export code to client-node module

Make the code to export objects more generic. Make it possible for
modules to register a type to export.
Make the client-node also able to export plain spa_nodes.
Let the remote signal the global of the exported object if any. We can
then remote the (unused) remote_id from the proxy.
This commit is contained in:
Wim Taymans 2019-01-31 11:02:13 +01:00
parent 7ec9de5ac6
commit 31dacd9d6f
20 changed files with 1236 additions and 1055 deletions

View file

@ -72,7 +72,6 @@ struct data {
struct pw_remote *remote; struct pw_remote *remote;
struct spa_hook remote_listener; struct spa_hook remote_listener;
struct pw_node *node;
struct spa_port_info port_info; struct spa_port_info port_info;
struct spa_node impl_node; struct spa_node impl_node;
@ -493,13 +492,8 @@ static void make_node(struct data *data)
pw_properties_set(props, PW_NODE_PROP_CATEGORY, "Capture"); pw_properties_set(props, PW_NODE_PROP_CATEGORY, "Capture");
pw_properties_set(props, PW_NODE_PROP_ROLE, "Camera"); pw_properties_set(props, PW_NODE_PROP_ROLE, "Camera");
data->node = pw_node_new(data->core, "SDL-sink", props, 0);
data->impl_node = impl_node; data->impl_node = impl_node;
pw_node_set_implementation(data->node, &data->impl_node); pw_remote_export(data->remote, SPA_TYPE_INTERFACE_Node, props, &data->impl_node);
pw_node_register(data->node, NULL, NULL, NULL);
pw_node_set_active(data->node, true);
pw_remote_export(data->remote, data->node);
} }
static void on_state_changed(void *_data, enum pw_remote_state old, enum pw_remote_state state, const char *error) static void on_state_changed(void *_data, enum pw_remote_state old, enum pw_remote_state state, const char *error)

View file

@ -505,14 +505,8 @@ static void make_node(struct data *data)
if (data->path) if (data->path)
pw_properties_set(props, PW_NODE_PROP_TARGET_NODE, data->path); pw_properties_set(props, PW_NODE_PROP_TARGET_NODE, data->path);
data->node = pw_node_new(data->core, "export-source", props, 0);
data->impl_node = impl_node; data->impl_node = impl_node;
pw_node_set_implementation(data->node, &data->impl_node); pw_remote_export(data->remote, SPA_TYPE_INTERFACE_Node, props, &data->impl_node);
pw_node_register(data->node, NULL, NULL, NULL);
pw_node_set_active(data->node, true);
pw_remote_export(data->remote, data->node);
} }
static void on_state_changed(void *_data, enum pw_remote_state old, static void on_state_changed(void *_data, enum pw_remote_state old,

View file

@ -70,7 +70,7 @@ static int make_node(struct data *data)
pw_node_set_active(data->node, true); pw_node_set_active(data->node, true);
pw_remote_export(data->remote, data->node); pw_remote_export(data->remote, PW_TYPE_INTERFACE_Node, NULL, data->node);
return 0; return 0;
} }

View file

@ -38,6 +38,8 @@ struct pw_client_node_proxy;
#define PW_VERSION_CLIENT_NODE 0 #define PW_VERSION_CLIENT_NODE 0
#define PW_EXTENSION_MODULE_CLIENT_NODE PIPEWIRE_MODULE_PREFIX "module-client-node"
/** information about a buffer */ /** information about a buffer */
struct pw_client_node_buffer { struct pw_client_node_buffer {
uint32_t mem_id; /**< the memory id for the metadata */ uint32_t mem_id; /**< the memory id for the metadata */

View file

@ -25,6 +25,7 @@ endif
pipewire_module_client_node = shared_library('pipewire-module-client-node', pipewire_module_client_node = shared_library('pipewire-module-client-node',
[ 'module-client-node.c', [ 'module-client-node.c',
'module-client-node/remote-node.c',
'module-client-node/client-node.c', 'module-client-node/client-node.c',
'module-client-node/client-stream.c', 'module-client-node/client-stream.c',
'module-client-node/protocol-native.c', 'module-client-node/protocol-native.c',

View file

@ -40,6 +40,11 @@ static const struct spa_dict_item module_props[] = {
{ PW_MODULE_PROP_VERSION, PACKAGE_VERSION }, { PW_MODULE_PROP_VERSION, PACKAGE_VERSION },
}; };
struct pw_proxy *pw_remote_node_export(struct pw_remote *remote,
uint32_t type, struct pw_properties *props, void *object);
struct pw_proxy *pw_remote_spa_node_export(struct pw_remote *remote,
uint32_t type, struct pw_properties *props, void *object);
struct pw_protocol *pw_protocol_native_ext_client_node_init(struct pw_core *core); struct pw_protocol *pw_protocol_native_ext_client_node_init(struct pw_core *core);
struct factory_data { struct factory_data {
@ -48,6 +53,9 @@ struct factory_data {
struct pw_module *module; struct pw_module *module;
struct spa_hook module_listener; struct spa_hook module_listener;
struct pw_export_type export_node;
struct pw_export_type export_spanode;
}; };
static void *create_object(void *_data, static void *create_object(void *_data,
@ -103,6 +111,9 @@ static void module_destroy(void *data)
if (d->properties) if (d->properties)
pw_properties_free(d->properties); pw_properties_free(d->properties);
spa_list_remove(&d->export_node.link);
spa_list_remove(&d->export_spanode.link);
pw_factory_destroy(d->this); pw_factory_destroy(d->this);
} }
@ -141,6 +152,14 @@ static int module_init(struct pw_module *module, struct pw_properties *propertie
pw_factory_register(factory, NULL, pw_module_get_global(module), NULL); pw_factory_register(factory, NULL, pw_module_get_global(module), NULL);
data->export_node.type = PW_TYPE_INTERFACE_Node;
data->export_node.func = pw_remote_node_export;
pw_core_register_export_type(core, &data->export_node);
data->export_spanode.type = SPA_TYPE_INTERFACE_Node;
data->export_spanode.func = pw_remote_spa_node_export;
pw_core_register_export_type(core, &data->export_spanode);
pw_module_add_listener(module, &data->module_listener, &module_events, data); pw_module_add_listener(module, &data->module_listener, &module_events, data);
pw_module_update_properties(module, &SPA_DICT_INIT_ARRAY(module_props)); pw_module_update_properties(module, &SPA_DICT_INIT_ARRAY(module_props));

File diff suppressed because it is too large Load diff

View file

@ -445,6 +445,7 @@ struct pw_core *pw_core_new(struct pw_loop *main_loop,
spa_list_init(&this->link_list); spa_list_init(&this->link_list);
spa_list_init(&this->control_list[0]); spa_list_init(&this->control_list[0]);
spa_list_init(&this->control_list[1]); spa_list_init(&this->control_list[1]);
spa_list_init(&this->export_list);
spa_hook_list_init(&this->listener_list); spa_hook_list_init(&this->listener_list);
if ((name = pw_properties_get(properties, PW_CORE_PROP_NAME)) == NULL) { if ((name = pw_properties_get(properties, PW_CORE_PROP_NAME)) == NULL) {

View file

@ -116,8 +116,6 @@ struct pw_core_proxy_methods {
void (*get_registry) (void *object, uint32_t version, uint32_t new_id); void (*get_registry) (void *object, uint32_t version, uint32_t new_id);
/** /**
* Create a new object on the PipeWire server from a factory. * Create a new object on the PipeWire server from a factory.
* Use a \a factory_name of "client-node" to create a
* \ref pw_client_node.
* *
* \param factory_name the factory name to use * \param factory_name the factory name to use
* \param type the interface to bind to * \param type the interface to bind to

View file

@ -35,6 +35,7 @@ extern "C" {
#include <pipewire/core.h> #include <pipewire/core.h>
#define PIPEWIRE_SYMBOL_MODULE_INIT "pipewire__module_init" #define PIPEWIRE_SYMBOL_MODULE_INIT "pipewire__module_init"
#define PIPEWIRE_MODULE_PREFIX "libpipewire-"
/** \class pw_module /** \class pw_module
* *

View file

@ -655,6 +655,9 @@ struct pw_node *pw_node_new(struct pw_core *core,
if (impl == NULL) if (impl == NULL)
return NULL; return NULL;
if (name == NULL)
name = "node";
this = &impl->this; this = &impl->this;
this->core = core; this->core = core;
pw_log_debug("node %p: new \"%s\"", this, name); pw_log_debug("node %p: new \"%s\"", this, name);
@ -687,7 +690,6 @@ struct pw_node *pw_node_new(struct pw_core *core,
spa_list_init(&this->output_ports); spa_list_init(&this->output_ports);
pw_map_init(&this->output_port_map, 64, 64); pw_map_init(&this->output_port_map, 64, 64);
spa_graph_init(&impl->driver_graph, &impl->driver_state); spa_graph_init(&impl->driver_graph, &impl->driver_state);
this->rt.driver = &impl->driver_graph; this->rt.driver = &impl->driver_graph;

View file

@ -150,6 +150,19 @@ static int schedule_mix_input(struct spa_node *data)
return SPA_STATUS_HAVE_BUFFER | SPA_STATUS_NEED_BUFFER; return SPA_STATUS_HAVE_BUFFER | SPA_STATUS_NEED_BUFFER;
} }
static int schedule_mix_port_set_param(struct spa_node *data,
enum spa_direction direction, uint32_t port_id,
uint32_t id, uint32_t flags,
const struct spa_pod *param)
{
switch (id) {
case SPA_PARAM_Format:
pw_log_debug("port %d:%d: set format", direction, port_id);
break;
}
return 0;
}
static int schedule_mix_reuse_buffer(struct spa_node *data, uint32_t port_id, uint32_t buffer_id) static int schedule_mix_reuse_buffer(struct spa_node *data, uint32_t port_id, uint32_t buffer_id)
{ {
struct impl *impl = SPA_CONTAINER_OF(data, struct impl, mix_node); struct impl *impl = SPA_CONTAINER_OF(data, struct impl, mix_node);
@ -170,6 +183,7 @@ static const struct spa_node schedule_mix_node = {
SPA_VERSION_NODE, SPA_VERSION_NODE,
NULL, NULL,
.process = schedule_mix_input, .process = schedule_mix_input,
.port_set_param = schedule_mix_port_set_param,
.port_reuse_buffer = schedule_mix_reuse_buffer, .port_reuse_buffer = schedule_mix_reuse_buffer,
}; };

View file

@ -181,6 +181,7 @@ struct pw_core {
struct spa_list factory_list; /**< list of factories */ struct spa_list factory_list; /**< list of factories */
struct spa_list link_list; /**< list of links */ struct spa_list link_list; /**< list of links */
struct spa_list control_list[2]; /**< list of controls, indexed by direction */ struct spa_list control_list[2]; /**< list of controls, indexed by direction */
struct spa_list export_list; /**< list of export types */
struct spa_hook_list listener_list; struct spa_hook_list listener_list;
@ -537,7 +538,6 @@ struct pw_proxy {
struct spa_list link; /**< link in the remote */ struct spa_list link; /**< link in the remote */
uint32_t id; /**< client side id */ uint32_t id; /**< client side id */
uint32_t remote_id; /**< remote id */
struct spa_hook_list listener_list; struct spa_hook_list listener_list;
struct spa_hook_list proxy_listener_list; struct spa_hook_list proxy_listener_list;
@ -550,7 +550,7 @@ struct pw_proxy {
#define pw_remote_events_emit(r,m,v,...) spa_hook_list_call(&r->listener_list, struct pw_remote_events, m, v, ##__VA_ARGS__) #define pw_remote_events_emit(r,m,v,...) spa_hook_list_call(&r->listener_list, struct pw_remote_events, m, v, ##__VA_ARGS__)
#define pw_remote_events_destroy(r) pw_remote_events_emit(r, destroy, 0) #define pw_remote_events_destroy(r) pw_remote_events_emit(r, destroy, 0)
#define pw_remote_events_state_changed(r,o,s,e) pw_remote_events_emit(r, state_changed, 0, o, s, e) #define pw_remote_events_state_changed(r,o,s,e) pw_remote_events_emit(r, state_changed, 0, o, s, e)
#define pw_remote_events_exported(r,i) pw_remote_events_emit(r, exported, 0, i) #define pw_remote_events_exported(r,i,g) pw_remote_events_emit(r, exported, 0, i,g)
struct pw_remote { struct pw_remote {
struct pw_core *core; /**< core */ struct pw_core *core; /**< core */
@ -676,6 +676,8 @@ pw_core_find_port(struct pw_core *core,
struct spa_pod **format_filters, struct spa_pod **format_filters,
char **error); char **error);
const struct pw_export_type *pw_core_find_export_type(struct pw_core *core, uint32_t type);
/** Create a new port \memberof pw_port /** Create a new port \memberof pw_port
* \return a newly allocated port */ * \return a newly allocated port */
struct pw_port * struct pw_port *

View file

@ -70,7 +70,6 @@ struct pw_proxy *pw_proxy_new(struct pw_proxy *factory,
spa_hook_list_init(&this->proxy_listener_list); spa_hook_list_init(&this->proxy_listener_list);
this->id = pw_map_insert_new(&remote->objects, this); this->id = pw_map_insert_new(&remote->objects, this);
this->remote_id = SPA_ID_INVALID;
if (user_data_size > 0) if (user_data_size > 0)
this->user_data = SPA_MEMBER(impl, sizeof(struct proxy), void); this->user_data = SPA_MEMBER(impl, sizeof(struct proxy), void);

File diff suppressed because it is too large Load diff

View file

@ -138,7 +138,7 @@ struct pw_remote_events {
void (*state_changed) (void *data, enum pw_remote_state old, void (*state_changed) (void *data, enum pw_remote_state old,
enum pw_remote_state state, const char *error); enum pw_remote_state state, const char *error);
/** emited when a node was exported */ /** emited when a node was exported */
void (*exported) (void *data, uint32_t id); void (*exported) (void *data, uint32_t proxy_id, uint32_t global_id);
}; };
/** Specify the name of the protocol to use, default is using the native protocol */ /** Specify the name of the protocol to use, default is using the native protocol */
@ -203,7 +203,22 @@ struct pw_proxy *pw_remote_find_proxy(struct pw_remote *remote, uint32_t id);
int pw_remote_disconnect(struct pw_remote *remote); int pw_remote_disconnect(struct pw_remote *remote);
/** run a local node in a remote graph */ /** run a local node in a remote graph */
struct pw_proxy *pw_remote_export(struct pw_remote *remote, struct pw_node *node); struct pw_proxy *pw_remote_export(struct pw_remote *remote, /**< the remote */
uint32_t type, /**< the type of object */
struct pw_properties *properties, /**< extra properties */
void *object /**< object to export */);
/** data for registering export functions */
struct pw_export_type {
struct spa_list link;
uint32_t type;
struct pw_proxy * (*func) (struct pw_remote *remote,
uint32_t type, struct pw_properties *properties, void *object);
};
/** register a type that can be exported on a remote. This is usually used by
* extension modules */
int pw_core_register_export_type(struct pw_core *core, struct pw_export_type *type);
#ifdef __cplusplus #ifdef __cplusplus
} }

View file

@ -41,7 +41,6 @@
#include "pipewire/pipewire.h" #include "pipewire/pipewire.h"
#include "pipewire/stream.h" #include "pipewire/stream.h"
#include "pipewire/private.h" #include "pipewire/private.h"
#include "extensions/client-node.h"
#define MAX_BUFFERS 64 #define MAX_BUFFERS 64
#define MIN_QUEUED 1 #define MIN_QUEUED 1
@ -195,8 +194,10 @@ static inline struct buffer *pop_queue(struct stream *stream, struct queue *queu
uint32_t index, id; uint32_t index, id;
struct buffer *buffer; struct buffer *buffer;
if ((avail = spa_ringbuffer_get_read_index(&queue->ring, &index)) < MIN_QUEUED) if ((avail = spa_ringbuffer_get_read_index(&queue->ring, &index)) < MIN_QUEUED) {
errno = EPIPE;
return NULL; return NULL;
}
id = queue->ids[index & MASK_BUFFERS]; id = queue->ids[index & MASK_BUFFERS];
spa_ringbuffer_read_update(&queue->ring, index + 1); spa_ringbuffer_read_update(&queue->ring, index + 1);
@ -236,6 +237,8 @@ static struct buffer *get_buffer(struct pw_stream *stream, uint32_t id)
struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this); struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this);
if (id < impl->n_buffers) if (id < impl->n_buffers)
return &impl->buffers[id]; return &impl->buffers[id];
errno = EINVAL;
return NULL; return NULL;
} }
@ -806,6 +809,7 @@ static void proxy_destroy(void *_data)
struct pw_stream *stream = _data; struct pw_stream *stream = _data;
stream->proxy = NULL; stream->proxy = NULL;
spa_hook_remove(&stream->proxy_listener); spa_hook_remove(&stream->proxy_listener);
stream->node_id = SPA_ID_INVALID;
stream_set_state(stream, PW_STREAM_STATE_UNCONNECTED, NULL); stream_set_state(stream, PW_STREAM_STATE_UNCONNECTED, NULL);
} }
@ -821,6 +825,9 @@ static int handle_connect(struct pw_stream *stream)
pw_log_debug("stream %p: creating node", stream); pw_log_debug("stream %p: creating node", stream);
impl->node = pw_node_new(impl->core, stream->name, impl->node = pw_node_new(impl->core, stream->name,
pw_properties_copy(stream->properties), 0); pw_properties_copy(stream->properties), 0);
if (impl->node == NULL)
goto no_node;
impl->impl_node = impl_node; impl->impl_node = impl_node;
if (impl->direction == SPA_DIRECTION_INPUT) if (impl->direction == SPA_DIRECTION_INPUT)
@ -835,10 +842,21 @@ static int handle_connect(struct pw_stream *stream)
pw_node_set_active(impl->node, true); pw_node_set_active(impl->node, true);
pw_log_debug("stream %p: export node %p", stream, impl->node); pw_log_debug("stream %p: export node %p", stream, impl->node);
stream->proxy = pw_remote_export(stream->remote, impl->node); stream->proxy = pw_remote_export(stream->remote,
PW_TYPE_INTERFACE_Node, NULL, impl->node);
if (stream->proxy == NULL)
goto no_proxy;
pw_proxy_add_listener(stream->proxy, &stream->proxy_listener, &proxy_events, stream); pw_proxy_add_listener(stream->proxy, &stream->proxy_listener, &proxy_events, stream);
return 0; return 0;
no_node:
pw_log_error("stream %p: can't make node: %m", stream);
return -errno;
no_proxy:
pw_log_error("stream %p: can't make proxy: %m", stream);
return -errno;
} }
static void on_remote_state_changed(void *_data, enum pw_remote_state old, static void on_remote_state_changed(void *_data, enum pw_remote_state old,
@ -866,11 +884,14 @@ static void on_remote_state_changed(void *_data, enum pw_remote_state old,
break; break;
} }
} }
static void on_remote_exported(void *_data, uint32_t id)
static void on_remote_exported(void *_data, uint32_t proxy_id, uint32_t global_id)
{ {
struct pw_stream *stream = _data; struct pw_stream *stream = _data;
if (stream->proxy && stream->proxy->id == id) if (stream->proxy && stream->proxy->id == proxy_id) {
stream->node_id = global_id;
stream_set_state(stream, PW_STREAM_STATE_CONFIGURE, NULL); stream_set_state(stream, PW_STREAM_STATE_CONFIGURE, NULL);
}
} }
static const struct pw_remote_events remote_events = { static const struct pw_remote_events remote_events = {
@ -916,6 +937,7 @@ struct pw_stream * pw_stream_new(struct pw_remote *remote, const char *name,
this->remote = remote; this->remote = remote;
this->name = name ? strdup(name) : NULL; this->name = name ? strdup(name) : NULL;
this->node_id = SPA_ID_INVALID;
reset_props(&impl->props); reset_props(&impl->props);
@ -1146,9 +1168,7 @@ pw_stream_connect(struct pw_stream *stream,
uint32_t pw_stream_get_node_id(struct pw_stream *stream) uint32_t pw_stream_get_node_id(struct pw_stream *stream)
{ {
if (stream->proxy == NULL) return stream->node_id;
return SPA_ID_INVALID;
return stream->proxy->remote_id;
} }
int pw_stream_disconnect(struct pw_stream *stream) int pw_stream_disconnect(struct pw_stream *stream)
@ -1165,6 +1185,7 @@ int pw_stream_disconnect(struct pw_stream *stream)
if (stream->proxy) { if (stream->proxy) {
stream->proxy = NULL; stream->proxy = NULL;
spa_hook_remove(&stream->proxy_listener); spa_hook_remove(&stream->proxy_listener);
stream->node_id = SPA_ID_INVALID;
} }
stream_set_state(stream, PW_STREAM_STATE_UNCONNECTED, NULL); stream_set_state(stream, PW_STREAM_STATE_UNCONNECTED, NULL);
return 0; return 0;
@ -1277,6 +1298,7 @@ struct pw_buffer *pw_stream_dequeue_buffer(struct pw_stream *stream)
if ((b = pop_queue(impl, &impl->dequeued)) == NULL) { if ((b = pop_queue(impl, &impl->dequeued)) == NULL) {
pw_log_trace("stream %p: no more buffers", stream); pw_log_trace("stream %p: no more buffers", stream);
call_trigger(impl); call_trigger(impl);
errno = EPIPE;
return NULL; return NULL;
} }
pw_log_trace("stream %p: dequeue buffer %d", stream, b->id); pw_log_trace("stream %p: dequeue buffer %d", stream, b->id);

View file

@ -34,6 +34,7 @@ extern "C" {
enum { enum {
PW_TYPE_FIRST = SPA_TYPE_VENDOR_PipeWire, PW_TYPE_FIRST = SPA_TYPE_VENDOR_PipeWire,
PW_TYPE_INTERFACE_START = PW_TYPE_FIRST + SPA_TYPE_INTERFACE_START,
PW_TYPE_INTERFACE_Core, PW_TYPE_INTERFACE_Core,
PW_TYPE_INTERFACE_Registry, PW_TYPE_INTERFACE_Registry,
PW_TYPE_INTERFACE_Node, PW_TYPE_INTERFACE_Node,
@ -42,9 +43,12 @@ enum {
PW_TYPE_INTERFACE_Link, PW_TYPE_INTERFACE_Link,
PW_TYPE_INTERFACE_Client, PW_TYPE_INTERFACE_Client,
PW_TYPE_INTERFACE_Module, PW_TYPE_INTERFACE_Module,
PW_TYPE_INTERFACE_ClientNode,
PW_TYPE_INTERFACE_Device, PW_TYPE_INTERFACE_Device,
/* extensions */
PW_TYPE_INTERFACE_EXTENSIONS = PW_TYPE_INTERFACE_START + 0x1000,
PW_TYPE_INTERFACE_ClientNode,
}; };
#define PW_TYPE_INFO_BASE "PipeWire:" #define PW_TYPE_INFO_BASE "PipeWire:"

View file

@ -40,7 +40,7 @@ static void test_abi(void)
void (*destroy) (void *data); void (*destroy) (void *data);
void (*state_changed) (void *data, enum pw_remote_state old, void (*state_changed) (void *data, enum pw_remote_state old,
enum pw_remote_state state, const char *error); enum pw_remote_state state, const char *error);
void (*exported) (void *data, uint32_t id); void (*exported) (void *data, uint32_t proxy_id, uint32_t remote_id);
} test = { PW_VERSION_REMOTE_EVENTS, NULL }; } test = { PW_VERSION_REMOTE_EVENTS, NULL };
TEST_FUNC(ev, test, destroy); TEST_FUNC(ev, test, destroy);
@ -70,7 +70,7 @@ static void remote_state_changed_error(void *data, enum pw_remote_state old,
{ {
spa_assert_not_reached(); spa_assert_not_reached();
} }
static void remote_exported_error(void *data, uint32_t id) static void remote_exported_error(void *data, uint32_t proxy_id, uint32_t global_id)
{ {
spa_assert_not_reached(); spa_assert_not_reached();
} }

View file

@ -1149,7 +1149,7 @@ static bool do_export_node(struct data *data, const char *cmd, char *args, char
return false; return false;
} }
node = pw_global_get_object(global); node = pw_global_get_object(global);
proxy = pw_remote_export(rd->remote, node); proxy = pw_remote_export(rd->remote, PW_TYPE_INTERFACE_Node, NULL, node);
id = pw_map_insert_new(&data->vars, proxy); id = pw_map_insert_new(&data->vars, proxy);
fprintf(stdout, "%d = @proxy:%d\n", id, pw_proxy_get_id((struct pw_proxy*)proxy)); fprintf(stdout, "%d = @proxy:%d\n", id, pw_proxy_get_id((struct pw_proxy*)proxy));