From 28ae844de92d7cbae769caee10482349e9072014 Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Fri, 25 Aug 2017 18:59:01 +0200 Subject: [PATCH] node: always use a spa_node as the implementation Avoid duplicating API, remove implementations from port and node and always use an spa_node as the implementation, it's just as easy to implement a spa node. With the implementation always being a spa_node we will be able to better implement the negotiation of the mixers. --- src/examples/export-sink.c | 85 +++++--- src/examples/export-v4l2.c | 1 + src/examples/local-v4l2.c | 80 +++++--- src/modules/module-jack/jack-node.c | 272 +++++++++++++++++--------- src/modules/spa/module-node-factory.c | 1 + src/modules/spa/spa-node.c | 214 +------------------- src/pipewire/core.c | 12 +- src/pipewire/link.c | 192 ++++++++++-------- src/pipewire/node.c | 206 ++++++++++--------- src/pipewire/node.h | 35 +--- src/pipewire/port.c | 102 ++-------- src/pipewire/port.h | 67 +------ src/pipewire/private.h | 29 +-- src/pipewire/remote.c | 13 +- 14 files changed, 548 insertions(+), 761 deletions(-) diff --git a/src/examples/export-sink.c b/src/examples/export-sink.c index 2a9b34bed..62061152b 100644 --- a/src/examples/export-sink.c +++ b/src/examples/export-sink.c @@ -82,6 +82,9 @@ struct data { struct pw_port *port; struct spa_port_info port_info; + struct spa_node impl_node; + const struct spa_node_callbacks *callbacks; + void *callbacks_data; struct spa_port_io *io; uint8_t buffer[1024]; @@ -182,19 +185,34 @@ static Uint32 id_to_sdl_format(struct data *data, uint32_t id) SPA_POD_PROP (f,key,SPA_POD_PROP_FLAG_UNSET | \ SPA_POD_PROP_RANGE_MIN_MAX,type,3,__VA_ARGS__) -static int impl_port_set_io(void *data, struct spa_port_io *io) +static int impl_send_command(struct spa_node *node, const struct spa_command *command) { - struct data *d = data; + return SPA_RESULT_OK; +} + +static int impl_set_callbacks(struct spa_node *node, + const struct spa_node_callbacks *callbacks, void *data) +{ + struct data *d = SPA_CONTAINER_OF(node, struct data, impl_node); + d->callbacks = callbacks; + d->callbacks_data = data; + return SPA_RESULT_OK; +} + +static int impl_port_set_io(struct spa_node *node, enum spa_direction direction, uint32_t port_id, + struct spa_port_io *io) +{ + struct data *d = SPA_CONTAINER_OF(node, struct data, impl_node); d->io = io; return SPA_RESULT_OK; } -static int impl_port_enum_formats(void *data, +static int impl_port_enum_formats(struct spa_node *node, enum spa_direction direction, uint32_t port_id, struct spa_format **format, const struct spa_format *filter, - int32_t index) + uint32_t index) { - struct data *d = data; + struct data *d = SPA_CONTAINER_OF(node, struct data, impl_node); const struct spa_format *formats[1]; struct spa_pod_builder b = SPA_POD_BUILDER_INIT(d->buffer, sizeof(d->buffer)); struct spa_pod_frame f[2]; @@ -247,9 +265,10 @@ static int impl_port_enum_formats(void *data, return SPA_RESULT_OK; } -static int impl_port_set_format(void *data, uint32_t flags, const struct spa_format *format) +static int impl_port_set_format(struct spa_node *node, enum spa_direction direction, uint32_t port_id, + uint32_t flags, const struct spa_format *format) { - struct data *d = data; + struct data *d = SPA_CONTAINER_OF(node, struct data, impl_node); struct pw_type *t = d->t; struct spa_pod_builder b = { NULL }; struct spa_pod_frame f[2]; @@ -298,9 +317,10 @@ static int impl_port_set_format(void *data, uint32_t flags, const struct spa_for return SPA_RESULT_OK; } -static int impl_port_get_format(void *data, const struct spa_format **format) +static int impl_port_get_format(struct spa_node *node, enum spa_direction direction, uint32_t port_id, + const struct spa_format **format) { - struct data *d = data; + struct data *d = SPA_CONTAINER_OF(node, struct data, impl_node); struct spa_pod_builder b = SPA_POD_BUILDER_INIT(d->buffer, sizeof(d->buffer)); struct spa_pod_frame f[2]; @@ -318,9 +338,10 @@ static int impl_port_get_format(void *data, const struct spa_format **format) return SPA_RESULT_OK; } -static int impl_port_get_info(void *data, const struct spa_port_info **info) +static int impl_port_get_info(struct spa_node *node, enum spa_direction direction, uint32_t port_id, + const struct spa_port_info **info) { - struct data *d = data; + struct data *d = SPA_CONTAINER_OF(node, struct data, impl_node); d->port_info.flags = SPA_PORT_INFO_FLAG_CAN_USE_BUFFERS; d->port_info.rate = 0; @@ -331,9 +352,10 @@ static int impl_port_get_info(void *data, const struct spa_port_info **info) return SPA_RESULT_OK; } -static int impl_port_enum_params(void *data, uint32_t index, struct spa_param **param) +static int impl_port_enum_params(struct spa_node *node, enum spa_direction direction, uint32_t port_id, + uint32_t index, struct spa_param **param) { - struct data *d = data; + struct data *d = SPA_CONTAINER_OF(node, struct data, impl_node); if (index >= 2) return SPA_RESULT_ENUM_END; @@ -343,9 +365,10 @@ static int impl_port_enum_params(void *data, uint32_t index, struct spa_param ** return SPA_RESULT_OK; } -static int impl_port_use_buffers(void *data, struct spa_buffer **buffers, uint32_t n_buffers) +static int impl_port_use_buffers(struct spa_node *node, enum spa_direction direction, uint32_t port_id, + struct spa_buffer **buffers, uint32_t n_buffers) { - struct data *d = data; + struct data *d = SPA_CONTAINER_OF(node, struct data, impl_node); int i; for (i = 0; i < n_buffers; i++) d->buffers[i] = buffers[i]; @@ -353,20 +376,9 @@ static int impl_port_use_buffers(void *data, struct spa_buffer **buffers, uint32 return SPA_RESULT_OK; } -static const struct pw_port_implementation impl_port = { - PW_VERSION_PORT_IMPLEMENTATION, - .set_io = impl_port_set_io, - .enum_formats = impl_port_enum_formats, - .set_format = impl_port_set_format, - .get_format = impl_port_get_format, - .get_info = impl_port_get_info, - .enum_params = impl_port_enum_params, - .use_buffers = impl_port_use_buffers, -}; - -static int impl_node_process_input(void *data) +static int impl_node_process_input(struct spa_node *node) { - struct data *d = data; + struct data *d = SPA_CONTAINER_OF(node, struct data, impl_node); struct spa_buffer *buf; uint8_t *map; void *sdata, *ddata; @@ -417,8 +429,17 @@ static int impl_node_process_input(void *data) return SPA_RESULT_NEED_BUFFER; } -static const struct pw_node_implementation impl_node = { - PW_VERSION_NODE_IMPLEMENTATION, +static const struct spa_node impl_node = { + SPA_VERSION_NODE, + .set_callbacks = impl_set_callbacks, + .send_command = impl_send_command, + .port_set_io = impl_port_set_io, + .port_enum_formats = impl_port_enum_formats, + .port_set_format = impl_port_set_format, + .port_get_format = impl_port_get_format, + .port_get_info = impl_port_get_info, + .port_enum_params = impl_port_enum_params, + .port_use_buffers = impl_port_use_buffers, .process_input = impl_node_process_input, }; @@ -432,10 +453,10 @@ static void make_node(struct data *data) NULL); data->node = pw_node_new(data->core, NULL, NULL, "SDL-sink", props, 0); - pw_node_set_implementation(data->node, &impl_node, data); + data->impl_node = impl_node; + pw_node_set_implementation(data->node, &data->impl_node); data->port = pw_port_new(PW_DIRECTION_INPUT, 0, NULL, 0); - pw_port_set_implementation(data->port, &impl_port, data); pw_port_add(data->port, data->node); pw_node_register(data->node); diff --git a/src/examples/export-v4l2.c b/src/examples/export-v4l2.c index e60c16065..a09f4b8d9 100644 --- a/src/examples/export-v4l2.c +++ b/src/examples/export-v4l2.c @@ -95,6 +95,7 @@ static void on_state_changed(void *_data, enum pw_remote_state old, enum pw_remo break; case PW_REMOTE_STATE_CONNECTED: + printf("remote state: \"%s\"\n", pw_remote_state_as_string(state)); make_node(data); break; diff --git a/src/examples/local-v4l2.c b/src/examples/local-v4l2.c index 79b340b9f..d281f676d 100644 --- a/src/examples/local-v4l2.c +++ b/src/examples/local-v4l2.c @@ -83,8 +83,12 @@ struct data { struct pw_link *link; + struct spa_node impl_node; struct spa_port_io *io; + const struct spa_node_callbacks *callbacks; + void *callbacks_data; + uint8_t buffer[1024]; struct spa_video_info_raw format; @@ -183,19 +187,34 @@ static Uint32 id_to_sdl_format(struct data *data, uint32_t id) SPA_POD_PROP (f,key,SPA_POD_PROP_FLAG_UNSET | \ SPA_POD_PROP_RANGE_MIN_MAX,type,3,__VA_ARGS__) -static int impl_port_set_io(void *data, struct spa_port_io *io) +static int impl_send_command(struct spa_node *node, const struct spa_command *command) { - struct data *d = data; + return SPA_RESULT_OK; +} + +static int impl_set_callbacks(struct spa_node *node, + const struct spa_node_callbacks *callbacks, void *data) +{ + struct data *d = SPA_CONTAINER_OF(node, struct data, impl_node); + d->callbacks = callbacks; + d->callbacks_data = data; + return SPA_RESULT_OK; +} + +static int impl_port_set_io(struct spa_node *node, enum spa_direction direction, uint32_t port_id, + struct spa_port_io *io) +{ + struct data *d = SPA_CONTAINER_OF(node, struct data, impl_node); d->io = io; return SPA_RESULT_OK; } -static int impl_port_enum_formats(void *data, +static int impl_port_enum_formats(struct spa_node *node, enum spa_direction direction, uint32_t port_id, struct spa_format **format, const struct spa_format *filter, - int32_t index) + uint32_t index) { - struct data *d = data; + struct data *d = SPA_CONTAINER_OF(node, struct data, impl_node); const struct spa_format *formats[1]; struct spa_pod_builder b = SPA_POD_BUILDER_INIT(d->buffer, sizeof(d->buffer)); struct spa_pod_frame f[2]; @@ -248,9 +267,10 @@ static int impl_port_enum_formats(void *data, return SPA_RESULT_OK; } -static int impl_port_set_format(void *data, uint32_t flags, const struct spa_format *format) +static int impl_port_set_format(struct spa_node *node, enum spa_direction direction, uint32_t port_id, + uint32_t flags, const struct spa_format *format) { - struct data *d = data; + struct data *d = SPA_CONTAINER_OF(node, struct data, impl_node); struct pw_type *t = d->t; struct spa_pod_builder b = { NULL }; struct spa_pod_frame f[2]; @@ -300,9 +320,10 @@ static int impl_port_set_format(void *data, uint32_t flags, const struct spa_for return SPA_RESULT_OK; } -static int impl_port_get_info(void *data, const struct spa_port_info **info) +static int impl_port_get_info(struct spa_node *node, enum spa_direction direction, uint32_t port_id, + const struct spa_port_info **info) { - struct data *d = data; + struct data *d = SPA_CONTAINER_OF(node, struct data, impl_node); d->port_info.flags = SPA_PORT_INFO_FLAG_CAN_USE_BUFFERS; d->port_info.rate = 0; @@ -313,9 +334,10 @@ static int impl_port_get_info(void *data, const struct spa_port_info **info) return SPA_RESULT_OK; } -static int impl_port_enum_params(void *data, uint32_t index, struct spa_param **param) +static int impl_port_enum_params(struct spa_node *node, enum spa_direction direction, uint32_t port_id, + uint32_t index, struct spa_param **param) { - struct data *d = data; + struct data *d = SPA_CONTAINER_OF(node, struct data, impl_node); if (index >= 2) return SPA_RESULT_ENUM_END; @@ -325,9 +347,10 @@ static int impl_port_enum_params(void *data, uint32_t index, struct spa_param ** return SPA_RESULT_OK; } -static int impl_port_use_buffers(void *data, struct spa_buffer **buffers, uint32_t n_buffers) +static int impl_port_use_buffers(struct spa_node *node, enum spa_direction direction, uint32_t port_id, + struct spa_buffer **buffers, uint32_t n_buffers) { - struct data *d = data; + struct data *d = SPA_CONTAINER_OF(node, struct data, impl_node); int i; for (i = 0; i < n_buffers; i++) d->buffers[i] = buffers[i]; @@ -335,19 +358,9 @@ static int impl_port_use_buffers(void *data, struct spa_buffer **buffers, uint32 return SPA_RESULT_OK; } -static const struct pw_port_implementation impl_port = { - PW_VERSION_PORT_IMPLEMENTATION, - .set_io = impl_port_set_io, - .enum_formats = impl_port_enum_formats, - .set_format = impl_port_set_format, - .get_info = impl_port_get_info, - .enum_params = impl_port_enum_params, - .use_buffers = impl_port_use_buffers, -}; - -static int impl_node_process_input(void *data) +static int impl_node_process_input(struct spa_node *node) { - struct data *d = data; + struct data *d = SPA_CONTAINER_OF(node, struct data, impl_node); struct spa_buffer *buf; uint8_t *map; void *sdata, *ddata; @@ -398,8 +411,17 @@ static int impl_node_process_input(void *data) return SPA_RESULT_NEED_BUFFER; } -static const struct pw_node_implementation impl_node = { - PW_VERSION_NODE_IMPLEMENTATION, +static const struct spa_node impl_node = { + SPA_VERSION_NODE, + NULL, + .send_command = impl_send_command, + .set_callbacks = impl_set_callbacks, + .port_set_io = impl_port_set_io, + .port_enum_formats = impl_port_enum_formats, + .port_set_format = impl_port_set_format, + .port_get_info = impl_port_get_info, + .port_enum_params = impl_port_enum_params, + .port_use_buffers = impl_port_use_buffers, .process_input = impl_node_process_input, }; @@ -409,10 +431,10 @@ static void make_nodes(struct data *data) struct pw_properties *props; data->node = pw_node_new(data->core, NULL, NULL, "SDL-sink", NULL, 0); - pw_node_set_implementation(data->node, &impl_node, data); + data->impl_node = impl_node; + pw_node_set_implementation(data->node, &data->impl_node); data->port = pw_port_new(PW_DIRECTION_INPUT, 0, NULL, 0); - pw_port_set_implementation(data->port, &impl_port, data); pw_port_add(data->port, data->node); pw_node_register(data->node); diff --git a/src/modules/module-jack/jack-node.c b/src/modules/module-jack/jack-node.c index a1decb54f..008136f72 100644 --- a/src/modules/module-jack/jack-node.c +++ b/src/modules/module-jack/jack-node.c @@ -77,6 +77,8 @@ struct node_data { struct spa_hook_list listener_list; + struct spa_node node_impl; + struct port_data *port_data[2][PORT_NUM_FOR_CLIENT]; int port_count[2]; int status; @@ -117,27 +119,76 @@ struct port_data { /** \endcond */ -static int node_get_props(void *data, struct spa_props **props) +static int node_get_props(struct spa_node *node, struct spa_props **props) { return SPA_RESULT_NOT_IMPLEMENTED; } -static int node_set_props(void *data, const struct spa_props *props) +static int node_set_props(struct spa_node *node, const struct spa_props *props) { return SPA_RESULT_NOT_IMPLEMENTED; } -static int node_send_command(void *data, +static int node_send_command(struct spa_node *node, const struct spa_command *command) { return SPA_RESULT_OK; } -static struct pw_port* node_add_port(void *data, - enum pw_direction direction, - uint32_t port_id) +static int node_set_callbacks(struct spa_node *node, + const struct spa_node_callbacks *callbacks, void *data) { - return NULL; + return SPA_RESULT_OK; +} + +static int node_get_n_ports(struct spa_node *node, + uint32_t *n_input_ports, + uint32_t *max_input_ports, + uint32_t *n_output_ports, + uint32_t *max_output_ports) +{ + struct node_data *nd = SPA_CONTAINER_OF(node, struct node_data, node_impl); + + if (n_input_ports) + *n_input_ports = nd->port_count[SPA_DIRECTION_INPUT]; + if (max_input_ports) + *max_input_ports = PORT_NUM_FOR_CLIENT / 2; + if (n_output_ports) + *n_output_ports = nd->port_count[SPA_DIRECTION_OUTPUT]; + if (max_output_ports) + *max_output_ports = PORT_NUM_FOR_CLIENT / 2; + + return SPA_RESULT_OK; +} + +static int node_get_port_ids(struct spa_node *node, + uint32_t n_input_ports, + uint32_t *input_ids, + uint32_t n_output_ports, + uint32_t *output_ids) +{ + struct node_data *nd = SPA_CONTAINER_OF(node, struct node_data, node_impl); + int i, c; + + for (c = i = 0; i < PORT_NUM_FOR_CLIENT && c < n_input_ports; i++) { + if (nd->port_data[SPA_DIRECTION_INPUT][i]) + input_ids[c++] = nd->port_data[SPA_DIRECTION_INPUT][i]->port.port->port_id; + } + for (c = i = 0; i < PORT_NUM_FOR_CLIENT && c < n_output_ports; i++) { + if (nd->port_data[SPA_DIRECTION_OUTPUT][i]) + output_ids[c++] = nd->port_data[SPA_DIRECTION_OUTPUT][i]->port.port->port_id; + } + return SPA_RESULT_OK; +} + +static int node_add_port(struct spa_node *node, enum spa_direction direction, uint32_t port_id) +{ + return SPA_RESULT_NOT_IMPLEMENTED; +} + +static int node_remove_port(struct spa_node *node, enum spa_direction direction, uint32_t port_id) +{ + return SPA_RESULT_NOT_IMPLEMENTED; } static struct buffer *buffer_dequeue(struct pw_jack_node *this, struct port_data *pd) @@ -160,7 +211,7 @@ static void recycle_buffer(struct pw_jack_node *this, struct port_data *pd, uint spa_list_append(&pd->empty, &b->link); } -static int driver_process_input(void *data) +static int driver_process_input(struct spa_node *node) { return SPA_RESULT_NOT_IMPLEMENTED; } @@ -193,11 +244,11 @@ static void add_f32(float *out, float *in, int n_samples) out[i] += in[i]; } -static int driver_process_output(void *data) +static int driver_process_output(struct spa_node *node) { - struct node_data *nd = data; + struct node_data *nd = SPA_CONTAINER_OF(node, struct node_data, node_impl); struct pw_jack_node *this = &nd->node; - struct spa_graph_node *node = &this->node->rt.node; + struct spa_graph_node *gn = &this->node->rt.node; struct spa_graph_port *p; struct port_data *opd = SPA_CONTAINER_OF(this->driver_out, struct port_data, port); struct spa_port_io *out_io = opd->io; @@ -226,7 +277,7 @@ static int driver_process_output(void *data) spa_hook_list_call(&nd->listener_list, struct pw_jack_node_events, pull); - spa_list_for_each(p, &node->ports[SPA_DIRECTION_INPUT], link) { + spa_list_for_each(p, &gn->ports[SPA_DIRECTION_INPUT], link) { struct pw_port *port = p->callbacks_data; struct port_data *ipd = pw_port_get_user_data(port); struct spa_port_io *in_io = ipd->io; @@ -246,26 +297,16 @@ static int driver_process_output(void *data) out->outbuf->datas[0].chunk->size = ctrl->buffer_size * sizeof(int16_t) * 2; spa_hook_list_call(&nd->listener_list, struct pw_jack_node_events, push); - node->ready_in = node->required_in; + gn->ready_in = gn->required_in; return SPA_RESULT_HAVE_BUFFER; } -static const struct pw_node_implementation driver_impl = { - PW_VERSION_NODE_IMPLEMENTATION, - .get_props = node_get_props, - .set_props = node_set_props, - .send_command = node_send_command, - .add_port = node_add_port, - .process_input = driver_process_input, - .process_output = driver_process_output, -}; - -static int node_process_input(void *data) +static int node_process_input(struct spa_node *node) { - struct node_data *nd = data; + struct node_data *nd = SPA_CONTAINER_OF(node, struct node_data, node_impl); struct pw_jack_node *this = &nd->node; - struct spa_graph_node *node = &this->node->rt.node; + struct spa_graph_node *gn = &this->node->rt.node; struct spa_graph_port *p; struct jack_server *server = this->server; struct jack_graph_manager *mgr = server->graph_manager; @@ -285,7 +326,7 @@ static int node_process_input(void *data) jack_activation_count_signal(&conn->input_counter[ref_num], &server->synchro_table[ref_num]); - spa_list_for_each(p, &node->ports[SPA_DIRECTION_OUTPUT], link) { + spa_list_for_each(p, &gn->ports[SPA_DIRECTION_OUTPUT], link) { struct pw_port *port = p->callbacks_data; struct port_data *opd = pw_port_get_user_data(port); struct spa_port_io *out_io = opd->io; @@ -296,15 +337,15 @@ static int node_process_input(void *data) return nd->status = SPA_RESULT_HAVE_BUFFER; } -static int node_process_output(void *data) +static int node_process_output(struct spa_node *node) { - struct node_data *nd = data; + struct node_data *nd = SPA_CONTAINER_OF(node, struct node_data, node_impl); struct pw_jack_node *this = &nd->node; - struct spa_graph_node *node = &this->node->rt.node; + struct spa_graph_node *gn = &this->node->rt.node; struct spa_graph_port *p; pw_log_trace(NAME " %p: process output", nd); - spa_list_for_each(p, &node->ports[SPA_DIRECTION_INPUT], link) { + spa_list_for_each(p, &gn->ports[SPA_DIRECTION_INPUT], link) { struct pw_port *port = p->callbacks_data; struct port_data *ipd = pw_port_get_user_data(port); struct spa_port_io *in_io = ipd->io; @@ -315,19 +356,12 @@ static int node_process_output(void *data) return nd->status = SPA_RESULT_NEED_BUFFER; } -static const struct pw_node_implementation node_impl = { - PW_VERSION_NODE_IMPLEMENTATION, - .get_props = node_get_props, - .set_props = node_set_props, - .send_command = node_send_command, - .add_port = node_add_port, - .process_input = node_process_input, - .process_output = node_process_output, -}; -static int port_set_io(void *data, struct spa_port_io *io) +static int port_set_io(struct spa_node *node, enum spa_direction direction, uint32_t port_id, + struct spa_port_io *io) { - struct port_data *pd = data; + struct node_data *nd = SPA_CONTAINER_OF(node, struct node_data, node_impl); + struct port_data *pd = nd->port_data[direction][port_id]; pd->io = io; return SPA_RESULT_OK; } @@ -335,12 +369,13 @@ static int port_set_io(void *data, struct spa_port_io *io) #define PROP(f,key,type,...) \ SPA_POD_PROP (f,key,0,type,1,__VA_ARGS__) -static int port_enum_formats(void *data, +static int port_enum_formats(struct spa_node *node, enum spa_direction direction, uint32_t port_id, struct spa_format **format, const struct spa_format *filter, - int32_t index) + uint32_t index) { - struct port_data *pd = data; + struct node_data *nd = SPA_CONTAINER_OF(node, struct node_data, node_impl); + struct port_data *pd = nd->port_data[direction][port_id]; struct type *t = &pd->node->type; struct spa_pod_builder b = { NULL, }; struct spa_pod_frame f[2]; @@ -378,23 +413,27 @@ static int port_enum_formats(void *data, return SPA_RESULT_OK; } -static int port_set_format(void *data, uint32_t flags, const struct spa_format *format) +static int port_set_format(struct spa_node *node, enum spa_direction direction, uint32_t port_id, + uint32_t flags, const struct spa_format *format) { return SPA_RESULT_OK; } -static int port_get_format(void *data, const struct spa_format **format) +static int port_get_format(struct spa_node *node, enum spa_direction direction, uint32_t port_id, + const struct spa_format **format) { int res; struct spa_format *fmt; - res = port_enum_formats(data, &fmt, NULL, 0); + res = port_enum_formats(node, direction, port_id, &fmt, NULL, 0); *format = fmt; return res; } -static int port_get_info(void *data, const struct spa_port_info **info) +static int port_get_info(struct spa_node *node, enum spa_direction direction, uint32_t port_id, + const struct spa_port_info **info) { - struct port_data *pd = data; + struct node_data *nd = SPA_CONTAINER_OF(node, struct node_data, node_impl); + struct port_data *pd = nd->port_data[direction][port_id]; struct pw_jack_port *port = &pd->port; pd->info.flags = SPA_PORT_INFO_FLAG_CAN_USE_BUFFERS | SPA_PORT_INFO_FLAG_LIVE; @@ -407,19 +446,23 @@ static int port_get_info(void *data, const struct spa_port_info **info) return SPA_RESULT_OK; } -static int port_enum_params(void *data, uint32_t index, struct spa_param **param) +static int port_enum_params(struct spa_node *node, enum spa_direction direction, uint32_t port_id, + uint32_t index, struct spa_param **param) { return SPA_RESULT_ENUM_END; } -static int port_set_param(void *data, struct spa_param *param) +static int port_set_param(struct spa_node *node, enum spa_direction direction, uint32_t port_id, + const struct spa_param *param) { return SPA_RESULT_OK; } -static int port_use_buffers(void *data, struct spa_buffer **buffers, uint32_t n_buffers) +static int port_use_buffers(struct spa_node *node, enum spa_direction direction, uint32_t port_id, + struct spa_buffer **buffers, uint32_t n_buffers) { - struct port_data *pd = data; + struct node_data *nd = SPA_CONTAINER_OF(node, struct node_data, node_impl); + struct port_data *pd = nd->port_data[direction][port_id]; struct type *t = &pd->node->type; int i; @@ -448,11 +491,12 @@ static int port_use_buffers(void *data, struct spa_buffer **buffers, uint32_t n_ return SPA_RESULT_OK; } -static int port_alloc_buffers(void *data, +static int port_alloc_buffers(struct spa_node *node, enum spa_direction direction, uint32_t port_id, struct spa_param **params, uint32_t n_params, struct spa_buffer **buffers, uint32_t *n_buffers) { - struct port_data *pd = data; + struct node_data *nd = SPA_CONTAINER_OF(node, struct node_data, node_impl); + struct port_data *pd = nd->port_data[direction][port_id]; struct type *t = &pd->node->type; int i; @@ -473,29 +517,67 @@ static int port_alloc_buffers(void *data, return SPA_RESULT_OK; } -static int port_reuse_buffer(void *data, uint32_t buffer_id) +static int port_reuse_buffer(struct spa_node *node, uint32_t port_id, uint32_t buffer_id) { return SPA_RESULT_OK; } -static int port_send_command(void *data, struct spa_command *command) +static int port_send_command(struct spa_node *node, enum spa_direction direction, uint32_t port_id, + const struct spa_command *command) { return SPA_RESULT_OK; } -static const struct pw_port_implementation port_impl = { - PW_VERSION_PORT_IMPLEMENTATION, - .set_io = port_set_io, - .enum_formats = port_enum_formats, - .set_format = port_set_format, - .get_format = port_get_format, - .get_info = port_get_info, - .enum_params = port_enum_params, - .set_param = port_set_param, - .use_buffers = port_use_buffers, - .alloc_buffers = port_alloc_buffers, - .reuse_buffer = port_reuse_buffer, - .send_command = port_send_command, +static const struct spa_node driver_impl = { + SPA_VERSION_NODE, + NULL, + .get_props = node_get_props, + .set_props = node_set_props, + .send_command = node_send_command, + .set_callbacks = node_set_callbacks, + .get_n_ports = node_get_n_ports, + .get_port_ids = node_get_port_ids, + .add_port = node_add_port, + .remove_port = node_remove_port, + .port_enum_formats = port_enum_formats, + .port_set_format = port_set_format, + .port_get_format = port_get_format, + .port_get_info = port_get_info, + .port_enum_params = port_enum_params, + .port_set_param = port_set_param, + .port_use_buffers = port_use_buffers, + .port_alloc_buffers = port_alloc_buffers, + .port_set_io = port_set_io, + .port_reuse_buffer = port_reuse_buffer, + .port_send_command = port_send_command, + .process_input = driver_process_input, + .process_output = driver_process_output, +}; + +static const struct spa_node node_impl = { + SPA_VERSION_NODE, + NULL, + .get_props = node_get_props, + .set_props = node_set_props, + .send_command = node_send_command, + .set_callbacks = node_set_callbacks, + .get_n_ports = node_get_n_ports, + .get_port_ids = node_get_port_ids, + .add_port = node_add_port, + .remove_port = node_remove_port, + .port_enum_formats = port_enum_formats, + .port_set_format = port_set_format, + .port_get_format = port_get_format, + .port_get_info = port_get_info, + .port_enum_params = port_enum_params, + .port_set_param = port_set_param, + .port_use_buffers = port_use_buffers, + .port_alloc_buffers = port_alloc_buffers, + .port_set_io = port_set_io, + .port_reuse_buffer = port_reuse_buffer, + .port_send_command = port_send_command, + .process_input = node_process_input, + .process_output = node_process_output, }; static int schedule_mix_input(void *data) @@ -549,7 +631,6 @@ static int schedule_mix_output(void *data) return SPA_RESULT_NEED_BUFFER; } - static const struct spa_graph_node_callbacks schedule_mix_node = { SPA_VERSION_GRAPH_NODE_CALLBACKS, schedule_mix_input, @@ -586,33 +667,33 @@ static void port_destroy(void *data) static void port_free(void *data) { struct port_data *pd = data; + struct node_data *nd = pd->node; + struct pw_port *port = pd->port.port; + + nd->port_data[port->direction][port->port_id] = NULL; + nd->port_count[port->direction]--; + spa_hook_list_call(&pd->listener_list, struct pw_jack_port_events, free); } -static void port_link_added(void *data, struct pw_link *link) -{ -} - -static void port_link_removed(void *data, struct pw_link *link) -{ -} - static const struct pw_port_events port_events = { PW_VERSION_PORT_EVENTS, .destroy = port_destroy, .free = port_free, - .link_added = port_link_added, - .link_removed = port_link_removed, }; struct pw_jack_port * -alloc_port(struct pw_jack_node *node, enum pw_direction direction, - uint32_t port_id, size_t user_data_size) +alloc_port(struct pw_jack_node *node, enum pw_direction direction, size_t user_data_size) { struct node_data *nd = SPA_CONTAINER_OF(node, struct node_data, node); struct pw_port *p; struct port_data *pd; struct pw_jack_port *port; + uint32_t port_id; + + port_id = pw_node_get_free_port_id(node->node, direction); + if (port_id == SPA_ID_INVALID) + return NULL; p = pw_port_new(direction, port_id, NULL, sizeof(struct port_data) + user_data_size); if (p == NULL) @@ -623,9 +704,11 @@ alloc_port(struct pw_jack_node *node, enum pw_direction direction, spa_hook_list_init(&pd->listener_list); spa_list_init(&pd->empty); + nd->port_data[direction][port_id] = pd; + nd->port_count[direction]++; + port = &pd->port; port->node = node; - port->direction = direction; port->port = p; if (user_data_size > 0) @@ -633,8 +716,6 @@ alloc_port(struct pw_jack_node *node, enum pw_direction direction, pw_port_add_listener(p, &pd->port_listener, &port_events, pd); - pw_port_set_implementation(p, &port_impl, pd); - return port; } @@ -645,7 +726,6 @@ pw_jack_node_add_port(struct pw_jack_node *node, unsigned int flags, size_t user_data_size) { - struct node_data *nd = SPA_CONTAINER_OF(node, struct node_data, node); struct jack_server *server = node->server; struct jack_graph_manager *mgr = server->graph_manager; struct jack_connection_manager *conn; @@ -672,7 +752,7 @@ pw_jack_node_add_port(struct pw_jack_node *node, return NULL; } - port = alloc_port(node, direction, nd->port_count[direction]++, user_data_size); + port = alloc_port(node, direction, user_data_size); if (port == NULL) return NULL; @@ -689,6 +769,8 @@ pw_jack_node_add_port(struct pw_jack_node *node, jack_connection_manager_add_outport(conn, ref_num, port_id); jack_graph_manager_next_stop(mgr); + pw_port_add(port->port, node->node); + { struct spa_buffer *b = &pd->buf; struct type *t = &pd->node->type; @@ -710,11 +792,9 @@ pw_jack_node_add_port(struct pw_jack_node *node, port->port->state = PW_PORT_STATE_PAUSED; } if (direction == PW_DIRECTION_INPUT) { - port->port->mix = port; spa_graph_node_set_callbacks(&port->port->rt.mix_node, &schedule_mix_node, port); } - pw_port_add(port->port, node->node); return port; } @@ -796,9 +876,10 @@ struct pw_jack_node *pw_jack_node_new(struct pw_core *core, nd = pw_node_get_user_data(node); spa_hook_list_init(&nd->listener_list); init_type(&nd->type, pw_core_get_type(core)->map); + nd->node_impl = node_impl; pw_node_add_listener(node, &nd->node_listener, &node_events, nd); - pw_node_set_implementation(node, &node_impl, nd); + pw_node_set_implementation(node, &nd->node_impl); this = &nd->node; pw_log_debug("jack-node %p: new", this); @@ -869,9 +950,10 @@ pw_jack_driver_new(struct pw_core *core, nd = pw_node_get_user_data(node); spa_hook_list_init(&nd->listener_list); init_type(&nd->type, pw_core_get_type(core)->map); + nd->node_impl = driver_impl; pw_node_add_listener(node, &nd->node_listener, &node_events, nd); - pw_node_set_implementation(node, &driver_impl, nd); + pw_node_set_implementation(node, &nd->node_impl); this = &nd->node; this->node = node; @@ -907,11 +989,11 @@ pw_jack_driver_new(struct pw_core *core, jack_graph_manager_next_stop(mgr); if (n_capture_channels > 0) { - this->driver_in = alloc_port(this, PW_DIRECTION_INPUT, 0, 0); + this->driver_in = alloc_port(this, PW_DIRECTION_INPUT, 0); pw_port_add(this->driver_in->port, node); } if (n_playback_channels > 0) { - this->driver_out = alloc_port(this, PW_DIRECTION_OUTPUT, 0, 0); + this->driver_out = alloc_port(this, PW_DIRECTION_OUTPUT, 0); pw_port_add(this->driver_out->port, node); } pw_node_register(node); diff --git a/src/modules/spa/module-node-factory.c b/src/modules/spa/module-node-factory.c index e5037e71e..fed29d4ab 100644 --- a/src/modules/spa/module-node-factory.c +++ b/src/modules/spa/module-node-factory.c @@ -98,6 +98,7 @@ static bool module_init(struct pw_module *module, struct pw_properties *properti data = pw_node_factory_get_user_data(factory); data->this = factory; + data->core = core; data->properties = properties; pw_log_debug("module %p: new", module); diff --git a/src/modules/spa/spa-node.c b/src/modules/spa/spa-node.c index 416b34393..997a9d5d3 100644 --- a/src/modules/spa/spa-node.c +++ b/src/modules/spa/spa-node.c @@ -48,121 +48,16 @@ struct impl { struct spa_hook node_listener; }; -struct port { - struct pw_port *port; - enum pw_direction direction; - uint32_t port_id; - struct spa_node *node; -}; - -static int port_impl_set_io(void *data, struct spa_port_io *io) -{ - struct port *p = data; - return spa_node_port_set_io(p->node, p->direction, p->port_id, io); -} - -static int port_impl_enum_formats(void *data, - struct spa_format **format, - const struct spa_format *filter, - int32_t index) -{ - struct port *p = data; - return spa_node_port_enum_formats(p->node, p->direction, p->port_id, format, filter, index); -} - -static int port_impl_set_format(void *data, uint32_t flags, const struct spa_format *format) -{ - struct port *p = data; - return spa_node_port_set_format(p->node, p->direction, p->port_id, flags, format); -} - -static int port_impl_get_format(void *data, const struct spa_format **format) -{ - struct port *p = data; - return spa_node_port_get_format(p->node, p->direction, p->port_id, format); -} - -static int port_impl_get_info(void *data, const struct spa_port_info **info) -{ - struct port *p = data; - return spa_node_port_get_info(p->node, p->direction, p->port_id, info); -} - -static int port_impl_enum_params(void *data, uint32_t index, struct spa_param **param) -{ - struct port *p = data; - return spa_node_port_enum_params(p->node, p->direction, p->port_id, index, param); -} - -static int port_impl_set_param(void *data, struct spa_param *param) -{ - struct port *p = data; - return spa_node_port_set_param(p->node, p->direction, p->port_id, param); -} - -static int port_impl_use_buffers(void *data, struct spa_buffer **buffers, uint32_t n_buffers) -{ - struct port *p = data; - return spa_node_port_use_buffers(p->node, p->direction, p->port_id, buffers, n_buffers); -} - -static int port_impl_alloc_buffers(void *data, - struct spa_param **params, uint32_t n_params, - struct spa_buffer **buffers, uint32_t *n_buffers) -{ - struct port *p = data; - return spa_node_port_alloc_buffers(p->node, p->direction, p->port_id, - params, n_params, buffers, n_buffers); -} - -static int port_impl_reuse_buffer(void *data, uint32_t buffer_id) -{ - struct port *p = data; - return spa_node_port_reuse_buffer(p->node, p->port_id, buffer_id); -} - -static int port_impl_send_command(void *data, struct spa_command *command) -{ - struct port *p = data; - return spa_node_port_send_command(p->node, - p->direction, - p->port_id, - command); -} - -const struct pw_port_implementation port_impl = { - PW_VERSION_PORT_IMPLEMENTATION, - .set_io = port_impl_set_io, - .enum_formats = port_impl_enum_formats, - .set_format = port_impl_set_format, - .get_format = port_impl_get_format, - .get_info = port_impl_get_info, - .enum_params = port_impl_enum_params, - .set_param = port_impl_set_param, - .use_buffers = port_impl_use_buffers, - .alloc_buffers = port_impl_alloc_buffers, - .reuse_buffer = port_impl_reuse_buffer, - .send_command = port_impl_send_command, -}; - static struct pw_port * make_port(struct impl *impl, enum pw_direction direction, uint32_t port_id) { struct pw_node *node = impl->this; struct pw_port *port; - struct port *p; - port = pw_port_new(direction, port_id, NULL, sizeof(struct port)); + port = pw_port_new(direction, port_id, NULL, 0); if (port == NULL) return NULL; - p = pw_port_get_user_data(port); - p->port = port; - p->direction = direction; - p->port_id = port_id; - p->node = impl->node; - - pw_port_set_implementation(port, &port_impl, p); pw_port_add(port, node); return port; @@ -200,62 +95,6 @@ static void update_port_ids(struct impl *impl) } -static int node_impl_get_props(void *data, struct spa_props **props) -{ - struct impl *impl = data; - return spa_node_get_props(impl->node, props); -} - -static int node_impl_set_props(void *data, const struct spa_props *props) -{ - struct impl *impl = data; - return spa_node_set_props(impl->node, props); -} - -static int node_impl_send_command(void *data, const struct spa_command *command) -{ - struct impl *impl = data; - return spa_node_send_command(impl->node, command); -} - -static struct pw_port* -node_impl_add_port(void *data, - enum pw_direction direction, - uint32_t port_id) -{ - struct impl *impl = data; - int res; - - if ((res = spa_node_add_port(impl->node, direction, port_id)) < 0) { - pw_log_error("node %p: could not add port %d %d", impl->this, port_id, res); - return NULL; - } - - return make_port(impl, direction, port_id); -} - -static int node_impl_process_input(void *data) -{ - struct impl *impl = data; - return spa_node_process_input(impl->node); -} - -static int node_impl_process_output(void *data) -{ - struct impl *impl = data; - return spa_node_process_output(impl->node); -} - -static const struct pw_node_implementation node_impl = { - PW_VERSION_NODE_IMPLEMENTATION, - .get_props = node_impl_get_props, - .set_props = node_impl_set_props, - .send_command = node_impl_send_command, - .add_port = node_impl_add_port, - .process_input = node_impl_process_input, - .process_output = node_impl_process_output, -}; - static void pw_spa_node_destroy(void *data) { struct impl *impl = data; @@ -280,7 +119,7 @@ static void complete_init(struct impl *impl) pw_node_register(this); } -static void on_node_done(void *data, int seq, int res) +static void on_node_done(void *data, uint32_t seq, int res) { struct impl *impl = data; struct pw_node *this = impl->this; @@ -289,54 +128,13 @@ static void on_node_done(void *data, int seq, int res) complete_init(impl); impl->async_init = false; } - pw_log_debug("spa-node %p: async complete event %d %d", this, seq, res); - spa_hook_list_call(pw_node_get_listeners(this), struct pw_node_events, async_complete, seq, res); } -static void on_node_event(void *data, struct spa_event *event) -{ - struct impl *impl = data; - struct pw_node *this = impl->this; - - spa_hook_list_call(pw_node_get_listeners(this), struct pw_node_events, event, event); -} - -static void on_node_need_input(void *data) -{ - struct impl *impl = data; - struct pw_node *this = impl->this; - spa_hook_list_call(pw_node_get_listeners(this), struct pw_node_events, need_input); -} - -static void on_node_have_output(void *data) -{ - struct impl *impl = data; - struct pw_node *this = impl->this; - spa_hook_list_call(pw_node_get_listeners(this), struct pw_node_events, have_output); -} - -static void -on_node_reuse_buffer(void *data, uint32_t port_id, uint32_t buffer_id) -{ - struct impl *impl = data; - struct pw_node *this = impl->this; - spa_hook_list_call(pw_node_get_listeners(this), struct pw_node_events, - reuse_buffer, port_id, buffer_id); -} - -static const struct spa_node_callbacks spa_node_callbacks = { - SPA_VERSION_NODE_CALLBACKS, - .done = on_node_done, - .event = on_node_event, - .need_input = on_node_need_input, - .have_output = on_node_have_output, - .reuse_buffer = on_node_reuse_buffer, -}; - static const struct pw_node_events node_events = { PW_VERSION_NODE_EVENTS, .destroy = pw_spa_node_destroy, + .async_complete = on_node_done, }; struct pw_node * @@ -379,11 +177,7 @@ pw_spa_node_new(struct pw_core *core, impl->async_init = async; pw_node_add_listener(this, &impl->node_listener, &node_events, impl); - - pw_node_set_implementation(this, &node_impl, impl); - - if (spa_node_set_callbacks(impl->node, &spa_node_callbacks, impl) < 0) - pw_log_warn("spa-node %p: error setting callback", this); + pw_node_set_implementation(this, node); if (!async) complete_init(impl); diff --git a/src/pipewire/core.c b/src/pipewire/core.c index 7a7ac3109..3f58ac62e 100644 --- a/src/pipewire/core.c +++ b/src/pipewire/core.c @@ -642,13 +642,15 @@ struct spa_format *pw_core_find_format(struct pw_core *core, if (in_state == PW_PORT_STATE_CONFIGURE && out_state > PW_PORT_STATE_CONFIGURE) { /* only input needs format */ - if ((res = pw_port_get_format(output, (const struct spa_format **) &format)) < 0) { + if ((res = spa_node_port_get_format(output->node->node, output->direction, output->port_id, + (const struct spa_format **) &format)) < 0) { asprintf(error, "error get output format: %d", res); goto error; } } else if (out_state == PW_PORT_STATE_CONFIGURE && in_state > PW_PORT_STATE_CONFIGURE) { /* only output needs format */ - if ((res = pw_port_get_format(input, (const struct spa_format **) &format)) < 0) { + if ((res = spa_node_port_get_format(input->node->node, input->direction, input->port_id, + (const struct spa_format **) &format)) < 0) { asprintf(error, "error get input format: %d", res); goto error; } @@ -656,7 +658,8 @@ struct spa_format *pw_core_find_format(struct pw_core *core, again: /* both ports need a format */ pw_log_debug("core %p: finding best format", core); - if ((res = pw_port_enum_formats(input, &filter, NULL, iidx)) < 0) { + if ((res = spa_node_port_enum_formats(input->node->node, input->direction, input->port_id, + &filter, NULL, iidx)) < 0) { if (res == SPA_RESULT_ENUM_END && iidx != 0) { asprintf(error, "error input enum formats: %d", res); goto error; @@ -666,7 +669,8 @@ struct spa_format *pw_core_find_format(struct pw_core *core, if (pw_log_level_enabled(SPA_LOG_LEVEL_DEBUG)) spa_debug_format(filter); - if ((res = pw_port_enum_formats(output, &format, filter, oidx)) < 0) { + if ((res = spa_node_port_enum_formats(output->node->node, output->direction, output->port_id, + &format, filter, oidx)) < 0) { if (res == SPA_RESULT_ENUM_END) { oidx = 0; iidx++; diff --git a/src/pipewire/link.c b/src/pipewire/link.c index abb0956e4..e345f5a91 100644 --- a/src/pipewire/link.c +++ b/src/pipewire/link.c @@ -81,8 +81,10 @@ static void complete_ready(void *obj, void *data, int res, uint32_t id) if (SPA_RESULT_IS_OK(res)) { port->state = PW_PORT_STATE_READY; pw_log_debug("port %p: state READY", port); - } else + } else { + port->state = PW_PORT_STATE_ERROR; pw_log_warn("port %p: failed to go to READY", port); + } } static void complete_paused(void *obj, void *data, int res, uint32_t id) @@ -91,8 +93,10 @@ static void complete_paused(void *obj, void *data, int res, uint32_t id) if (SPA_RESULT_IS_OK(res)) { port->state = PW_PORT_STATE_PAUSED; pw_log_debug("port %p: state PAUSED", port); - } else + } else { + port->state = PW_PORT_STATE_ERROR; pw_log_warn("port %p: failed to go to PAUSED", port); + } } static void complete_streaming(void *obj, void *data, int res, uint32_t id) @@ -101,8 +105,10 @@ static void complete_streaming(void *obj, void *data, int res, uint32_t id) if (SPA_RESULT_IS_OK(res)) { port->state = PW_PORT_STATE_STREAMING; pw_log_debug("port %p: state STREAMING", port); - } else + } else { + port->state = PW_PORT_STATE_ERROR; pw_log_warn("port %p: failed to go to STREAMING", port); + } } static int do_negotiate(struct pw_link *this, uint32_t in_state, uint32_t out_state) @@ -113,47 +119,51 @@ static int do_negotiate(struct pw_link *this, uint32_t in_state, uint32_t out_st char *error = NULL; struct pw_resource *resource; bool changed = true; + struct pw_port *input, *output; if (in_state != PW_PORT_STATE_CONFIGURE && out_state != PW_PORT_STATE_CONFIGURE) return SPA_RESULT_OK; pw_link_update_state(this, PW_LINK_STATE_NEGOTIATING, NULL); - format = pw_core_find_format(this->core, this->output, this->input, NULL, 0, NULL, &error); + input = this->input; + output = this->output; + + format = pw_core_find_format(this->core, output, input, NULL, 0, NULL, &error); if (format == NULL) goto error; format = spa_format_copy(format); - if (out_state > PW_PORT_STATE_CONFIGURE && this->output->node->info.state == PW_NODE_STATE_IDLE) { - if ((res = pw_port_get_format(this->output, - (const struct spa_format **) ¤t)) < 0) { + if (out_state > PW_PORT_STATE_CONFIGURE && output->node->info.state == PW_NODE_STATE_IDLE) { + if ((res = spa_node_port_get_format(output->node->node, output->direction, output->port_id, + (const struct spa_format **) ¤t)) < 0) { asprintf(&error, "error get output format: %d", res); goto error; } if (spa_format_compare(current, format) < 0) { pw_log_debug("link %p: output format change, renegotiate", this); - pw_node_set_state(this->output->node, PW_NODE_STATE_SUSPENDED); + pw_node_set_state(output->node, PW_NODE_STATE_SUSPENDED); out_state = PW_PORT_STATE_CONFIGURE; } else { - pw_node_update_state(this->output->node, PW_NODE_STATE_RUNNING, NULL); + pw_node_update_state(output->node, PW_NODE_STATE_RUNNING, NULL); changed = false; } } - if (in_state > PW_PORT_STATE_CONFIGURE && this->input->node->info.state == PW_NODE_STATE_IDLE) { - if ((res = pw_port_get_format(this->input, - (const struct spa_format **) ¤t)) < 0) { + if (in_state > PW_PORT_STATE_CONFIGURE && input->node->info.state == PW_NODE_STATE_IDLE) { + if ((res = spa_node_port_get_format(input->node->node, input->direction, input->port_id, + (const struct spa_format **) ¤t)) < 0) { asprintf(&error, "error get input format: %d", res); goto error; } if (spa_format_compare(current, format) < 0) { pw_log_debug("link %p: input format change, renegotiate", this); - pw_node_set_state(this->input->node, PW_NODE_STATE_SUSPENDED); + pw_node_set_state(input->node, PW_NODE_STATE_SUSPENDED); in_state = PW_PORT_STATE_CONFIGURE; } else { - pw_node_update_state(this->input->node, PW_NODE_STATE_RUNNING, NULL); + pw_node_update_state(input->node, PW_NODE_STATE_RUNNING, NULL); changed = false; } } @@ -164,22 +174,22 @@ static int do_negotiate(struct pw_link *this, uint32_t in_state, uint32_t out_st if (out_state == PW_PORT_STATE_CONFIGURE) { pw_log_debug("link %p: doing set format on output", this); - if ((res = pw_port_set_format(this->output, SPA_PORT_FORMAT_FLAG_NEAREST, format)) < 0) { + if ((res = pw_port_set_format(output, SPA_PORT_FORMAT_FLAG_NEAREST, format)) < 0) { asprintf(&error, "error set output format: %d", res); goto error; } if (SPA_RESULT_IS_ASYNC(res)) - pw_work_queue_add(impl->work, this->output->node, res, complete_ready, - this->output); + pw_work_queue_add(impl->work, output->node, res, complete_ready, + output); } if (in_state == PW_PORT_STATE_CONFIGURE) { pw_log_debug("link %p: doing set format on input", this); - if ((res2 = pw_port_set_format(this->input, SPA_PORT_FORMAT_FLAG_NEAREST, format)) < 0) { + if ((res2 = pw_port_set_format(input, SPA_PORT_FORMAT_FLAG_NEAREST, format)) < 0) { asprintf(&error, "error set input format: %d", res2); goto error; } if (SPA_RESULT_IS_ASYNC(res2)) - pw_work_queue_add(impl->work, this->input->node, res2, complete_ready, this->input); + pw_work_queue_add(impl->work, input->node, res2, complete_ready, input); } @@ -382,8 +392,8 @@ param_filter(struct pw_link *this, int iidx, oidx, num = 0; for (iidx = 0;; iidx++) { - if (pw_port_enum_params(in_port, iidx, &iparam) - < 0) + if (spa_node_port_enum_params(in_port->node->node, in_port->direction, in_port->port_id, + iidx, &iparam) < 0) break; if (pw_log_level_enabled(SPA_LOG_LEVEL_DEBUG)) @@ -393,7 +403,8 @@ param_filter(struct pw_link *this, struct spa_pod_frame f; uint32_t offset; - if (pw_port_enum_params(out_port, oidx, &oparam) < 0) + if (spa_node_port_enum_params(out_port->node->node, out_port->direction, + out_port->port_id, oidx, &oparam) < 0) break; if (pw_log_level_enabled(SPA_LOG_LEVEL_DEBUG)) @@ -427,20 +438,25 @@ static int do_allocation(struct pw_link *this, uint32_t in_state, uint32_t out_s const struct spa_port_info *iinfo, *oinfo; uint32_t in_flags, out_flags; char *error = NULL; + struct pw_port *input, *output; if (in_state != PW_PORT_STATE_READY && out_state != PW_PORT_STATE_READY) return SPA_RESULT_OK; pw_link_update_state(this, PW_LINK_STATE_ALLOCATING, NULL); - pw_log_debug("link %p: doing alloc buffers %p %p", this, this->output->node, - this->input->node); + input = this->input; + output = this->output; + + pw_log_debug("link %p: doing alloc buffers %p %p", this, output->node, input->node); /* find out what's possible */ - if ((res = pw_port_get_info(this->output, &oinfo)) < 0) { + if ((res = spa_node_port_get_info(output->node->node, output->direction, output->port_id, + &oinfo)) < 0) { asprintf(&error, "error get output port info: %d", res); goto error; } - if ((res = pw_port_get_info(this->input, &iinfo)) < 0) { + if ((res = spa_node_port_get_info(input->node->node, input->direction, input->port_id, + &iinfo)) < 0) { asprintf(&error, "error get input port info: %d", res); goto error; } @@ -450,8 +466,8 @@ static int do_allocation(struct pw_link *this, uint32_t in_state, uint32_t out_s if (out_flags & SPA_PORT_INFO_FLAG_LIVE) { pw_log_debug("setting link as live"); - this->output->node->live = true; - this->input->node->live = true; + output->node->live = true; + input->node->live = true; } if (in_state == PW_PORT_STATE_READY && out_state == PW_PORT_STATE_READY) { @@ -500,7 +516,7 @@ static int do_allocation(struct pw_link *this, uint32_t in_state, uint32_t out_s uint32_t max_buffers; size_t minsize = 1024, stride = 0; - n_params = param_filter(this, this->input, this->output, &b); + n_params = param_filter(this, input, output, &b); params = alloca(n_params * sizeof(struct spa_param *)); for (i = 0, offset = 0; i < n_params; i++) { @@ -556,21 +572,22 @@ static int do_allocation(struct pw_link *this, uint32_t in_state, uint32_t out_s (out_flags & SPA_PORT_INFO_FLAG_CAN_ALLOC_BUFFERS)) minsize = 0; - if (this->output->n_buffers) { + if (output->n_buffers) { out_flags = 0; in_flags = SPA_PORT_INFO_FLAG_CAN_USE_BUFFERS; - this->n_buffers = this->output->n_buffers; - this->buffers = this->output->buffers; - this->buffer_owner = this->output; - pw_log_debug("reusing %d output buffers %p", this->n_buffers, + this->n_buffers = output->n_buffers; + this->buffers = output->buffers; + this->buffer_owner = output; + pw_log_debug("link %p: reusing %d output buffers %p", this, this->n_buffers, this->buffers); - } else if (this->input->n_buffers && this->input->mix == NULL) { + } else if (input->n_buffers && input->mix == NULL) { out_flags = SPA_PORT_INFO_FLAG_CAN_USE_BUFFERS; in_flags = 0; - this->n_buffers = this->input->n_buffers; - this->buffers = this->input->buffers; - this->buffer_owner = this->input; - pw_log_debug("reusing %d input buffers %p", this->n_buffers, this->buffers); + this->n_buffers = input->n_buffers; + this->buffers = input->buffers; + this->buffer_owner = input; + pw_log_debug("link %p: reusing %d input buffers %p", this, this->n_buffers, + this->buffers); } else { size_t data_sizes[1]; ssize_t data_strides[1]; @@ -588,59 +605,57 @@ static int do_allocation(struct pw_link *this, uint32_t in_state, uint32_t out_s data_sizes, data_strides, &this->buffer_mem); - pw_log_debug("allocating %d input buffers %p %zd %zd", this->n_buffers, - this->buffers, minsize, stride); + pw_log_debug("link %p: allocating %d input buffers %p %zd %zd", this, + this->n_buffers, this->buffers, minsize, stride); } if (out_flags & SPA_PORT_INFO_FLAG_CAN_ALLOC_BUFFERS) { - if ((res = pw_port_alloc_buffers(this->output, + if ((res = pw_port_alloc_buffers(output, params, n_params, this->buffers, &this->n_buffers)) < 0) { asprintf(&error, "error alloc output buffers: %d", res); goto error; } if (SPA_RESULT_IS_ASYNC(res)) - pw_work_queue_add(impl->work, this->output->node, res, complete_paused, - this->output); - this->output->buffer_mem = this->buffer_mem; - this->buffer_owner = this->output; - pw_log_debug("allocated %d buffers %p from output port", this->n_buffers, - this->buffers); + pw_work_queue_add(impl->work, output->node, res, complete_paused, output); + output->buffer_mem = this->buffer_mem; + this->buffer_owner = output; + pw_log_debug("link %p: allocated %d buffers %p from output port", this, + this->n_buffers, this->buffers); } else if (in_flags & SPA_PORT_INFO_FLAG_CAN_ALLOC_BUFFERS) { - if ((res = pw_port_alloc_buffers(this->input, + if ((res = pw_port_alloc_buffers(input, params, n_params, this->buffers, &this->n_buffers)) < 0) { asprintf(&error, "error alloc input buffers: %d", res); goto error; } if (SPA_RESULT_IS_ASYNC(res)) - pw_work_queue_add(impl->work, this->input->node, res, complete_paused, - this->input); - this->input->buffer_mem = this->buffer_mem; - this->buffer_owner = this->input; - pw_log_debug("allocated %d buffers %p from input port", this->n_buffers, - this->buffers); + pw_work_queue_add(impl->work, input->node, res, complete_paused, input); + input->buffer_mem = this->buffer_mem; + this->buffer_owner = input; + pw_log_debug("link %p: allocated %d buffers %p from input port", this, + this->n_buffers, this->buffers); } } if (in_flags & SPA_PORT_INFO_FLAG_CAN_USE_BUFFERS) { - pw_log_debug("using %d buffers %p on input port", this->n_buffers, this->buffers); - if ((res = pw_port_use_buffers(this->input, - this->buffers, this->n_buffers)) < 0) { + pw_log_debug("link %p: using %d buffers %p on input port", this, + this->n_buffers, this->buffers); + if ((res = pw_port_use_buffers(input, this->buffers, this->n_buffers)) < 0) { asprintf(&error, "error use input buffers: %d", res); goto error; } if (SPA_RESULT_IS_ASYNC(res)) - pw_work_queue_add(impl->work, this->input->node, res, complete_paused, this->input); + pw_work_queue_add(impl->work, input->node, res, complete_paused, input); } else if (out_flags & SPA_PORT_INFO_FLAG_CAN_USE_BUFFERS) { - pw_log_debug("using %d buffers %p on output port", this->n_buffers, this->buffers); - if ((res = pw_port_use_buffers(this->output, - this->buffers, this->n_buffers)) < 0) { + pw_log_debug("link %p: using %d buffers %p on output port", this, + this->n_buffers, this->buffers); + if ((res = pw_port_use_buffers(output, this->buffers, this->n_buffers)) < 0) { asprintf(&error, "error use output buffers: %d", res); goto error; } if (SPA_RESULT_IS_ASYNC(res)) - pw_work_queue_add(impl->work, this->output->node, res, complete_paused, this->output); + pw_work_queue_add(impl->work, output->node, res, complete_paused, output); } else { asprintf(&error, "no common buffer alloc found"); goto error; @@ -649,12 +664,12 @@ static int do_allocation(struct pw_link *this, uint32_t in_state, uint32_t out_s return SPA_RESULT_OK; error: - this->output->buffers = NULL; - this->output->n_buffers = 0; - this->output->allocated = false; - this->input->buffers = NULL; - this->input->n_buffers = 0; - this->input->allocated = false; + output->buffers = NULL; + output->n_buffers = 0; + output->allocated = false; + input->buffers = NULL; + input->n_buffers = 0; + input->allocated = false; pw_link_update_state(this, PW_LINK_STATE_ERROR, error); return res; } @@ -664,35 +679,37 @@ static int do_start(struct pw_link *this, uint32_t in_state, uint32_t out_state) struct impl *impl = SPA_CONTAINER_OF(this, struct impl, this); char *error = NULL; int res; + struct pw_port *input, *output; if (in_state < PW_PORT_STATE_PAUSED || out_state < PW_PORT_STATE_PAUSED) return SPA_RESULT_OK; pw_link_update_state(this, PW_LINK_STATE_PAUSED, NULL); + input = this->input; + output = this->output; + if (in_state == PW_PORT_STATE_PAUSED) { - if ((res = pw_node_set_state(this->input->node, PW_NODE_STATE_RUNNING)) < 0) { + if ((res = pw_node_set_state(input->node, PW_NODE_STATE_RUNNING)) < 0) { asprintf(&error, "error starting input node: %d", res); goto error; } if (SPA_RESULT_IS_ASYNC(res)) - pw_work_queue_add(impl->work, this->input->node, res, complete_streaming, - this->input); + pw_work_queue_add(impl->work, input->node, res, complete_streaming, input); else - complete_streaming(this->input->node, this->input, res, 0); + complete_streaming(input->node, input, res, 0); } if (out_state == PW_PORT_STATE_PAUSED) { - if ((res = pw_node_set_state(this->output->node, PW_NODE_STATE_RUNNING)) < 0) { + if ((res = pw_node_set_state(output->node, PW_NODE_STATE_RUNNING)) < 0) { asprintf(&error, "error starting output node: %d", res); goto error; } if (SPA_RESULT_IS_ASYNC(res)) - pw_work_queue_add(impl->work, this->output->node, res, complete_streaming, - this->output); + pw_work_queue_add(impl->work, output->node, res, complete_streaming, output); else - complete_streaming(this->output->node, this->output, res, 0); + complete_streaming(output->node, output, res, 0); } return SPA_RESULT_OK; @@ -714,24 +731,33 @@ static int check_states(struct pw_link *this, void *user_data, int res) { struct impl *impl = SPA_CONTAINER_OF(this, struct impl, this); uint32_t in_state, out_state; + struct pw_port *input, *output; if (this->state == PW_LINK_STATE_ERROR) return SPA_RESULT_ERROR; - if (this->input == NULL || this->output == NULL) + input = this->input; + output = this->output; + + if (input == NULL || output == NULL) return SPA_RESULT_OK; - if (this->input->node->info.state == PW_NODE_STATE_ERROR || - this->output->node->info.state == PW_NODE_STATE_ERROR) + if (input->node->info.state == PW_NODE_STATE_ERROR || + output->node->info.state == PW_NODE_STATE_ERROR) return SPA_RESULT_ERROR; - in_state = this->input->state; - out_state = this->output->state; + in_state = input->state; + out_state = output->state; pw_log_debug("link %p: input state %d, output state %d", this, in_state, out_state); + if (in_state == PW_PORT_STATE_ERROR || out_state == PW_PORT_STATE_ERROR) { + pw_link_update_state(this, PW_LINK_STATE_ERROR, NULL); + return SPA_RESULT_ERROR; + } + if (in_state == PW_PORT_STATE_STREAMING && out_state == PW_PORT_STATE_STREAMING) { - pw_loop_invoke(this->output->node->data_loop, + pw_loop_invoke(output->node->data_loop, do_activate_link, SPA_ID_INVALID, 0, NULL, false, this); pw_link_update_state(this, PW_LINK_STATE_RUNNING, NULL); return SPA_RESULT_OK; diff --git a/src/pipewire/node.c b/src/pipewire/node.c index ffff1d896..f2a5a0572 100644 --- a/src/pipewire/node.c +++ b/src/pipewire/node.c @@ -41,8 +41,6 @@ struct impl { struct pw_work_queue *work; - struct spa_hook node_listener; - bool registered; }; @@ -60,12 +58,8 @@ static int pause_node(struct pw_node *this) return SPA_RESULT_OK; pw_log_debug("node %p: pause node", this); - if (this->implementation->send_command) - res = this->implementation->send_command(this->implementation_data, - &SPA_COMMAND_INIT(this->core->type.command_node.Pause)); - else - res = SPA_RESULT_NOT_IMPLEMENTED; - + res = spa_node_send_command(this->node, + &SPA_COMMAND_INIT(this->core->type.command_node.Pause)); if (res < 0) pw_log_debug("node %p: send command error %d", this, res); @@ -77,12 +71,8 @@ static int start_node(struct pw_node *this) int res = SPA_RESULT_OK; pw_log_debug("node %p: start node", this); - if (this->implementation->send_command) - res = this->implementation->send_command(this->implementation_data, - &SPA_COMMAND_INIT(this->core->type.command_node.Start)); - else - res = SPA_RESULT_NOT_IMPLEMENTED; - + res = spa_node_send_command(this->node, + &SPA_COMMAND_INIT(this->core->type.command_node.Start)); if (res < 0) pw_log_debug("node %p: send command error %d", this, res); @@ -99,26 +89,19 @@ static int suspend_node(struct pw_node *this) spa_list_for_each(p, &this->input_ports, link) { if ((res = pw_port_set_format(p, 0, NULL)) < 0) pw_log_warn("error unset format input: %d", res); + /* force CONFIGURE in case of async */ p->state = PW_PORT_STATE_CONFIGURE; } spa_list_for_each(p, &this->output_ports, link) { if ((res = pw_port_set_format(p, 0, NULL)) < 0) pw_log_warn("error unset format output: %d", res); + /* force CONFIGURE in case of async */ p->state = PW_PORT_STATE_CONFIGURE; } return res; } -static void node_async_complete(void *data, uint32_t seq, int res) -{ - struct impl *impl = data; - struct pw_node *this = &impl->this; - - pw_log_debug("node %p: async complete event %d %d", this, seq, res); - pw_work_queue_complete(impl->work, this, seq, res); -} - static void send_clock_update(struct pw_node *this) { int res; @@ -144,59 +127,11 @@ static void send_clock_update(struct pw_node *this) &cu.body.ticks.value, &cu.body.monotonic_time.value); } - if (this->implementation->send_command) - res = this->implementation->send_command(this->implementation_data, (struct spa_command *) &cu); - else - res = SPA_RESULT_NOT_IMPLEMENTED; - + res = spa_node_send_command(this->node, (struct spa_command *) &cu); if (res < 0) pw_log_debug("node %p: send clock update error %d", this, res); } -static void node_event(void *data, const struct spa_event *event) -{ - struct impl *impl = data; - struct pw_node *this = &impl->this; - - pw_log_trace("node %p: event %d", this, SPA_EVENT_TYPE(event)); - if (SPA_EVENT_TYPE(event) == this->core->type.event_node.RequestClockUpdate) { - send_clock_update(this); - } -} - -static void node_need_input(void *data) -{ - struct impl *impl = data; - struct pw_node *this = &impl->this; - - spa_graph_need_input(this->rt.graph, &this->rt.node); -} - -static void node_have_output(void *data) -{ - struct impl *impl = data; - struct pw_node *this = &impl->this; - - spa_graph_have_output(this->rt.graph, &this->rt.node); -} - -static void node_reuse_buffer(void *data, uint32_t port_id, uint32_t buffer_id) -{ - struct impl *impl = data; - struct pw_node *this = &impl->this; - struct spa_graph_port *p, *pp; - - spa_list_for_each(p, &this->rt.node.ports[SPA_DIRECTION_INPUT], link) { - if (p->port_id != port_id) - continue; - - pp = p->peer; - if (pp && pp->callbacks->reuse_buffer) - pp->callbacks->reuse_buffer(pp->callbacks_data, buffer_id); - break; - } -} - static void node_unbind_func(void *data) { struct pw_resource *resource = data; @@ -213,7 +148,8 @@ update_info(struct pw_node *this) for (this->info.n_input_formats = 0;; this->info.n_input_formats++) { struct spa_format *fmt; - if (pw_port_enum_formats(port, &fmt, NULL, this->info.n_input_formats) < 0) + if (spa_node_port_enum_formats(port->node->node, port->direction, port->port_id, + &fmt, NULL, this->info.n_input_formats) < 0) break; this->info.input_formats = @@ -230,7 +166,8 @@ update_info(struct pw_node *this) for (this->info.n_output_formats = 0;; this->info.n_output_formats++) { struct spa_format *fmt; - if (pw_port_enum_formats(port, &fmt, NULL, this->info.n_output_formats) < 0) + if (spa_node_port_enum_formats(port->node->node, port->direction, port->port_id, + &fmt, NULL, this->info.n_output_formats) < 0) break; this->info.output_formats = @@ -340,24 +277,14 @@ static int graph_impl_process_input(void *data) { struct pw_node *this = data; - int res; - if (this->implementation->process_input) - res = this->implementation->process_input(this->implementation_data); - else - res = SPA_RESULT_NOT_IMPLEMENTED; - return res; + return spa_node_process_input(this->node); } static int graph_impl_process_output(void *data) { struct pw_node *this = data; - int res; - if (this->implementation->process_output) - res = this->implementation->process_output(this->implementation_data); - else - res = SPA_RESULT_NOT_IMPLEMENTED; - return res; + return spa_node_process_output(this->node); } static const struct spa_graph_node_callbacks graph_callbacks = { @@ -366,15 +293,6 @@ static const struct spa_graph_node_callbacks graph_callbacks = { .process_output = graph_impl_process_output, }; -static const struct pw_node_events node_events = { - PW_VERSION_NODE_EVENTS, - .async_complete = node_async_complete, - .event = node_event, - .need_input = node_need_input, - .have_output = node_have_output, - .reuse_buffer = node_reuse_buffer, -}; - struct pw_node *pw_node_new(struct pw_core *core, struct pw_resource *owner, struct pw_global *parent, @@ -416,8 +334,6 @@ struct pw_node *pw_node_new(struct pw_core *core, spa_hook_list_init(&this->listener_list); - pw_node_add_listener(this, &impl->node_listener, &node_events, impl); - this->info.state = PW_NODE_STATE_CREATING; spa_list_init(&this->input_ports); @@ -486,12 +402,77 @@ void pw_node_update_properties(struct pw_node *node, const struct spa_dict *dict node->info.change_mask = 0; } -void pw_node_set_implementation(struct pw_node *node, - const struct pw_node_implementation *implementation, - void *data) +static void node_done(void *data, int seq, int res) { - node->implementation = implementation; - node->implementation_data = data; + struct pw_node *node = data; + struct impl *impl = SPA_CONTAINER_OF(node, struct impl, this); + + pw_log_debug("node %p: async complete event %d %d", node, seq, res); + pw_work_queue_complete(impl->work, node, seq, res); + spa_hook_list_call(&node->listener_list, struct pw_node_events, async_complete, seq, res); +} + +static void node_event(void *data, struct spa_event *event) +{ + struct pw_node *node = data; + + pw_log_trace("node %p: event %d", node, SPA_EVENT_TYPE(event)); + if (SPA_EVENT_TYPE(event) == node->core->type.event_node.RequestClockUpdate) { + send_clock_update(node); + } + spa_hook_list_call(&node->listener_list, struct pw_node_events, event, event); +} + +static void node_need_input(void *data) +{ + struct pw_node *node = data; + spa_hook_list_call(&node->listener_list, struct pw_node_events, need_input); + spa_graph_need_input(node->rt.graph, &node->rt.node); +} + +static void node_have_output(void *data) +{ + struct pw_node *node = data; + spa_hook_list_call(&node->listener_list, struct pw_node_events, have_output); + spa_graph_have_output(node->rt.graph, &node->rt.node); +} + +static void node_reuse_buffer(void *data, uint32_t port_id, uint32_t buffer_id) +{ + struct pw_node *node = data; + struct spa_graph_port *p, *pp; + + spa_list_for_each(p, &node->rt.node.ports[SPA_DIRECTION_INPUT], link) { + if (p->port_id != port_id) + continue; + + pp = p->peer; + if (pp && pp->callbacks->reuse_buffer) + pp->callbacks->reuse_buffer(pp->callbacks_data, buffer_id); + break; + } +} + + +static const struct spa_node_callbacks node_callbacks = { + SPA_VERSION_NODE_CALLBACKS, + .done = node_done, + .event = node_event, + .need_input = node_need_input, + .have_output = node_have_output, + .reuse_buffer = node_reuse_buffer, +}; + +void pw_node_set_implementation(struct pw_node *node, + struct spa_node *spa_node) +{ + node->node = spa_node; + spa_node_set_callbacks(node->node, &node_callbacks, node); +} + +struct spa_node *pw_node_get_implementation(struct pw_node *node) +{ + return node->node; } void pw_node_add_listener(struct pw_node *node, @@ -621,6 +602,14 @@ pw_node_find_port(struct pw_node *node, enum pw_direction direction, uint32_t po return pw_map_lookup(portmap, port_id); } +uint32_t pw_node_get_free_port_id(struct pw_node *node, enum pw_direction direction) +{ + if (direction == PW_DIRECTION_INPUT) + return pw_map_insert_new(&node->input_port_map, NULL); + else + return pw_map_insert_new(&node->output_port_map, NULL); +} + /** * pw_node_get_free_port: * \param node a \ref pw_node @@ -665,16 +654,19 @@ struct pw_port *pw_node_get_free_port(struct pw_node *node, enum pw_direction di /* no port, can we create one ? */ if (n_ports < max_ports) { uint32_t port_id = pw_map_insert_new(portmap, NULL); + int res; pw_log_debug("node %p: creating port direction %d %u", node, direction, port_id); - if (node->implementation->add_port) - port = node->implementation->add_port(node->implementation_data, direction, port_id); - else - port = NULL; - + if ((res = spa_node_add_port(node->node, direction, port_id)) < 0) { + pw_log_error("node %p: could not add port %d %d", node, port_id, res); + goto no_mem; + } + port = pw_port_new(direction, port_id, NULL, 0); if (port == NULL) goto no_mem; + + pw_port_add(port, node); } else { port = mixport; } diff --git a/src/pipewire/node.h b/src/pipewire/node.h index 9c2d2b4d5..7fb7a96cc 100644 --- a/src/pipewire/node.h +++ b/src/pipewire/node.h @@ -47,27 +47,6 @@ struct pw_node; #include #include -struct pw_node_implementation { -#define PW_VERSION_NODE_IMPLEMENTATION 0 - uint32_t version; - - int (*get_props) (void *data, struct spa_props **props); - - int (*set_props) (void *data, const struct spa_props *props); - - int (*send_command) (void *data, - const struct spa_command *command); - - struct pw_port* (*add_port) (void *data, - enum pw_direction direction, - uint32_t port_id); - - int (*process_input) (void *data); - - int (*process_output) (void *data); -}; - - struct pw_node_events { #define PW_VERSION_NODE_EVENTS 0 uint32_t version; @@ -139,9 +118,8 @@ const struct pw_properties *pw_node_get_properties(struct pw_node *node); void pw_node_update_properties(struct pw_node *node, const struct spa_dict *dict); -void pw_node_set_implementation(struct pw_node *node, - const struct pw_node_implementation *implementation, - void *data); +void pw_node_set_implementation(struct pw_node *node, struct spa_node *spa_node); +struct spa_node *pw_node_get_implementation(struct pw_node *node); void pw_node_add_listener(struct pw_node *node, struct spa_hook *listener, @@ -161,9 +139,12 @@ bool pw_node_for_each_port(struct pw_node *node, struct pw_port * pw_node_find_port(struct pw_node *node, enum pw_direction direction, uint32_t port_id); -/** Get a free unused port from the node */ -struct pw_port * -pw_node_get_free_port(struct pw_node *node, enum pw_direction direction); +/** Get a free unused port_id from the node */ +uint32_t pw_node_get_free_port_id(struct pw_node *node, enum pw_direction direction); + +/** Get a free unused port from the node, this can be an old unused existing port + * or a new port */ +struct pw_port * pw_node_get_free_port(struct pw_node *node, enum pw_direction direction); /** Change the state of the node */ int pw_node_set_state(struct pw_node *node, enum pw_node_state state); diff --git a/src/pipewire/port.c b/src/pipewire/port.c index 4a4e93d37..f7ddc3563 100644 --- a/src/pipewire/port.c +++ b/src/pipewire/port.c @@ -235,14 +235,6 @@ struct pw_node *pw_port_get_node(struct pw_port *port) return port->node; } -void pw_port_set_implementation(struct pw_port *port, - const struct pw_port_implementation *implementation, - void *data) -{ - port->implementation = implementation; - port->implementation_data = data; -} - void pw_port_add_listener(struct pw_port *port, struct spa_hook *listener, const struct pw_port_events *events, @@ -269,26 +261,27 @@ static int do_add_port(struct spa_loop *loop, return SPA_RESULT_OK; } -void pw_port_add(struct pw_port *port, struct pw_node *node) +bool pw_port_add(struct pw_port *port, struct pw_node *node) { + uint32_t port_id = port->port_id; + port->node = node; pw_log_debug("port %p: add to node %p", port, node); if (port->direction == PW_DIRECTION_INPUT) { spa_list_insert(&node->input_ports, &port->link); - pw_map_insert_at(&node->input_port_map, port->port_id, port); + pw_map_insert_at(&node->input_port_map, port_id, port); node->info.n_input_ports++; node->info.change_mask |= 1 << 1; } else { spa_list_insert(&node->output_ports, &port->link); - pw_map_insert_at(&node->output_port_map, port->port_id, port); + pw_map_insert_at(&node->output_port_map, port_id, port); node->info.n_output_ports++; node->info.change_mask |= 1 << 3; } - if (port->implementation->set_io) - port->implementation->set_io(port->implementation_data, &port->io); + spa_node_port_set_io(node->node, port->direction, port_id, &port->io); port->rt.graph = node->rt.graph; pw_loop_invoke(node->data_loop, do_add_port, SPA_ID_INVALID, 0, NULL, false, port); @@ -297,6 +290,7 @@ void pw_port_add(struct pw_port *port, struct pw_node *node) port_update_state(port, PW_PORT_STATE_CONFIGURE); spa_hook_list_call(&node->listener_list, struct pw_node_events, port_added, port); + return true; } static int do_remove_port(struct spa_loop *loop, @@ -359,38 +353,16 @@ do_port_pause(struct spa_loop *loop, bool async, uint32_t seq, size_t size, const void *data, void *user_data) { struct pw_port *port = user_data; - int res; - - if (port->implementation->send_command) - res = port->implementation->send_command(port->implementation_data, - &SPA_COMMAND_INIT(port->node->core->type.command_node.Pause)); - else - res = SPA_RESULT_OK; - return res; -} - -int pw_port_enum_formats(struct pw_port *port, - struct spa_format **format, - const struct spa_format *filter, - int32_t index) -{ - int res; - if (port->implementation->enum_formats) - res = port->implementation->enum_formats(port->implementation_data, format, filter, index); - else - res = SPA_RESULT_ENUM_END; - return res; + struct pw_node *node = port->node; + return spa_node_port_send_command(node->node, port->direction, port->port_id, + &SPA_COMMAND_INIT(node->core->type.command_node.Pause)); } int pw_port_set_format(struct pw_port *port, uint32_t flags, const struct spa_format *format) { int res; - if (port->implementation->set_format) - res = port->implementation->set_format(port->implementation_data, flags, format); - else - res = SPA_RESULT_OK; - + res = spa_node_port_set_format(port->node->node, port->direction, port->port_id, flags, format); pw_log_debug("port %p: set format %d", port, res); if (!SPA_RESULT_IS_ASYNC(res)) { @@ -411,46 +383,6 @@ int pw_port_set_format(struct pw_port *port, uint32_t flags, const struct spa_fo return res; } -int pw_port_get_format(struct pw_port *port, const struct spa_format **format) -{ - int res; - if (port->implementation->get_format) - res = port->implementation->get_format(port->implementation_data, format); - else - res = SPA_RESULT_NOT_IMPLEMENTED; - return res; -} - -int pw_port_get_info(struct pw_port *port, const struct spa_port_info **info) -{ - int res; - if (port->implementation->get_info) - res = port->implementation->get_info(port->implementation_data, info); - else - res = SPA_RESULT_NOT_IMPLEMENTED; - return res; -} - -int pw_port_enum_params(struct pw_port *port, uint32_t index, struct spa_param **param) -{ - int res; - if (port->implementation->enum_params) - res = port->implementation->enum_params(port->implementation_data, index, param); - else - res = SPA_RESULT_ENUM_END; - return res; -} - -int pw_port_set_param(struct pw_port *port, struct spa_param *param) -{ - int res; - if (port->implementation->set_param) - res = port->implementation->set_param(port->implementation_data, param); - else - res = SPA_RESULT_NOT_IMPLEMENTED; - return res; -} - int pw_port_use_buffers(struct pw_port *port, struct spa_buffer **buffers, uint32_t n_buffers) { int res; @@ -468,11 +400,7 @@ int pw_port_use_buffers(struct pw_port *port, struct spa_buffer **buffers, uint3 } pw_log_debug("port %p: use %d buffers", port, n_buffers); - - if (port->implementation->use_buffers) - res = port->implementation->use_buffers(port->implementation_data, buffers, n_buffers); - else - res = SPA_RESULT_NOT_IMPLEMENTED; + res = spa_node_port_use_buffers(port->node->node, port->direction, port->port_id, buffers, n_buffers); if (port->allocated) { free(port->buffers); @@ -507,13 +435,9 @@ int pw_port_alloc_buffers(struct pw_port *port, pw_log_debug("port %p: alloc %d buffers", port, *n_buffers); - if (port->implementation->alloc_buffers) - res = port->implementation->alloc_buffers(port->implementation_data, + res = spa_node_port_alloc_buffers(port->node->node, port->direction, port->port_id, params, n_params, buffers, n_buffers); - else - res = SPA_RESULT_NOT_IMPLEMENTED; - if (port->allocated) { free(port->buffers); pw_memblock_free(&port->buffer_mem); diff --git a/src/pipewire/port.h b/src/pipewire/port.h index 88d8995ab..eee6a7b88 100644 --- a/src/pipewire/port.h +++ b/src/pipewire/port.h @@ -57,37 +57,6 @@ enum pw_port_state { PW_PORT_STATE_STREAMING = 4, }; -struct pw_port_implementation { -#define PW_VERSION_PORT_IMPLEMENTATION 0 - uint32_t version; - - int (*set_io) (void *data, struct spa_port_io *io); - - int (*enum_formats) (void *data, - struct spa_format **format, - const struct spa_format *filter, - int32_t index); - - int (*set_format) (void *data, uint32_t flags, const struct spa_format *format); - - int (*get_format) (void *data, const struct spa_format **format); - - int (*get_info) (void *data, const struct spa_port_info **info); - - int (*enum_params) (void *data, uint32_t index, struct spa_param **param); - - int (*set_param) (void *data, struct spa_param *param); - - int (*use_buffers) (void *data, struct spa_buffer **buffers, uint32_t n_buffers); - - int (*alloc_buffers) (void *data, - struct spa_param **params, uint32_t n_params, - struct spa_buffer **buffers, uint32_t *n_buffers); - int (*reuse_buffer) (void *data, uint32_t buffer_id); - - int (*send_command) (void *data, struct spa_command *command); -}; - struct pw_port_events { #define PW_VERSION_PORT_EVENTS 0 uint32_t version; @@ -127,11 +96,7 @@ uint32_t pw_port_get_id(struct pw_port *port); struct pw_node *pw_port_get_node(struct pw_port *port); /** Add a port to a node \memberof pw_port */ -void pw_port_add(struct pw_port *port, struct pw_node *node); - -void pw_port_set_implementation(struct pw_port *port, - const struct pw_port_implementation *implementation, - void *data); +bool pw_port_add(struct pw_port *port, struct pw_node *node); void pw_port_add_listener(struct pw_port *port, struct spa_hook *listener, @@ -143,36 +108,6 @@ void pw_port_destroy(struct pw_port *port); void * pw_port_get_user_data(struct pw_port *port); -/** Get the current format on a port \memberof pw_port */ -int pw_port_enum_formats(struct pw_port *port, - struct spa_format **format, - const struct spa_format *filter, - int32_t index); - -/** Set a format on a port \memberof pw_port */ -int pw_port_set_format(struct pw_port *port, uint32_t flags, const struct spa_format *format); - -/** Get the current format on a port \memberof pw_port */ -int pw_port_get_format(struct pw_port *port, const struct spa_format **format); - -/** Get the info on a port \memberof pw_port */ -int pw_port_get_info(struct pw_port *port, const struct spa_port_info **info); - -/** Enumerate the port parameters \memberof pw_port */ -int pw_port_enum_params(struct pw_port *port, uint32_t index, struct spa_param **param); - -/** Set a port parameter \memberof pw_port */ -int pw_port_set_param(struct pw_port *port, struct spa_param *param); - -/** Use buffers on a port \memberof pw_port */ -int pw_port_use_buffers(struct pw_port *port, struct spa_buffer **buffers, uint32_t n_buffers); - -/** Allocate memory for buffers on a port \memberof pw_port */ -int pw_port_alloc_buffers(struct pw_port *port, - struct spa_param **params, uint32_t n_params, - struct spa_buffer **buffers, uint32_t *n_buffers); - - #ifdef __cplusplus } #endif diff --git a/src/pipewire/private.h b/src/pipewire/private.h index b1a698b91..bb38840e4 100644 --- a/src/pipewire/private.h +++ b/src/pipewire/private.h @@ -219,13 +219,10 @@ struct pw_node { bool live; /**< if the node is live */ struct spa_clock *clock; /**< handle to SPA clock if any */ + struct spa_node *node; /**< SPA node implementation */ struct spa_list resource_list; /**< list of resources for this node */ - /** Implementation of core node functions */ - const struct pw_node_implementation *implementation; - void *implementation_data; - struct spa_list input_ports; /**< list of input ports */ struct pw_map input_port_map; /**< map from port_id to port */ uint32_t n_used_input_links; /**< number of active input links */ @@ -259,9 +256,6 @@ struct pw_port { enum pw_port_state state; /**< state of the port */ - const struct pw_port_implementation *implementation; - void *implementation_data; - struct spa_port_io io; /**< io area of the port */ bool allocated; /**< if buffers are allocated */ @@ -273,14 +267,14 @@ struct pw_port { struct spa_hook_list listener_list; - void *mix; /**< optional port buffer mix/split */ + struct spa_node *mix; /**< optional port buffer mix/split */ struct { struct spa_graph *graph; - struct spa_graph_port port; - struct spa_graph_port mix_port; - struct spa_graph_node mix_node; - } rt; /**< data only accessed from the data thread */ + struct spa_graph_port port; /**< this graph port, linked to mix_port */ + struct spa_graph_port mix_port; /**< port from the mixer */ + struct spa_graph_node mix_node; /**< mixer node */ + } rt; /**< data only accessed from the data thread */ void *user_data; /**< extra user data */ }; @@ -376,6 +370,17 @@ struct pw_node_factory { void *user_data; }; +/** Set a format on a port \memberof pw_port */ +int pw_port_set_format(struct pw_port *port, uint32_t flags, const struct spa_format *format); + +/** Use buffers on a port \memberof pw_port */ +int pw_port_use_buffers(struct pw_port *port, struct spa_buffer **buffers, uint32_t n_buffers); + +/** Allocate memory for buffers on a port \memberof pw_port */ +int pw_port_alloc_buffers(struct pw_port *port, + struct spa_param **params, uint32_t n_params, + struct spa_buffer **buffers, uint32_t *n_buffers); + #ifdef __cplusplus } #endif diff --git a/src/pipewire/remote.c b/src/pipewire/remote.c index 34f933f3d..c7a14d57b 100644 --- a/src/pipewire/remote.c +++ b/src/pipewire/remote.c @@ -550,13 +550,14 @@ static void add_port_update(struct pw_proxy *proxy, struct pw_port *port, uint32 } } if (change_mask & PW_CLIENT_NODE_PORT_UPDATE_FORMAT) { - pw_port_get_format(port, &format); + spa_node_port_get_format(port->node->node, port->direction, port->port_id, &format); } if (change_mask & PW_CLIENT_NODE_PORT_UPDATE_PARAMS) { for (;; n_params++) { struct spa_param *param; - if (pw_port_enum_params(port, n_params, ¶m) < 0) + if (spa_node_port_enum_params(port->node->node, port->direction, port->port_id, + n_params, ¶m) < 0) break; params = realloc(params, sizeof(struct spa_param *) * (n_params + 1)); @@ -564,7 +565,7 @@ static void add_port_update(struct pw_proxy *proxy, struct pw_port *port, uint32 } } if (change_mask & PW_CLIENT_NODE_PORT_UPDATE_INFO) { - pw_port_get_info(port, &port_info); + spa_node_port_get_info(port->node->node, port->direction, port->port_id, &port_info); pi = * port_info; pi.flags &= ~SPA_PORT_INFO_FLAG_CAN_ALLOC_BUFFERS; } @@ -858,7 +859,7 @@ handle_node_command(struct pw_proxy *proxy, uint32_t seq, const struct spa_comma data->rtsocket_source, SPA_IO_ERR | SPA_IO_HUP); - if ((res = data->node->implementation->send_command(data->node, command)) < 0) + if ((res = spa_node_send_command(data->node->node, command)) < 0) pw_log_warn("node %p: pause failed", proxy); pw_client_node_proxy_done(data->node_proxy, seq, res); @@ -871,7 +872,7 @@ handle_node_command(struct pw_proxy *proxy, uint32_t seq, const struct spa_comma data->rtsocket_source, SPA_IO_IN | SPA_IO_ERR | SPA_IO_HUP); - if ((res = data->node->implementation->send_command(data->node, command)) < 0) + if ((res = spa_node_send_command(data->node->node, command)) < 0) pw_log_warn("node %p: start failed", proxy); pw_client_node_proxy_done(data->node_proxy, seq, res); @@ -934,7 +935,6 @@ static void node_need_input(void *data) { struct node_data *d = data; uint64_t cmd = 1; - pw_client_node_transport_add_message(d->trans, &PW_CLIENT_NODE_MESSAGE_INIT(PW_CLIENT_NODE_MESSAGE_NEED_INPUT)); write(d->rtwritefd, &cmd, 8); @@ -944,7 +944,6 @@ static void node_have_output(void *data) { struct node_data *d = data; uint64_t cmd = 1; - pw_client_node_transport_add_message(d->trans, &PW_CLIENT_NODE_MESSAGE_INIT(PW_CLIENT_NODE_MESSAGE_HAVE_OUTPUT)); write(d->rtwritefd, &cmd, 8);