diff --git a/spa/include/spa/graph.h b/spa/include/spa/graph.h index 0cb951086..6408586d7 100644 --- a/spa/include/spa/graph.h +++ b/spa/include/spa/graph.h @@ -24,6 +24,8 @@ extern "C" { #endif +#include + #include #include #include diff --git a/spa/include/spa/loop.h b/spa/include/spa/loop.h index c613780d6..82c19c043 100644 --- a/spa/include/spa/loop.h +++ b/spa/include/spa/loop.h @@ -63,7 +63,7 @@ typedef int (*spa_invoke_func_t) (struct spa_loop *loop, bool async, uint32_t seq, size_t size, - void *data, + const void *data, void *user_data); /** @@ -89,7 +89,7 @@ struct spa_loop { spa_invoke_func_t func, uint32_t seq, size_t size, - void *data, + const void *data, bool block, void *user_data); }; diff --git a/spa/include/spa/node.h b/spa/include/spa/node.h index 553afb2d4..acb2d4ea2 100644 --- a/spa/include/spa/node.h +++ b/spa/include/spa/node.h @@ -214,7 +214,7 @@ struct spa_node { * #SPA_RESULT_INVALID_COMMAND @command is an invalid command * #SPA_RESULT_ASYNC @command is executed asynchronously */ - int (*send_command) (struct spa_node *node, struct spa_command *command); + int (*send_command) (struct spa_node *node, const struct spa_command *command); /** * struct spa_node::set_event_callback: * @node: a #struct spa_node @@ -519,7 +519,7 @@ struct spa_node { int (*port_send_command) (struct spa_node *node, enum spa_direction direction, uint32_t port_id, - struct spa_command *command); + const struct spa_command *command); /** * struct spa_node::process_input: * @node: a #struct spa_node diff --git a/spa/plugins/alsa/alsa-sink.c b/spa/plugins/alsa/alsa-sink.c index c8f47421a..b7986ce52 100644 --- a/spa/plugins/alsa/alsa-sink.c +++ b/spa/plugins/alsa/alsa-sink.c @@ -88,7 +88,7 @@ static int impl_node_set_props(struct spa_node *node, const struct spa_props *pr return SPA_RESULT_OK; } -static int do_send_done(struct spa_loop *loop, bool async, uint32_t seq, size_t size, void *data, void *user_data) +static int do_send_done(struct spa_loop *loop, bool async, uint32_t seq, size_t size, const void *data, void *user_data) { struct state *this = user_data; @@ -97,11 +97,11 @@ static int do_send_done(struct spa_loop *loop, bool async, uint32_t seq, size_t return SPA_RESULT_OK; } -static int do_command(struct spa_loop *loop, bool async, uint32_t seq, size_t size, void *data, void *user_data) +static int do_command(struct spa_loop *loop, bool async, uint32_t seq, size_t size, const void *data, void *user_data) { struct state *this = user_data; int res; - struct spa_command *cmd = data; + const struct spa_command *cmd = data; if (SPA_COMMAND_TYPE(cmd) == this->type.command_node.Start || SPA_COMMAND_TYPE(cmd) == this->type.command_node.Pause) { @@ -121,7 +121,7 @@ static int do_command(struct spa_loop *loop, bool async, uint32_t seq, size_t si return res; } -static int impl_node_send_command(struct spa_node *node, struct spa_command *command) +static int impl_node_send_command(struct spa_node *node, const struct spa_command *command) { struct state *this; @@ -513,7 +513,7 @@ static int impl_node_port_reuse_buffer(struct spa_node *node, uint32_t port_id, static int impl_node_port_send_command(struct spa_node *node, - enum spa_direction direction, uint32_t port_id, struct spa_command *command) + enum spa_direction direction, uint32_t port_id, const struct spa_command *command) { struct state *this; int res; diff --git a/spa/plugins/alsa/alsa-source.c b/spa/plugins/alsa/alsa-source.c index 99fdedbc7..63d92ec8b 100644 --- a/spa/plugins/alsa/alsa-source.c +++ b/spa/plugins/alsa/alsa-source.c @@ -90,7 +90,7 @@ static int impl_node_set_props(struct spa_node *node, const struct spa_props *pr return SPA_RESULT_OK; } -static int do_send_done(struct spa_loop *loop, bool async, uint32_t seq, size_t size, void *data, void *user_data) +static int do_send_done(struct spa_loop *loop, bool async, uint32_t seq, size_t size, const void *data, void *user_data) { struct state *this = user_data; @@ -99,7 +99,7 @@ static int do_send_done(struct spa_loop *loop, bool async, uint32_t seq, size_t return SPA_RESULT_OK; } -static int do_start(struct spa_loop *loop, bool async, uint32_t seq, size_t size, void *data, void *user_data) +static int do_start(struct spa_loop *loop, bool async, uint32_t seq, size_t size, const void *data, void *user_data) { struct state *this = user_data; int res; @@ -118,7 +118,7 @@ static int do_start(struct spa_loop *loop, bool async, uint32_t seq, size_t size return res; } -static int do_pause(struct spa_loop *loop, bool async, uint32_t seq, size_t size, void *data, void *user_data) +static int do_pause(struct spa_loop *loop, bool async, uint32_t seq, size_t size, const void *data, void *user_data) { struct state *this = user_data; int res; @@ -137,7 +137,7 @@ static int do_pause(struct spa_loop *loop, bool async, uint32_t seq, size_t size return res; } -static int impl_node_send_command(struct spa_node *node, struct spa_command *command) +static int impl_node_send_command(struct spa_node *node, const struct spa_command *command) { struct state *this; @@ -533,7 +533,7 @@ static int impl_node_port_reuse_buffer(struct spa_node *node, uint32_t port_id, static int impl_node_port_send_command(struct spa_node *node, - enum spa_direction direction, uint32_t port_id, struct spa_command *command) + enum spa_direction direction, uint32_t port_id, const struct spa_command *command) { struct state *this; int res; diff --git a/spa/plugins/audiomixer/audiomixer.c b/spa/plugins/audiomixer/audiomixer.c index c0c482500..14a94c903 100644 --- a/spa/plugins/audiomixer/audiomixer.c +++ b/spa/plugins/audiomixer/audiomixer.c @@ -145,7 +145,7 @@ static int impl_node_set_props(struct spa_node *node, const struct spa_props *pr return SPA_RESULT_NOT_IMPLEMENTED; } -static int impl_node_send_command(struct spa_node *node, struct spa_command *command) +static int impl_node_send_command(struct spa_node *node, const struct spa_command *command) { struct impl *this; @@ -622,7 +622,7 @@ static int impl_node_port_send_command(struct spa_node *node, enum spa_direction direction, uint32_t port_id, - struct spa_command *command) + const struct spa_command *command) { return SPA_RESULT_NOT_IMPLEMENTED; } diff --git a/spa/plugins/audiotestsrc/audiotestsrc.c b/spa/plugins/audiotestsrc/audiotestsrc.c index 10615678e..58438dc16 100644 --- a/spa/plugins/audiotestsrc/audiotestsrc.c +++ b/spa/plugins/audiotestsrc/audiotestsrc.c @@ -355,7 +355,7 @@ static void on_output(struct spa_source *source) this->callbacks->have_output(&this->node, this->user_data); } -static int impl_node_send_command(struct spa_node *node, struct spa_command *command) +static int impl_node_send_command(struct spa_node *node, const struct spa_command *command) { struct impl *this; @@ -838,7 +838,7 @@ static int impl_node_port_send_command(struct spa_node *node, enum spa_direction direction, uint32_t port_id, - struct spa_command *command) + const struct spa_command *command) { return SPA_RESULT_NOT_IMPLEMENTED; } diff --git a/spa/plugins/ffmpeg/ffmpeg-dec.c b/spa/plugins/ffmpeg/ffmpeg-dec.c index 5336e58b7..af9b74cd0 100644 --- a/spa/plugins/ffmpeg/ffmpeg-dec.c +++ b/spa/plugins/ffmpeg/ffmpeg-dec.c @@ -90,7 +90,7 @@ static int spa_ffmpeg_dec_node_set_props(struct spa_node *node, const struct spa return SPA_RESULT_NOT_IMPLEMENTED; } -static int spa_ffmpeg_dec_node_send_command(struct spa_node *node, struct spa_command *command) +static int spa_ffmpeg_dec_node_send_command(struct spa_node *node, const struct spa_command *command) { struct impl *this; @@ -424,7 +424,7 @@ static int spa_ffmpeg_dec_node_port_send_command(struct spa_node *node, enum spa_direction direction, uint32_t port_id, - struct spa_command *command) + const struct spa_command *command) { return SPA_RESULT_NOT_IMPLEMENTED; } diff --git a/spa/plugins/ffmpeg/ffmpeg-enc.c b/spa/plugins/ffmpeg/ffmpeg-enc.c index c92f0a334..ab962c06e 100644 --- a/spa/plugins/ffmpeg/ffmpeg-enc.c +++ b/spa/plugins/ffmpeg/ffmpeg-enc.c @@ -94,7 +94,7 @@ static int spa_ffmpeg_enc_node_set_props(struct spa_node *node, const struct spa return SPA_RESULT_NOT_IMPLEMENTED; } -static int spa_ffmpeg_enc_node_send_command(struct spa_node *node, struct spa_command *command) +static int spa_ffmpeg_enc_node_send_command(struct spa_node *node, const struct spa_command *command) { struct impl *this; @@ -390,7 +390,7 @@ spa_ffmpeg_enc_node_port_reuse_buffer(struct spa_node *node, uint32_t port_id, u static int spa_ffmpeg_enc_node_port_send_command(struct spa_node *node, enum spa_direction direction, - uint32_t port_id, struct spa_command *command) + uint32_t port_id, const struct spa_command *command) { return SPA_RESULT_NOT_IMPLEMENTED; } diff --git a/spa/plugins/support/loop.c b/spa/plugins/support/loop.c index 9afb45d4e..387e5e3a6 100644 --- a/spa/plugins/support/loop.c +++ b/spa/plugins/support/loop.c @@ -195,7 +195,7 @@ loop_invoke(struct spa_loop *loop, spa_invoke_func_t func, uint32_t seq, size_t size, - void *data, + const void *data, bool block, void *user_data) { diff --git a/spa/plugins/test/fakesink.c b/spa/plugins/test/fakesink.c index a7f086a3b..3104f51a0 100644 --- a/spa/plugins/test/fakesink.c +++ b/spa/plugins/test/fakesink.c @@ -270,7 +270,7 @@ static void on_input(struct spa_source *source) consume_buffer(this); } -static int impl_node_send_command(struct spa_node *node, struct spa_command *command) +static int impl_node_send_command(struct spa_node *node, const struct spa_command *command) { struct impl *this; @@ -640,7 +640,7 @@ static int impl_node_port_send_command(struct spa_node *node, enum spa_direction direction, uint32_t port_id, - struct spa_command *command) + const struct spa_command *command) { return SPA_RESULT_NOT_IMPLEMENTED; } diff --git a/spa/plugins/test/fakesrc.c b/spa/plugins/test/fakesrc.c index 623096098..8f1ca92a2 100644 --- a/spa/plugins/test/fakesrc.c +++ b/spa/plugins/test/fakesrc.c @@ -282,7 +282,7 @@ static void on_output(struct spa_source *source) this->callbacks->have_output(&this->node, this->user_data); } -static int impl_node_send_command(struct spa_node *node, struct spa_command *command) +static int impl_node_send_command(struct spa_node *node, const struct spa_command *command) { struct impl *this; @@ -687,7 +687,7 @@ static int impl_node_port_send_command(struct spa_node *node, enum spa_direction direction, uint32_t port_id, - struct spa_command *command) + const struct spa_command *command) { return SPA_RESULT_NOT_IMPLEMENTED; } diff --git a/spa/plugins/v4l2/v4l2-source.c b/spa/plugins/v4l2/v4l2-source.c index 438e66f58..e89a04336 100644 --- a/spa/plugins/v4l2/v4l2-source.c +++ b/spa/plugins/v4l2/v4l2-source.c @@ -228,7 +228,7 @@ static int do_pause_done(struct spa_loop *loop, bool async, uint32_t seq, size_t size, - void *data, + const void *data, void *user_data) { struct impl *this = user_data; @@ -246,12 +246,12 @@ static int do_pause(struct spa_loop *loop, bool async, uint32_t seq, size_t size, - void *data, + const void *data, void *user_data) { struct impl *this = user_data; int res; - struct spa_command *cmd = data; + const struct spa_command *cmd = data; res = spa_node_port_send_command(&this->node, SPA_DIRECTION_OUTPUT, 0, cmd); @@ -271,7 +271,7 @@ static int do_start_done(struct spa_loop *loop, bool async, uint32_t seq, size_t size, - void *data, + const void *data, void *user_data) { struct impl *this = user_data; @@ -286,12 +286,12 @@ static int do_start(struct spa_loop *loop, bool async, uint32_t seq, size_t size, - void *data, + const void *data, void *user_data) { struct impl *this = user_data; int res; - struct spa_command *cmd = data; + const struct spa_command *cmd = data; res = spa_node_port_send_command(&this->node, SPA_DIRECTION_OUTPUT, 0, cmd); @@ -307,7 +307,7 @@ static int do_start(struct spa_loop *loop, return SPA_RESULT_OK; } -static int impl_node_send_command(struct spa_node *node, struct spa_command *command) +static int impl_node_send_command(struct spa_node *node, const struct spa_command *command) { struct impl *this; @@ -774,7 +774,7 @@ static int impl_node_port_reuse_buffer(struct spa_node *node, static int impl_node_port_send_command(struct spa_node *node, enum spa_direction direction, uint32_t port_id, - struct spa_command *command) + const struct spa_command *command) { struct impl *this; int res; diff --git a/spa/plugins/v4l2/v4l2-utils.c b/spa/plugins/v4l2/v4l2-utils.c index 8a9f516cc..d9d94d7dc 100644 --- a/spa/plugins/v4l2/v4l2-utils.c +++ b/spa/plugins/v4l2/v4l2-utils.c @@ -969,7 +969,8 @@ static int spa_v4l2_use_buffers(struct impl *this, struct spa_buffer **buffers, } else if (d[0].type == this->type.data.DmaBuf) { state->memtype = V4L2_MEMORY_DMABUF; } else { - spa_log_error(state->log, "v4l2: can't use buffers of type %d", d[0].type); + spa_log_error(state->log, "v4l2: can't use buffers of type %s (%d)", + spa_type_map_get_type (this->map, d[0].type), d[0].type); return SPA_RESULT_ERROR; } } diff --git a/spa/plugins/videotestsrc/videotestsrc.c b/spa/plugins/videotestsrc/videotestsrc.c index 35838160d..89f1c9636 100644 --- a/spa/plugins/videotestsrc/videotestsrc.c +++ b/spa/plugins/videotestsrc/videotestsrc.c @@ -306,7 +306,7 @@ static void on_output(struct spa_source *source) this->callbacks->have_output(&this->node, this->user_data); } -static int impl_node_send_command(struct spa_node *node, struct spa_command *command) +static int impl_node_send_command(struct spa_node *node, const struct spa_command *command) { struct impl *this; @@ -782,7 +782,7 @@ static int impl_node_port_send_command(struct spa_node *node, enum spa_direction direction, uint32_t port_id, - struct spa_command *command) + const struct spa_command *command) { return SPA_RESULT_NOT_IMPLEMENTED; } diff --git a/spa/plugins/volume/volume.c b/spa/plugins/volume/volume.c index 23a2f98f3..a3cf0ae12 100644 --- a/spa/plugins/volume/volume.c +++ b/spa/plugins/volume/volume.c @@ -188,7 +188,7 @@ static int impl_node_set_props(struct spa_node *node, const struct spa_props *pr return SPA_RESULT_OK; } -static int impl_node_send_command(struct spa_node *node, struct spa_command *command) +static int impl_node_send_command(struct spa_node *node, const struct spa_command *command) { struct impl *this; @@ -627,7 +627,7 @@ static int impl_node_port_send_command(struct spa_node *node, enum spa_direction direction, uint32_t port_id, - struct spa_command *command) + const struct spa_command *command) { return SPA_RESULT_NOT_IMPLEMENTED; } diff --git a/spa/tests/test-graph.c b/spa/tests/test-graph.c index f2a9e699e..152f32817 100644 --- a/spa/tests/test-graph.c +++ b/spa/tests/test-graph.c @@ -275,7 +275,7 @@ static void do_remove_source(struct spa_source *source) static int do_invoke(struct spa_loop *loop, - spa_invoke_func_t func, uint32_t seq, size_t size, void *data, bool block, void *user_data) + spa_invoke_func_t func, uint32_t seq, size_t size, const void *data, bool block, void *user_data) { return func(loop, false, seq, size, data, user_data); } diff --git a/spa/tests/test-mixer.c b/spa/tests/test-mixer.c index 925b86711..6c5a9b183 100644 --- a/spa/tests/test-mixer.c +++ b/spa/tests/test-mixer.c @@ -320,7 +320,7 @@ static void do_remove_source(struct spa_source *source) static int do_invoke(struct spa_loop *loop, - spa_invoke_func_t func, uint32_t seq, size_t size, void *data, bool block, void *user_data) + spa_invoke_func_t func, uint32_t seq, size_t size, const void *data, bool block, void *user_data) { return func(loop, false, seq, size, data, user_data); } diff --git a/spa/tests/test-perf.c b/spa/tests/test-perf.c index b7a4cdd1c..34352b55b 100644 --- a/spa/tests/test-perf.c +++ b/spa/tests/test-perf.c @@ -333,7 +333,7 @@ static void do_remove_source(struct spa_source *source) static int do_invoke(struct spa_loop *loop, - spa_invoke_func_t func, uint32_t seq, size_t size, void *data, bool block, void *user_data) + spa_invoke_func_t func, uint32_t seq, size_t size, const void *data, bool block, void *user_data) { return func(loop, false, seq, size, data, user_data); } diff --git a/spa/tests/test-props.c b/spa/tests/test-props.c index f7570d30a..712d5a9ea 100644 --- a/spa/tests/test-props.c +++ b/spa/tests/test-props.c @@ -48,6 +48,57 @@ * */ +bool: true | false +int: +float: +string: "" +bytes: "$" +pointer: "@" +rectangle: "x" +fraction: "/" +bitmask: "|" +array: "[,...]" +object: "{: ,...}" +key: "" + + + "format": "S16LE", + "format": [ "S16LE", "S24LE" ], + "size": "320x240", + "+size": [ "320x240", { "min": "320x240", "max": "640x480", "step": "8x8" ], + "-size": [ "320x240", ["320x240", "640x480", "512x400"] ], + + "schema.size": { "default": "320x240", + "flags": ["range", "optional"], + "values": ["320x240", "640x480", "8x8"] } + + + "size-schema": + { "default": "320x240", + "values": ["320x240", "640x480"] + }, + "size-schema": + { "default": "320x240", + "values": { "min": "320x240", "max": "640x480", "step": "8x8" } + }, + + + "size-alt": { "def": "320x240", "type": "range", "vals": ["320x240", "640x480", "8x8"] }, + "size-alt": { "def": "320x240", "type": "enum", "vals": ["320x240", "640x480"], + +{ + "type": "audio/raw", + "format": ["S16LE", "enum", "S16LE", "S24LE" ], + "size": [ "320x240", "range", "320x240", "640x480" ], + "framerate": [ "25/1", "range", "25/1", "30/1" ] +} + +{ "type": "audio", "subtype": "raw", + "format": "S16LE", + "size": { "min": "320x240", "max": "640x480", "step": "8x8" }, + + "framerate": [ "25/1", "30/1" ] } + spa_build(SPA_MEDIA_TYPE_VIDEO, SPA_MEDIA_SUBTYPE_RAW, type.format_video.format, SPA_PROP_TYPE_ID, video_format.I420 diff --git a/spa/tests/test-ringbuffer.c b/spa/tests/test-ringbuffer.c index 028a6b326..2c770a6a7 100644 --- a/spa/tests/test-ringbuffer.c +++ b/spa/tests/test-ringbuffer.c @@ -269,7 +269,7 @@ static void do_remove_source(struct spa_source *source) static int do_invoke(struct spa_loop *loop, - spa_invoke_func_t func, uint32_t seq, size_t size, void *data, bool block, void *user_data) + spa_invoke_func_t func, uint32_t seq, size_t size, const void *data, bool block, void *user_data) { return func(loop, false, seq, size, data, user_data); } diff --git a/spa/tests/test-v4l2.c b/spa/tests/test-v4l2.c index 17e5466dd..3e9d98356 100644 --- a/spa/tests/test-v4l2.c +++ b/spa/tests/test-v4l2.c @@ -291,7 +291,7 @@ static void do_remove_source(struct spa_source *source) static int do_invoke(struct spa_loop *loop, - spa_invoke_func_t func, uint32_t seq, size_t size, void *data, bool block, void *user_data) + spa_invoke_func_t func, uint32_t seq, size_t size, const void *data, bool block, void *user_data) { return func(loop, false, seq, size, data, user_data); } diff --git a/src/examples/export-sink.c b/src/examples/export-sink.c new file mode 100644 index 000000000..a9ec4a17d --- /dev/null +++ b/src/examples/export-sink.c @@ -0,0 +1,560 @@ +/* PipeWire + * Copyright (C) 2017 Wim Taymans + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ + +#include +#include + +#include + +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include + +struct type { + uint32_t format; + uint32_t props; + struct spa_type_meta meta; + struct spa_type_data data; + struct spa_type_media_type media_type; + struct spa_type_media_subtype media_subtype; + struct spa_type_format_video format_video; + struct spa_type_video_format video_format; +}; + +static inline void init_type(struct type *type, struct spa_type_map *map) +{ + type->format = spa_type_map_get_id(map, SPA_TYPE__Format); + type->props = spa_type_map_get_id(map, SPA_TYPE__Props); + spa_type_meta_map(map, &type->meta); + spa_type_data_map(map, &type->data); + spa_type_media_type_map(map, &type->media_type); + spa_type_media_subtype_map(map, &type->media_subtype); + spa_type_format_video_map(map, &type->format_video); + spa_type_video_format_map(map, &type->video_format); +} + +#define WIDTH 640 +#define HEIGHT 480 +#define BPP 3 + +struct data { + struct type type; + + const char *path; + + SDL_Renderer *renderer; + SDL_Window *window; + SDL_Texture *texture; + + bool running; + struct pw_loop *loop; + + struct pw_core *core; + + struct pw_remote *remote; + struct pw_listener on_state_changed; + + struct pw_node *node; + struct pw_port *port; + struct spa_port_info port_info; + + uint8_t buffer[1024]; + + struct spa_video_info_raw format; + int32_t stride; + + uint8_t params_buffer[1024]; + struct spa_param *params[2]; + + struct spa_buffer *buffers[32]; + int n_buffers; +}; + +static void handle_events(struct data *data) +{ + SDL_Event event; + while (SDL_PollEvent(&event)) { + switch (event.type) { + case SDL_QUIT: + exit(0); + break; + } + } +} + +static struct { + Uint32 format; + uint32_t id; +} video_formats[] = { + { + SDL_PIXELFORMAT_UNKNOWN, offsetof(struct spa_type_video_format, UNKNOWN),}, { + SDL_PIXELFORMAT_INDEX1LSB, offsetof(struct spa_type_video_format, UNKNOWN),}, { + SDL_PIXELFORMAT_UNKNOWN, offsetof(struct spa_type_video_format, UNKNOWN),}, { + SDL_PIXELFORMAT_INDEX1LSB, offsetof(struct spa_type_video_format, UNKNOWN),}, { + SDL_PIXELFORMAT_INDEX1MSB, offsetof(struct spa_type_video_format, UNKNOWN),}, { + SDL_PIXELFORMAT_INDEX4LSB, offsetof(struct spa_type_video_format, UNKNOWN),}, { + SDL_PIXELFORMAT_INDEX4MSB, offsetof(struct spa_type_video_format, UNKNOWN),}, { + SDL_PIXELFORMAT_INDEX8, offsetof(struct spa_type_video_format, UNKNOWN),}, { + SDL_PIXELFORMAT_RGB332, offsetof(struct spa_type_video_format, UNKNOWN),}, { + SDL_PIXELFORMAT_RGB444, offsetof(struct spa_type_video_format, UNKNOWN),}, { + SDL_PIXELFORMAT_RGB555, offsetof(struct spa_type_video_format, UNKNOWN),}, { + SDL_PIXELFORMAT_BGR555, offsetof(struct spa_type_video_format, UNKNOWN),}, { + SDL_PIXELFORMAT_ARGB4444, offsetof(struct spa_type_video_format, UNKNOWN),}, { + SDL_PIXELFORMAT_RGBA4444, offsetof(struct spa_type_video_format, UNKNOWN),}, { + SDL_PIXELFORMAT_ABGR4444, offsetof(struct spa_type_video_format, UNKNOWN),}, { + SDL_PIXELFORMAT_BGRA4444, offsetof(struct spa_type_video_format, UNKNOWN),}, { + SDL_PIXELFORMAT_ARGB1555, offsetof(struct spa_type_video_format, UNKNOWN),}, { + SDL_PIXELFORMAT_RGBA5551, offsetof(struct spa_type_video_format, UNKNOWN),}, { + SDL_PIXELFORMAT_ABGR1555, offsetof(struct spa_type_video_format, UNKNOWN),}, { + SDL_PIXELFORMAT_BGRA5551, offsetof(struct spa_type_video_format, UNKNOWN),}, { + SDL_PIXELFORMAT_RGB565, offsetof(struct spa_type_video_format, UNKNOWN),}, { + SDL_PIXELFORMAT_BGR565, offsetof(struct spa_type_video_format, UNKNOWN),}, { + SDL_PIXELFORMAT_RGB24, offsetof(struct spa_type_video_format, RGB),}, { + SDL_PIXELFORMAT_RGB888, offsetof(struct spa_type_video_format, RGB),}, { + SDL_PIXELFORMAT_RGBX8888, offsetof(struct spa_type_video_format, RGBx),}, { + SDL_PIXELFORMAT_BGR24, offsetof(struct spa_type_video_format, BGR),}, { + SDL_PIXELFORMAT_BGR888, offsetof(struct spa_type_video_format, BGR),}, { + SDL_PIXELFORMAT_BGRX8888, offsetof(struct spa_type_video_format, BGRx),}, { + SDL_PIXELFORMAT_ARGB2101010, offsetof(struct spa_type_video_format, UNKNOWN),}, { + SDL_PIXELFORMAT_RGBA8888, offsetof(struct spa_type_video_format, RGBA),}, { + SDL_PIXELFORMAT_ARGB8888, offsetof(struct spa_type_video_format, ARGB),}, { + SDL_PIXELFORMAT_BGRA8888, offsetof(struct spa_type_video_format, BGRA),}, { + SDL_PIXELFORMAT_ABGR8888, offsetof(struct spa_type_video_format, ABGR),}, { + SDL_PIXELFORMAT_YV12, offsetof(struct spa_type_video_format, YV12),}, { + SDL_PIXELFORMAT_IYUV, offsetof(struct spa_type_video_format, I420),}, { + SDL_PIXELFORMAT_YUY2, offsetof(struct spa_type_video_format, YUY2),}, { + SDL_PIXELFORMAT_UYVY, offsetof(struct spa_type_video_format, UYVY),}, { + SDL_PIXELFORMAT_YVYU, offsetof(struct spa_type_video_format, YVYU),}, { + SDL_PIXELFORMAT_NV12, offsetof(struct spa_type_video_format, NV12),}, { +SDL_PIXELFORMAT_NV21, offsetof(struct spa_type_video_format, NV21),}}; + +static uint32_t sdl_format_to_id(struct data *data, Uint32 format) +{ + int i; + + for (i = 0; i < SPA_N_ELEMENTS(video_formats); i++) { + if (video_formats[i].format == format) + return *SPA_MEMBER(&data->type.video_format, video_formats[i].id, uint32_t); + } + return data->type.video_format.UNKNOWN; +} + +static Uint32 id_to_sdl_format(struct data *data, uint32_t id) +{ + int i; + + for (i = 0; i < SPA_N_ELEMENTS(video_formats); i++) { + if (*SPA_MEMBER(&data->type.video_format, video_formats[i].id, uint32_t) == id) + return video_formats[i].format; + } + return SDL_PIXELFORMAT_UNKNOWN; +} + +#define PROP(f,key,type,...) \ + SPA_POD_PROP (f,key,0,type,1,__VA_ARGS__) +#define PROP_U_MM(f,key,type,...) \ + SPA_POD_PROP (f,key,SPA_POD_PROP_FLAG_UNSET | \ + SPA_POD_PROP_RANGE_MIN_MAX,type,3,__VA_ARGS__) + +static int impl_port_enum_formats(struct pw_port *port, + struct spa_format **format, + const struct spa_format *filter, + int32_t index) +{ + struct data *data = port->user_data; + const struct spa_format *formats[1]; + struct spa_pod_builder b = SPA_POD_BUILDER_INIT(data->buffer, sizeof(data->buffer)); + struct spa_pod_frame f[2]; + SDL_RendererInfo info; + int i, c; + + if (index != 0) + return SPA_RESULT_ENUM_END; + + SDL_GetRendererInfo(data->renderer, &info); + + spa_pod_builder_push_format(&b, &f[0], data->type.format, + data->type.media_type.video, + data->type.media_subtype.raw); + + spa_pod_builder_push_prop(&b, &f[1], data->type.format_video.format, + SPA_POD_PROP_FLAG_UNSET | + SPA_POD_PROP_RANGE_ENUM); + for (i = 0, c = 0; i < info.num_texture_formats; i++) { + uint32_t id = sdl_format_to_id(data, info.texture_formats[i]); + if (id == 0) + continue; + if (c++ == 0) + spa_pod_builder_id(&b, id); + spa_pod_builder_id(&b, id); + } + for (i = 0; i < SPA_N_ELEMENTS(video_formats); i++) { + uint32_t id = + *SPA_MEMBER(&data->type.video_format, video_formats[i].id, + uint32_t); + if (id != data->type.video_format.UNKNOWN) + spa_pod_builder_id(&b, id); + } + spa_pod_builder_pop(&b, &f[1]); + spa_pod_builder_add(&b, + PROP_U_MM(&f[1], data->type.format_video.size, SPA_POD_TYPE_RECTANGLE, + WIDTH, HEIGHT, + 1, 1, info.max_texture_width, info.max_texture_height), + PROP_U_MM(&f[1], data->type.format_video.framerate, SPA_POD_TYPE_FRACTION, + 25, 1, + 0, 1, 30, 1), + 0); + spa_pod_builder_pop(&b, &f[0]); + formats[0] = SPA_POD_BUILDER_DEREF(&b, f[0].ref, struct spa_format); + + spa_debug_format(formats[0]); + + *format = (struct spa_format *)formats[0]; + + return SPA_RESULT_OK; +} + +static int impl_port_set_format(struct pw_port *port, uint32_t flags, const struct spa_format *format) +{ + struct data *data = port->user_data; + struct pw_core *core = data->core; + struct spa_pod_builder b = { NULL }; + struct spa_pod_frame f[2]; + Uint32 sdl_format; + void *d; + + if (format == NULL) + return SPA_RESULT_OK; + + spa_debug_format(format); + + spa_format_video_raw_parse(format, &data->format, &data->type.format_video); + + sdl_format = id_to_sdl_format(data, data->format.format); + if (sdl_format == SDL_PIXELFORMAT_UNKNOWN) + return SPA_RESULT_ERROR; + + data->texture = SDL_CreateTexture(data->renderer, + sdl_format, + SDL_TEXTUREACCESS_STREAMING, + data->format.size.width, + data->format.size.height); + SDL_LockTexture(data->texture, NULL, &d, &data->stride); + SDL_UnlockTexture(data->texture); + + spa_pod_builder_init(&b, data->params_buffer, sizeof(data->params_buffer)); + spa_pod_builder_object(&b, &f[0], 0, core->type.param_alloc_buffers.Buffers, + PROP(&f[1], core->type.param_alloc_buffers.size, SPA_POD_TYPE_INT, + data->stride * data->format.size.height), + PROP(&f[1], core->type.param_alloc_buffers.stride, SPA_POD_TYPE_INT, + data->stride), + PROP_U_MM(&f[1], core->type.param_alloc_buffers.buffers, SPA_POD_TYPE_INT, + 32, + 2, 32), + PROP(&f[1], core->type.param_alloc_buffers.align, SPA_POD_TYPE_INT, + 16)); + data->params[0] = SPA_POD_BUILDER_DEREF(&b, f[0].ref, struct spa_param); + + spa_pod_builder_object(&b, &f[0], 0, core->type.param_alloc_meta_enable.MetaEnable, + PROP(&f[1], core->type.param_alloc_meta_enable.type, SPA_POD_TYPE_ID, + core->type.meta.Header), + PROP(&f[1], core->type.param_alloc_meta_enable.size, SPA_POD_TYPE_INT, + sizeof(struct spa_meta_header))); + data->params[1] = SPA_POD_BUILDER_DEREF(&b, f[0].ref, struct spa_param); + + return SPA_RESULT_OK; +} + +static int impl_port_get_format(struct pw_port *port, const struct spa_format **format) +{ + struct data *data = port->user_data; + struct spa_pod_builder b = SPA_POD_BUILDER_INIT(data->buffer, sizeof(data->buffer)); + struct spa_pod_frame f[2]; + + spa_pod_builder_push_format(&b, &f[0], data->type.format, + data->type.media_type.video, + data->type.media_subtype.raw); + spa_pod_builder_add(&b, + PROP(&f[1], data->type.format_video.format, SPA_POD_TYPE_ID, data->format.format), + PROP(&f[1], data->type.format_video.size, SPA_POD_TYPE_RECTANGLE, &data->format.size), + PROP(&f[1], data->type.format_video.framerate, SPA_POD_TYPE_FRACTION, &data->format.framerate), + 0); + spa_pod_builder_pop(&b, &f[0]); + *format = SPA_POD_BUILDER_DEREF(&b, f[0].ref, struct spa_format); + + return SPA_RESULT_OK; +} + +static int impl_port_get_info(struct pw_port *port, const struct spa_port_info **info) +{ + struct data *data = port->user_data; + + data->port_info.flags = SPA_PORT_INFO_FLAG_CAN_USE_BUFFERS; + data->port_info.rate = 0; + data->port_info.props = NULL; + + *info = &data->port_info; + + return SPA_RESULT_OK; +} + +static int impl_port_enum_params(struct pw_port *port, uint32_t index, struct spa_param **param) +{ + struct data *data = port->user_data; + + if (index >= 2) + return SPA_RESULT_ENUM_END; + + *param = data->params[index]; + + return SPA_RESULT_OK; +} + +static int impl_port_set_param(struct pw_port *port, struct spa_param *param) +{ + return SPA_RESULT_NOT_IMPLEMENTED; +} + +static int impl_port_use_buffers(struct pw_port *port, struct spa_buffer **buffers, uint32_t n_buffers) +{ + struct data *data = port->user_data; + int i; + for (i = 0; i < n_buffers; i++) + data->buffers[i] = buffers[i]; + data->n_buffers = n_buffers; + return SPA_RESULT_OK; +} + +static int impl_port_alloc_buffers(struct pw_port *port, + struct spa_param **params, uint32_t n_params, + struct spa_buffer **buffers, uint32_t *n_buffers) +{ + return SPA_RESULT_NOT_IMPLEMENTED; +} + +static int impl_port_reuse_buffer(struct pw_port *port, uint32_t buffer_id) +{ + return SPA_RESULT_NOT_IMPLEMENTED; +} + +static int impl_port_send_command(struct pw_port *port, struct spa_command *command) +{ + return SPA_RESULT_NOT_IMPLEMENTED; +} + +static const struct pw_port_implementation impl_port = { + PW_VERSION_PORT_IMPLEMENTATION, + impl_port_enum_formats, + impl_port_set_format, + impl_port_get_format, + impl_port_get_info, + impl_port_enum_params, + impl_port_set_param, + impl_port_use_buffers, + impl_port_alloc_buffers, + impl_port_reuse_buffer, + impl_port_send_command, +}; + +static int impl_node_get_props(struct pw_node *node, struct spa_props **props) +{ + return SPA_RESULT_NOT_IMPLEMENTED; +} + +static int impl_node_set_props(struct pw_node *node, const struct spa_props *props) +{ + return SPA_RESULT_NOT_IMPLEMENTED; +} + +static int impl_node_send_command(struct pw_node *node, + const struct spa_command *command) +{ + return SPA_RESULT_OK; +} + +static struct pw_port* impl_node_add_port(struct pw_node *node, + enum pw_direction direction, + uint32_t port_id) +{ + return NULL; +} + +static int impl_node_process_input(struct pw_node *node) +{ + struct data *data = node->user_data; + struct pw_port *port = data->port; + struct spa_buffer *buf; + uint8_t *map; + void *sdata, *ddata; + int sstride, dstride, ostride; + int i; + uint8_t *src, *dst; + + buf = port->buffers[port->io.buffer_id]; + + if (buf->datas[0].type == data->type.data.MemFd || + buf->datas[0].type == data->type.data.DmaBuf) { + map = mmap(NULL, buf->datas[0].maxsize + buf->datas[0].mapoffset, PROT_READ, + MAP_PRIVATE, buf->datas[0].fd, 0); + sdata = SPA_MEMBER(map, buf->datas[0].mapoffset, uint8_t); + } else if (buf->datas[0].type == data->type.data.MemPtr) { + map = NULL; + sdata = buf->datas[0].data; + } else + return SPA_RESULT_ERROR; + + if (SDL_LockTexture(data->texture, NULL, &ddata, &dstride) < 0) { + fprintf(stderr, "Couldn't lock texture: %s\n", SDL_GetError()); + return SPA_RESULT_ERROR; + } + sstride = buf->datas[0].chunk->stride; + ostride = SPA_MIN(sstride, dstride); + + src = sdata; + dst = ddata; + for (i = 0; i < data->format.size.height; i++) { + memcpy(dst, src, ostride); + src += sstride; + dst += dstride; + } + SDL_UnlockTexture(data->texture); + + SDL_RenderClear(data->renderer); + SDL_RenderCopy(data->renderer, data->texture, NULL, NULL); + SDL_RenderPresent(data->renderer); + + if (map) + munmap(map, buf->datas[0].maxsize); + + handle_events(data); + + port->io.status = SPA_RESULT_NEED_BUFFER; + + return SPA_RESULT_NEED_BUFFER; +} + +static int impl_node_process_output(struct pw_node *node) +{ + return SPA_RESULT_NOT_IMPLEMENTED; +} + +static const struct pw_node_implementation impl_node = { + PW_VERSION_NODE_IMPLEMENTATION, + impl_node_get_props, + impl_node_set_props, + impl_node_send_command, + impl_node_add_port, + impl_node_process_input, + impl_node_process_output, +}; + +static void make_node(struct data *data) +{ + struct pw_properties *props; + + props = pw_properties_new( + //"pipewire.target.node", port_path, + "pipewire.autoconnect", "1", + NULL); + + data->node = pw_node_new(data->core, NULL, NULL, "SDL-sink", props, 0); + data->node->user_data = data; + data->node->implementation = &impl_node; + + data->port = pw_port_new(PW_DIRECTION_INPUT, 0, 0); + data->port->user_data = data; + data->port->implementation = &impl_port; + pw_port_add(data->port, data->node); + pw_node_export(data->node); + + pw_remote_export(data->remote, data->node); +} + +static void on_state_changed(struct pw_listener *listener, struct pw_remote *remote) +{ + struct data *data = SPA_CONTAINER_OF(listener, struct data, on_state_changed); + + switch (remote->state) { + case PW_REMOTE_STATE_ERROR: + printf("remote error: %s\n", remote->error); + data->running = false; + break; + + case PW_REMOTE_STATE_CONNECTED: + make_node(data); + break; + + default: + printf("remote state: \"%s\"\n", pw_remote_state_as_string(remote->state)); + break; + } +} + + +int main(int argc, char *argv[]) +{ + struct data data = { 0, }; + + pw_init(&argc, &argv); + + data.loop = pw_loop_new(); + data.running = true; + data.core = pw_core_new(data.loop, NULL); + data.remote = pw_remote_new(data.core, NULL); + data.path = argc > 1 ? argv[1] : NULL; + + pw_module_load(data.core, "libpipewire-module-spa-node-factory", NULL); + + init_type(&data.type, data.core->type.map); + + spa_debug_set_type_map(data.core->type.map); + + if (SDL_Init(SDL_INIT_VIDEO) < 0) { + printf("can't initialize SDL: %s\n", SDL_GetError()); + return -1; + } + + if (SDL_CreateWindowAndRenderer + (WIDTH, HEIGHT, SDL_WINDOW_RESIZABLE, &data.window, &data.renderer)) { + printf("can't create window: %s\n", SDL_GetError()); + return -1; + } + + pw_signal_add(&data.remote->state_changed, &data.on_state_changed, on_state_changed); + + pw_remote_connect(data.remote); + + pw_loop_enter(data.loop); + while (data.running) { + pw_loop_iterate(data.loop, -1); + } + pw_loop_leave(data.loop); + + pw_loop_destroy(data.loop); + + return 0; +} diff --git a/src/examples/export-v4l2.c b/src/examples/export-v4l2.c new file mode 100644 index 000000000..8579a06ec --- /dev/null +++ b/src/examples/export-v4l2.c @@ -0,0 +1,140 @@ +/* PipeWire + * Copyright (C) 2017 Wim Taymans + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ + +#include +#include + +#include + +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include + +struct type { + uint32_t format; + uint32_t props; + struct spa_type_meta meta; + struct spa_type_data data; + struct spa_type_media_type media_type; + struct spa_type_media_subtype media_subtype; + struct spa_type_format_video format_video; + struct spa_type_video_format video_format; +}; + +static inline void init_type(struct type *type, struct spa_type_map *map) +{ + type->format = spa_type_map_get_id(map, SPA_TYPE__Format); + type->props = spa_type_map_get_id(map, SPA_TYPE__Props); + spa_type_meta_map(map, &type->meta); + spa_type_data_map(map, &type->data); + spa_type_media_type_map(map, &type->media_type); + spa_type_media_subtype_map(map, &type->media_subtype); + spa_type_format_video_map(map, &type->format_video); + spa_type_video_format_map(map, &type->video_format); +} + +struct data { + struct type type; + + bool running; + struct pw_loop *loop; + + struct pw_core *core; + + struct pw_remote *remote; + struct pw_listener on_state_changed; + + struct pw_node *node; +}; + +static void make_node(struct data *data) +{ + struct pw_node_factory *factory; + struct pw_properties *props; + + factory = pw_core_find_node_factory(data->core, "spa-node-factory"); + props = pw_properties_new("spa.library.name", "v4l2/libspa-v4l2", + "spa.factory.name", "v4l2-source", NULL); + data->node = pw_node_factory_create_node(factory, NULL, "v4l2-source", props); + + pw_node_export(data->node); + + pw_remote_export(data->remote, data->node); +} + +static void on_state_changed(struct pw_listener *listener, struct pw_remote *remote) +{ + struct data *data = SPA_CONTAINER_OF(listener, struct data, on_state_changed); + + switch (remote->state) { + case PW_REMOTE_STATE_ERROR: + printf("remote error: %s\n", remote->error); + data->running = false; + break; + + case PW_REMOTE_STATE_CONNECTED: + make_node(data); + break; + + default: + printf("remote state: \"%s\"\n", pw_remote_state_as_string(remote->state)); + break; + } +} + + +int main(int argc, char *argv[]) +{ + struct data data = { 0, }; + + pw_init(&argc, &argv); + + data.loop = pw_loop_new(); + data.running = true; + data.core = pw_core_new(data.loop, NULL); + data.remote = pw_remote_new(data.core, NULL); + + pw_module_load(data.core, "libpipewire-module-spa-node-factory", NULL); + + init_type(&data.type, data.core->type.map); + + spa_debug_set_type_map(data.core->type.map); + + pw_signal_add(&data.remote->state_changed, &data.on_state_changed, on_state_changed); + + pw_remote_connect(data.remote); + + pw_loop_enter(data.loop); + while (data.running) { + pw_loop_iterate(data.loop, -1); + } + pw_loop_leave(data.loop); + + pw_loop_destroy(data.loop); + + return 0; +} diff --git a/src/examples/local-v4l2.c b/src/examples/local-v4l2.c index 652bee41e..85167b070 100644 --- a/src/examples/local-v4l2.c +++ b/src/examples/local-v4l2.c @@ -239,7 +239,7 @@ static int impl_port_enum_formats(struct pw_port *port, return SPA_RESULT_OK; } -static int impl_port_set_format(struct pw_port *port, uint32_t flags, struct spa_format *format) +static int impl_port_set_format(struct pw_port *port, uint32_t flags, const struct spa_format *format) { struct data *data = port->user_data; struct pw_core *core = data->core; @@ -378,7 +378,7 @@ static int impl_node_set_props(struct pw_node *node, const struct spa_props *pro } static int impl_node_send_command(struct pw_node *node, - struct spa_command *command) + const struct spa_command *command) { return SPA_RESULT_OK; } diff --git a/src/examples/meson.build b/src/examples/meson.build index e087f98d3..af750b43c 100644 --- a/src/examples/meson.build +++ b/src/examples/meson.build @@ -3,6 +3,11 @@ executable('video-src', install: false, dependencies : [pipewire_dep], ) +executable('export-v4l2', + 'export-v4l2.c', + install: false, + dependencies : [pipewire_dep], +) if sdl_dep.found() executable('video-play', @@ -15,4 +20,9 @@ if sdl_dep.found() install: false, dependencies : [pipewire_dep, sdl_dep], ) + executable('export-sink', + 'export-sink.c', + install: false, + dependencies : [pipewire_dep, sdl_dep], + ) endif diff --git a/src/examples/video-play.c b/src/examples/video-play.c index e47d89863..7d2af808f 100644 --- a/src/examples/video-play.c +++ b/src/examples/video-play.c @@ -70,7 +70,6 @@ struct data { bool running; struct pw_loop *loop; - struct spa_source *timer; struct pw_core *core; struct pw_remote *remote; diff --git a/src/modules/module-client-node/client-node.c b/src/modules/module-client-node/client-node.c index ea6266401..8f0eedc69 100644 --- a/src/modules/module-client-node/client-node.c +++ b/src/modules/module-client-node/client-node.c @@ -174,7 +174,7 @@ static inline void send_have_output(struct proxy *this) do_flush(this); } -static int spa_proxy_node_send_command(struct spa_node *node, struct spa_command *command) +static int spa_proxy_node_send_command(struct spa_node *node, const struct spa_command *command) { struct proxy *this; int res = SPA_RESULT_OK; @@ -237,11 +237,11 @@ spa_proxy_node_get_n_ports(struct spa_node *node, if (n_input_ports) *n_input_ports = this->n_inputs; if (max_input_ports) - *max_input_ports = this->max_inputs; + *max_input_ports = this->max_inputs == 0 ? this->n_inputs : this->max_inputs; if (n_output_ports) *n_output_ports = this->n_outputs; if (max_output_ports) - *max_output_ports = this->max_outputs; + *max_output_ports = this->max_outputs == 0 ? this->n_outputs : this->max_outputs; return SPA_RESULT_OK; } @@ -298,6 +298,7 @@ do_update_port(struct proxy *this, } if (change_mask & PW_CLIENT_NODE_PORT_UPDATE_POSSIBLE_FORMATS) { + spa_log_info(this->log, "proxy %p: %d formats", this, n_possible_formats); for (i = 0; i < port->n_formats; i++) free(port->formats[i]); port->n_formats = n_possible_formats; @@ -307,12 +308,14 @@ do_update_port(struct proxy *this, port->formats[i] = spa_format_copy(possible_formats[i]); } if (change_mask & PW_CLIENT_NODE_PORT_UPDATE_FORMAT) { + spa_log_info(this->log, "proxy %p: update format %p", this, format); if (port->format) free(port->format); port->format = spa_format_copy(format); } if (change_mask & PW_CLIENT_NODE_PORT_UPDATE_PARAMS) { + spa_log_info(this->log, "proxy %p: update %d params", this, n_params); for (i = 0; i < port->n_params; i++) free(port->params[i]); port->n_params = n_params; @@ -743,7 +746,7 @@ spa_proxy_node_port_reuse_buffer(struct spa_node *node, uint32_t port_id, uint32 static int spa_proxy_node_port_send_command(struct spa_node *node, enum spa_direction direction, - uint32_t port_id, struct spa_command *command) + uint32_t port_id, const struct spa_command *command) { struct proxy *this; @@ -1034,6 +1037,38 @@ proxy_init(struct proxy *this, return SPA_RESULT_RETURN_ASYNC(this->seq++); } +static int client_node_get_fds(struct pw_client_node *node, int *readfd, int *writefd) +{ + struct impl *impl = SPA_CONTAINER_OF(node, struct impl, this); + + if (impl->fds[0] == -1) { +#if 0 + if (socketpair(AF_UNIX, SOCK_STREAM | SOCK_CLOEXEC | SOCK_NONBLOCK, 0, impl->fds) != + 0) + return SPA_RESULT_ERRNO; + + impl->proxy.data_source.fd = impl->fds[0]; + impl->proxy.writefd = impl->fds[0]; + impl->other_fds[0] = impl->fds[1]; + impl->other_fds[1] = impl->fds[1]; +#else + impl->fds[0] = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK); + impl->fds[1] = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK); + impl->proxy.data_source.fd = impl->fds[0]; + impl->proxy.writefd = impl->fds[1]; + impl->other_fds[0] = impl->fds[1]; + impl->other_fds[1] = impl->fds[0]; +#endif + + spa_loop_add_source(impl->proxy.data_loop, &impl->proxy.data_source); + pw_log_debug("client-node %p: add data fd %d", node, impl->proxy.data_source.fd); + } + *readfd = impl->other_fds[0]; + *writefd = impl->other_fds[1]; + + return SPA_RESULT_OK; +} + static void on_initialized(struct pw_listener *listener, struct pw_node *node) { struct impl *impl = SPA_CONTAINER_OF(listener, struct impl, initialized); @@ -1044,11 +1079,11 @@ static void on_initialized(struct pw_listener *listener, struct pw_node *node) if (this->resource == NULL) return; - impl->transport = pw_transport_new(node->info.max_input_ports, node->info.max_output_ports); + impl->transport = pw_transport_new(node->info.max_input_ports, node->info.max_output_ports, 0); impl->transport->area->n_input_ports = node->info.n_input_ports; impl->transport->area->n_output_ports = node->info.n_output_ports; - pw_client_node_get_fds(this, &readfd, &writefd); + client_node_get_fds(this, &readfd, &writefd); pw_transport_get_info(impl->transport, &info); pw_client_node_resource_transport(this->resource, node->global->id, @@ -1181,46 +1216,3 @@ void pw_client_node_destroy(struct pw_client_node *node) { pw_resource_destroy(node->resource); } - -/** Get the set of fds for this \ref pw_client_node - * - * \param node a \ref pw_client_node - * \param[out] readfd an fd for reading - * \param[out] writefd an fd for writing - * \return 0 on success < 0 on error - * - * Create or return a previously created set of fds for \a node. - * - * \memberof pw_client_node - */ -int pw_client_node_get_fds(struct pw_client_node *node, int *readfd, int *writefd) -{ - struct impl *impl = SPA_CONTAINER_OF(node, struct impl, this); - - if (impl->fds[0] == -1) { -#if 0 - if (socketpair(AF_UNIX, SOCK_STREAM | SOCK_CLOEXEC | SOCK_NONBLOCK, 0, impl->fds) != - 0) - return SPA_RESULT_ERRNO; - - impl->proxy.data_source.fd = impl->fds[0]; - impl->proxy.writefd = impl->fds[0]; - impl->other_fds[0] = impl->fds[1]; - impl->other_fds[1] = impl->fds[1]; -#else - impl->fds[0] = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK); - impl->fds[1] = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK); - impl->proxy.data_source.fd = impl->fds[0]; - impl->proxy.writefd = impl->fds[1]; - impl->other_fds[0] = impl->fds[1]; - impl->other_fds[1] = impl->fds[0]; -#endif - - spa_loop_add_source(impl->proxy.data_loop, &impl->proxy.data_source); - pw_log_debug("client-node %p: add data fd %d", node, impl->proxy.data_source.fd); - } - *readfd = impl->other_fds[0]; - *writefd = impl->other_fds[1]; - - return SPA_RESULT_OK; -} diff --git a/src/modules/module-client-node/client-node.h b/src/modules/module-client-node/client-node.h index 1dcaec8cc..6cfc0ca66 100644 --- a/src/modules/module-client-node/client-node.h +++ b/src/modules/module-client-node/client-node.h @@ -47,9 +47,6 @@ pw_client_node_new(struct pw_resource *resource, void pw_client_node_destroy(struct pw_client_node *node); -int -pw_client_node_get_fds(struct pw_client_node *node, int *readfd, int *writefd); - #ifdef __cplusplus } #endif diff --git a/src/modules/spa/spa-node.c b/src/modules/spa/spa-node.c index 680ab6ed0..20a21b355 100644 --- a/src/modules/spa/spa-node.c +++ b/src/modules/spa/spa-node.c @@ -59,7 +59,7 @@ static int port_impl_enum_formats(struct pw_port *port, return spa_node_port_enum_formats(p->node, port->direction, port->port_id, format, filter, index); } -static int port_impl_set_format(struct pw_port *port, uint32_t flags, struct spa_format *format) +static int port_impl_set_format(struct pw_port *port, uint32_t flags, const struct spa_format *format) { struct port *p = port->user_data; return spa_node_port_set_format(p->node, port->direction, port->port_id, flags, format); @@ -248,7 +248,7 @@ static int node_impl_set_props(struct pw_node *node, const struct spa_props *pro return spa_node_set_props(impl->node, props); } -static int node_impl_send_command(struct pw_node *node, struct spa_command *command) +static int node_impl_send_command(struct pw_node *node, const struct spa_command *command) { struct impl *impl = node->user_data; return spa_node_send_command(impl->node, command); @@ -342,18 +342,14 @@ static void on_node_need_input(struct spa_node *node, void *user_data) { struct impl *impl = user_data; struct pw_node *this = impl->this; - - spa_graph_scheduler_pull(this->rt.sched, &this->rt.node); - while (spa_graph_scheduler_iterate(this->rt.sched)); + pw_signal_emit(&this->need_input, this); } static void on_node_have_output(struct spa_node *node, void *user_data) { struct impl *impl = user_data; struct pw_node *this = impl->this; - - spa_graph_scheduler_push(this->rt.sched, &this->rt.node); - while (spa_graph_scheduler_iterate(this->rt.sched)); + pw_signal_emit(&this->have_output, this); } static void diff --git a/src/pipewire/link.c b/src/pipewire/link.c index c3ea9e5a7..258a56211 100644 --- a/src/pipewire/link.c +++ b/src/pipewire/link.c @@ -244,7 +244,7 @@ static struct spa_buffer **alloc_buffers(struct pw_link *this, /* each buffer */ skel_size = sizeof(struct spa_buffer); - metas = alloca(sizeof(struct spa_meta) * n_params + 1); + metas = alloca(sizeof(struct spa_meta) * (n_params + 1)); /* add shared metadata */ metas[n_metas].type = this->core->type.meta.Shared; @@ -759,7 +759,7 @@ static void clear_port_buffers(struct pw_link *link, struct pw_port *port) static int do_remove_input(struct spa_loop *loop, - bool async, uint32_t seq, size_t size, void *data, void *user_data) + bool async, uint32_t seq, size_t size, const void *data, void *user_data) { struct pw_link *this = user_data; spa_graph_port_remove(&this->rt.in_port); @@ -782,7 +782,7 @@ static void input_remove(struct pw_link *this, struct pw_port *port) static int do_remove_output(struct spa_loop *loop, - bool async, uint32_t seq, size_t size, void *data, void *user_data) + bool async, uint32_t seq, size_t size, const void *data, void *user_data) { struct pw_link *this = user_data; spa_graph_port_remove(&this->rt.out_port); @@ -848,7 +848,7 @@ static void on_output_port_destroy(struct pw_listener *listener, struct pw_port static int do_activate_link(struct spa_loop *loop, - bool async, uint32_t seq, size_t size, void *data, void *user_data) + bool async, uint32_t seq, size_t size, const void *data, void *user_data) { struct pw_link *this = user_data; spa_graph_port_link(&this->rt.out_port, &this->rt.in_port); @@ -879,7 +879,7 @@ bool pw_link_activate(struct pw_link *this) static int do_deactivate_link(struct spa_loop *loop, - bool async, uint32_t seq, size_t size, void *data, void *user_data) + bool async, uint32_t seq, size_t size, const void *data, void *user_data) { struct pw_link *this = user_data; spa_graph_port_unlink(&this->rt.out_port); @@ -972,7 +972,7 @@ link_bind_func(struct pw_global *global, static int do_add_link(struct spa_loop *loop, - bool async, uint32_t seq, size_t size, void *data, void *user_data) + bool async, uint32_t seq, size_t size, const void *data, void *user_data) { struct pw_link *this = user_data; struct pw_port *port = ((struct pw_port **) data)[0]; diff --git a/src/pipewire/node.c b/src/pipewire/node.c index ac9724f8f..e2b8ceb38 100644 --- a/src/pipewire/node.c +++ b/src/pipewire/node.c @@ -39,6 +39,8 @@ struct impl { struct pw_work_queue *work; struct pw_listener on_async_complete; struct pw_listener on_event; + struct pw_listener on_need_input; + struct pw_listener on_have_output; bool exported; }; @@ -133,7 +135,7 @@ static void send_clock_update(struct pw_node *this) } static void on_event(struct pw_listener *listener, - struct pw_node *node, struct spa_event *event) + struct pw_node *node, const struct spa_event *event) { struct impl *impl = SPA_CONTAINER_OF(listener, struct impl, on_event); struct pw_node *this = &impl->this; @@ -144,6 +146,20 @@ static void on_event(struct pw_listener *listener, } } +static void on_need_input(struct pw_listener *listener, + struct pw_node *node) +{ + spa_graph_scheduler_pull(node->rt.sched, &node->rt.node); + while (spa_graph_scheduler_iterate(node->rt.sched)); +} + +static void on_have_output(struct pw_listener *listener, + struct pw_node *node) +{ + spa_graph_scheduler_push(node->rt.sched, &node->rt.node); + while (spa_graph_scheduler_iterate(node->rt.sched)); +} + static void node_unbind_func(void *data) { struct pw_resource *resource = data; @@ -244,7 +260,7 @@ node_bind_func(struct pw_global *global, static int do_node_add(struct spa_loop *loop, - bool async, uint32_t seq, size_t size, void *data, void *user_data) + bool async, uint32_t seq, size_t size, const void *data, void *user_data) { struct pw_node *this = user_data; @@ -313,17 +329,22 @@ struct pw_node *pw_node_new(struct pw_core *core, this = &impl->this; this->core = core; this->owner = owner; + impl->parent = parent; pw_log_debug("node %p: new, owner %p", this, owner); if (user_data_size > 0) this->user_data = SPA_MEMBER(impl, sizeof(struct impl), void); - impl->work = pw_work_queue_new(this->core->main_loop); - impl->parent = parent; + if (properties == NULL) + properties = pw_properties_new(NULL, NULL); + if (properties == NULL) + goto no_mem; - this->info.name = strdup(name); this->properties = properties; + impl->work = pw_work_queue_new(this->core->main_loop); + this->info.name = strdup(name); + this->data_loop = core->data_loop; this->rt.sched = &core->rt.sched; @@ -339,9 +360,13 @@ struct pw_node *pw_node_new(struct pw_core *core, pw_signal_init(&this->free_signal); pw_signal_init(&this->async_complete); pw_signal_init(&this->event); + pw_signal_init(&this->need_input); + pw_signal_init(&this->have_output); pw_signal_add(&this->async_complete, &impl->on_async_complete, on_async_complete); pw_signal_add(&this->event, &impl->on_event, on_event); + pw_signal_add(&this->need_input, &impl->on_need_input, on_need_input); + pw_signal_add(&this->have_output, &impl->on_have_output, on_have_output); this->info.state = PW_NODE_STATE_CREATING; @@ -356,11 +381,15 @@ struct pw_node *pw_node_new(struct pw_core *core, this); return this; + + no_mem: + free(impl); + return NULL; } static int do_node_remove(struct spa_loop *loop, - bool async, uint32_t seq, size_t size, void *data, void *user_data) + bool async, uint32_t seq, size_t size, const void *data, void *user_data) { struct pw_node *this = user_data; @@ -429,6 +458,19 @@ void pw_node_destroy(struct pw_node *node) free(impl); } +struct pw_port * +pw_node_find_port(struct pw_node *node, enum pw_direction direction, uint32_t port_id) +{ + struct pw_map *portmap; + + if (direction == PW_DIRECTION_INPUT) + portmap = &node->input_port_map; + else + portmap = &node->output_port_map; + + return pw_map_lookup(portmap, port_id); +} + /** * pw_node_get_free_port: * \param node a \ref pw_node diff --git a/src/pipewire/node.h b/src/pipewire/node.h index c97b6de27..33c92b271 100644 --- a/src/pipewire/node.h +++ b/src/pipewire/node.h @@ -51,7 +51,7 @@ struct pw_node_implementation { int (*set_props) (struct pw_node *node, const struct spa_props *props); int (*send_command) (struct pw_node *node, - struct spa_command *command); + const struct spa_command *command); struct pw_port* (*add_port) (struct pw_node *node, enum pw_direction direction, @@ -131,7 +131,12 @@ struct pw_node { /** an event is emited */ PW_SIGNAL(event, (struct pw_listener *listener, - struct pw_node *node, struct spa_event *event)); + struct pw_node *node, const struct spa_event *event)); + + /** the node wants input */ + PW_SIGNAL(need_input, (struct pw_listener *listener, struct pw_node *node)); + /** the node has output */ + PW_SIGNAL(have_output, (struct pw_listener *listener, struct pw_node *node)); struct pw_loop *data_loop; /**< the data loop for this node */ @@ -159,6 +164,10 @@ void pw_node_export(struct pw_node *node); /** Destroy a node */ void pw_node_destroy(struct pw_node *node); +/** Find the port with direction and port_id or NULL when not found */ +struct pw_port * +pw_node_find_port(struct pw_node *node, enum pw_direction direction, uint32_t port_id); + /** Get a free unused port from the node */ struct pw_port * pw_node_get_free_port(struct pw_node *node, enum pw_direction direction); diff --git a/src/pipewire/port.c b/src/pipewire/port.c index 537747ad8..0ac19830e 100644 --- a/src/pipewire/port.c +++ b/src/pipewire/port.c @@ -185,7 +185,7 @@ struct pw_port *pw_port_new(enum pw_direction direction, } static int do_add_port(struct spa_loop *loop, - bool async, uint32_t seq, size_t size, void *data, void *user_data) + bool async, uint32_t seq, size_t size, const void *data, void *user_data) { struct pw_port *this = user_data; @@ -224,7 +224,7 @@ void pw_port_add(struct pw_port *port, struct pw_node *node) } static int do_remove_port(struct spa_loop *loop, - bool async, uint32_t seq, size_t size, void *data, void *user_data) + bool async, uint32_t seq, size_t size, const void *data, void *user_data) { struct pw_port *this = user_data; struct spa_graph_port *p; @@ -272,7 +272,7 @@ void pw_port_destroy(struct pw_port *port) static int do_port_pause(struct spa_loop *loop, - bool async, uint32_t seq, size_t size, void *data, void *user_data) + bool async, uint32_t seq, size_t size, const void *data, void *user_data) { struct pw_port *port = user_data; @@ -288,7 +288,7 @@ int pw_port_enum_formats(struct pw_port *port, return port->implementation->enum_formats(port, format, filter, index); } -int pw_port_set_format(struct pw_port *port, uint32_t flags, struct spa_format *format) +int pw_port_set_format(struct pw_port *port, uint32_t flags, const struct spa_format *format) { int res; diff --git a/src/pipewire/port.h b/src/pipewire/port.h index 5b5b226e8..f5de222d0 100644 --- a/src/pipewire/port.h +++ b/src/pipewire/port.h @@ -56,7 +56,7 @@ struct pw_port_implementation { const struct spa_format *filter, int32_t index); - int (*set_format) (struct pw_port *port, uint32_t flags, struct spa_format *format); + int (*set_format) (struct pw_port *port, uint32_t flags, const struct spa_format *format); int (*get_format) (struct pw_port *port, const struct spa_format **format); @@ -144,7 +144,7 @@ int pw_port_enum_formats(struct pw_port *port, int32_t index); /** Set a format on a port \memberof pw_port */ -int pw_port_set_format(struct pw_port *port, uint32_t flags, struct spa_format *format); +int pw_port_set_format(struct pw_port *port, uint32_t flags, const struct spa_format *format); /** Get the current format on a port \memberof pw_port */ int pw_port_get_format(struct pw_port *port, const struct spa_format **format); diff --git a/src/pipewire/remote.c b/src/pipewire/remote.c index b2ba4bc92..16b9046c0 100644 --- a/src/pipewire/remote.c +++ b/src/pipewire/remote.c @@ -22,6 +22,7 @@ #include #include #include +#include #include @@ -34,11 +35,54 @@ #include "pipewire/stream.h" #include "extensions/protocol-native.h" +#include "extensions/client-node.h" /** \cond */ struct remote { struct pw_remote this; + uint32_t type_client_node; }; + +struct mem_id { + uint32_t id; + int fd; + uint32_t flags; + void *ptr; + uint32_t offset; + uint32_t size; +}; + +struct buffer_id { + struct spa_list link; + uint32_t id; + void *buf_ptr; + struct spa_buffer *buf; +}; + +struct node_data { + struct pw_node *node; + struct pw_client_node_proxy *node_proxy; + uint32_t node_id; + + int rtreadfd; + int rtwritefd; + struct spa_source *rtsocket_source; + struct pw_transport *trans; + + struct pw_listener node_proxy_destroy; + struct pw_listener node_need_input; + struct pw_listener node_have_output; + + struct pw_array mem_ids; + struct pw_array buffer_ids; + bool in_order; + +}; +struct trans_data { + struct spa_graph_port *in_ports; + struct spa_graph_port *out_ports; +}; + /** \endcond */ const char *pw_remote_state_as_string(enum pw_remote_state state) @@ -173,6 +217,7 @@ struct pw_remote *pw_remote_new(struct pw_core *core, pw_fill_remote_properties(core, properties); this->properties = properties; + impl->type_client_node = spa_type_map_get_id(core->type.map, PW_TYPE_INTERFACE__ClientNode); this->state = PW_REMOTE_STATE_UNCONNECTED; pw_map_init(&this->objects, 64, 32); @@ -311,3 +356,602 @@ void pw_remote_disconnect(struct pw_remote *remote) } pw_remote_update_state(remote, PW_REMOTE_STATE_UNCONNECTED, NULL); } + + +static void unhandle_socket(struct pw_proxy *proxy) +{ + struct node_data *data = proxy->user_data; + + if (data->rtsocket_source) { + pw_loop_destroy_source(proxy->remote->core->data_loop, data->rtsocket_source); + data->rtsocket_source = NULL; + } +} + +static void handle_rtnode_event(struct pw_proxy *proxy, struct spa_event *event) +{ + struct node_data *data = proxy->user_data; + struct pw_remote *remote = proxy->remote; + struct spa_graph_node *n = &data->node->rt.node; + int res; + + if (SPA_EVENT_TYPE(event) == remote->core->type.event_transport.HaveOutput) { + res = n->methods->process_input(n, n->user_data); + } + else if (SPA_EVENT_TYPE(event) == remote->core->type.event_transport.NeedInput) { + res = n->methods->process_output(n, n->user_data); + } + else if (SPA_EVENT_TYPE(event) == remote->core->type.event_transport.ReuseBuffer) { + } + else { + pw_log_warn("unexpected node event %d", SPA_EVENT_TYPE(event)); + } +} + +static void +on_rtsocket_condition(struct spa_loop_utils *utils, + struct spa_source *source, int fd, enum spa_io mask, void *user_data) +{ + struct pw_proxy *proxy = user_data; + struct node_data *data = proxy->user_data; + + if (mask & (SPA_IO_ERR | SPA_IO_HUP)) { + pw_log_warn("got error"); + unhandle_socket(proxy); + return; + } + + if (mask & SPA_IO_IN) { + struct spa_event event; + uint64_t cmd; + + read(data->rtreadfd, &cmd, 8); + + while (pw_transport_next_event(data->trans, &event) == SPA_RESULT_OK) { + struct spa_event *ev = alloca(SPA_POD_SIZE(&event)); + pw_transport_parse_event(data->trans, ev); + handle_rtnode_event(proxy, ev); + } + } +} + +static void client_node_transport(void *object, uint32_t node_id, + int readfd, int writefd, int memfd, uint32_t offset, uint32_t size) +{ + struct pw_proxy *proxy = object; + struct node_data *data = proxy->user_data; + struct pw_transport_info info; + struct trans_data *t; + struct pw_port *port; + int i; + + data->node_id = node_id; + + info.memfd = memfd; + if (info.memfd == -1) + return; + info.offset = offset; + info.size = size; + + if (data->trans) + pw_transport_destroy(data->trans); + data->trans = pw_transport_new_from_info(&info, sizeof(struct trans_data)); + t = data->trans->user_data; + + pw_log_info("remote-node %p: create transport %p with fds %d %d for node %u", + proxy, data->trans, readfd, writefd, node_id); + + t->in_ports = calloc(data->trans->area->max_input_ports, sizeof(struct spa_graph_port)); + t->out_ports = calloc(data->trans->area->max_output_ports, sizeof(struct spa_graph_port)); + + for (i = 0; i < data->trans->area->max_input_ports; i++) { + spa_graph_port_init(&t->in_ports[i], + SPA_DIRECTION_INPUT, + i, + 0, + &data->trans->inputs[i]); + } + spa_list_for_each(port, &data->node->input_ports, link) + spa_graph_port_add(&port->rt.mix_node, &t->in_ports[port->port_id]); + + for (i = 0; i < data->trans->area->max_output_ports; i++) { + spa_graph_port_init(&t->out_ports[i], + SPA_DIRECTION_OUTPUT, + i, + 0, + &data->trans->outputs[i]); + } + spa_list_for_each(port, &data->node->output_ports, link) + spa_graph_port_add(&port->rt.mix_node, &t->out_ports[port->port_id]); + + data->rtreadfd = readfd; + data->rtwritefd = writefd; + + unhandle_socket(proxy); + data->rtsocket_source = pw_loop_add_io(proxy->remote->core->data_loop, + data->rtreadfd, + SPA_IO_ERR | SPA_IO_HUP, + true, on_rtsocket_condition, proxy); +} + +static void add_port_update(struct pw_proxy *proxy, struct pw_port *port, uint32_t change_mask) +{ + struct node_data *data = proxy->user_data; + const struct spa_format *format = NULL; + const struct spa_port_info *port_info = NULL; + struct spa_port_info pi; + uint32_t n_possible_formats = 0, n_params = 0; + struct spa_param **params = NULL; + struct spa_format **possible_formats = NULL; + + if (change_mask & PW_CLIENT_NODE_PORT_UPDATE_POSSIBLE_FORMATS) { + if (port->direction == PW_DIRECTION_INPUT) { + n_possible_formats = data->node->info.n_input_formats; + possible_formats = data->node->info.input_formats; + } + else if (port->direction == PW_DIRECTION_OUTPUT) { + n_possible_formats = data->node->info.n_output_formats; + possible_formats = data->node->info.output_formats; + } + } + if (change_mask & PW_CLIENT_NODE_PORT_UPDATE_FORMAT) { + pw_port_get_format(port, &format); + } + if (change_mask & PW_CLIENT_NODE_PORT_UPDATE_PARAMS) { + for (;; n_params++) { + struct spa_param *param; + + if (pw_port_enum_params(port, n_params, ¶m) < 0) + break; + + params = realloc(params, sizeof(struct spa_param *) * (n_params + 1)); + params[n_params] = spa_param_copy(param); + } + } + if (change_mask & PW_CLIENT_NODE_PORT_UPDATE_INFO) { + pw_port_get_info(port, &port_info); + pi = * port_info; + pi.flags &= ~SPA_PORT_INFO_FLAG_CAN_ALLOC_BUFFERS; + } + + pw_client_node_proxy_port_update(data->node_proxy, + port->direction, + port->port_id, + change_mask, + n_possible_formats, + (const struct spa_format **) possible_formats, + format, + n_params, + (const struct spa_param **) params, + &pi); + if (params) { + while (n_params > 0) + free(params[--n_params]); + free(params); + } +} + +static void +client_node_set_props(void *object, uint32_t seq, const struct spa_props *props) +{ + pw_log_warn("set property not implemented"); +} + +static void client_node_event(void *object, const struct spa_event *event) +{ + pw_log_warn("unhandled node event %d", SPA_EVENT_TYPE(event)); +} + +static void +client_node_add_port(void *object, uint32_t seq, enum spa_direction direction, uint32_t port_id) +{ + pw_log_warn("add port not supported"); +} + +static void +client_node_remove_port(void *object, uint32_t seq, enum spa_direction direction, uint32_t port_id) +{ + pw_log_warn("remove port not supported"); +} + +static void +client_node_set_format(void *object, + uint32_t seq, + enum spa_direction direction, + uint32_t port_id, uint32_t flags, const struct spa_format *format) +{ + struct pw_proxy *proxy = object; + struct node_data *data = proxy->user_data; + struct pw_port *port; + int res; + + port = pw_node_find_port(data->node, direction, port_id); + if (port == NULL) { + res = SPA_RESULT_INVALID_PORT; + goto done; + } + + res = pw_port_set_format(port, flags, format); + if (res != SPA_RESULT_OK) + goto done; + + add_port_update(proxy, port, + PW_CLIENT_NODE_PORT_UPDATE_FORMAT | + PW_CLIENT_NODE_PORT_UPDATE_PARAMS | + PW_CLIENT_NODE_PORT_UPDATE_INFO); + + done: + pw_client_node_proxy_done(data->node_proxy, seq, res); +} + +static void +client_node_set_param(void *object, + uint32_t seq, + enum spa_direction direction, + uint32_t port_id, + const struct spa_param *param) +{ + pw_log_warn("set param not implemented"); +} + +static struct mem_id *find_mem(struct pw_proxy *proxy, uint32_t id) +{ + struct mem_id *mid; + struct node_data *data = proxy->user_data; + + pw_array_for_each(mid, &data->mem_ids) { + if (mid->id == id) + return mid; + } + return NULL; +} + +static void clear_memid(struct mem_id *mid) +{ + if (mid->ptr != NULL) + munmap(mid->ptr, mid->size + mid->offset); + mid->ptr = NULL; + close(mid->fd); +} + +static void clear_mems(struct pw_proxy *proxy) +{ + struct node_data *data = proxy->user_data; + struct mem_id *mid; + + pw_array_for_each(mid, &data->mem_ids) + clear_memid(mid); + data->mem_ids.size = 0; +} + +static void clear_buffers(struct pw_proxy *proxy) +{ + struct node_data *data = proxy->user_data; + struct buffer_id *bid; + + pw_log_debug("node %p: clear buffers", proxy); + + pw_array_for_each(bid, &data->buffer_ids) { + free(bid->buf); + bid->buf = NULL; + } + data->buffer_ids.size = 0; +} + +static void +client_node_add_mem(void *object, + enum spa_direction direction, + uint32_t port_id, + uint32_t mem_id, + uint32_t type, int memfd, uint32_t flags, uint32_t offset, uint32_t size) +{ + struct pw_proxy *proxy = object; + struct node_data *data = proxy->user_data; + struct mem_id *m; + + m = find_mem(proxy, mem_id); + if (m) { + pw_log_debug("update mem %u, fd %d, flags %d, off %d, size %d", + mem_id, memfd, flags, offset, size); + clear_memid(m); + } else { + m = pw_array_add(&data->mem_ids, sizeof(struct mem_id)); + pw_log_debug("add mem %u, fd %d, flags %d, off %d, size %d", + mem_id, memfd, flags, offset, size); + } + m->id = mem_id; + m->fd = memfd; + m->flags = flags; + m->ptr = NULL; + m->offset = offset; + m->size = size; +} + +static void +client_node_use_buffers(void *object, + uint32_t seq, + enum spa_direction direction, + uint32_t port_id, uint32_t n_buffers, struct pw_client_node_buffer *buffers) +{ + struct pw_proxy *proxy = object; + struct node_data *data = proxy->user_data; + struct buffer_id *bid; + uint32_t i, j, len; + struct spa_buffer *b, **bufs; + struct pw_port *port; + int res; + + port = pw_node_find_port(data->node, direction, port_id); + if (port == NULL) { + res = SPA_RESULT_INVALID_PORT; + goto done; + } + + /* clear previous buffers */ + clear_buffers(proxy); + + bufs = alloca(n_buffers * sizeof(struct spa_buffer *)); + + for (i = 0; i < n_buffers; i++) { + off_t offset; + + struct mem_id *mid = find_mem(proxy, buffers[i].mem_id); + if (mid == NULL) { + pw_log_warn("unknown memory id %u", buffers[i].mem_id); + continue; + } + + if (mid->ptr == NULL) { + mid->ptr = + mmap(NULL, mid->size + mid->offset, PROT_READ | PROT_WRITE, MAP_SHARED, + mid->fd, 0); + if (mid->ptr == MAP_FAILED) { + mid->ptr = NULL; + pw_log_warn("Failed to mmap memory %d %p: %s", mid->size, mid, + strerror(errno)); + continue; + } + } + len = pw_array_get_len(&data->buffer_ids, struct buffer_id); + bid = pw_array_add(&data->buffer_ids, sizeof(struct buffer_id)); + + b = buffers[i].buffer; + + bid->buf_ptr = SPA_MEMBER(mid->ptr, mid->offset + buffers[i].offset, void); + { + size_t size; + + size = sizeof(struct spa_buffer); + for (j = 0; j < buffers[i].buffer->n_metas; j++) + size += sizeof(struct spa_meta); + for (j = 0; j < buffers[i].buffer->n_datas; j++) + size += sizeof(struct spa_data); + + b = bid->buf = malloc(size); + memcpy(b, buffers[i].buffer, sizeof(struct spa_buffer)); + + b->metas = SPA_MEMBER(b, sizeof(struct spa_buffer), struct spa_meta); + b->datas = + SPA_MEMBER(b->metas, sizeof(struct spa_meta) * b->n_metas, + struct spa_data); + } + bid->id = b->id; + + if (bid->id != len) { + pw_log_warn("unexpected id %u found, expected %u", bid->id, len); + } + pw_log_debug("add buffer %d %d %u", mid->id, bid->id, buffers[i].offset); + + offset = 0; + for (j = 0; j < b->n_metas; j++) { + struct spa_meta *m = &b->metas[j]; + memcpy(m, &buffers[i].buffer->metas[j], sizeof(struct spa_meta)); + m->data = SPA_MEMBER(bid->buf_ptr, offset, void); + offset += m->size; + } + + for (j = 0; j < b->n_datas; j++) { + struct spa_data *d = &b->datas[j]; + + memcpy(d, &buffers[i].buffer->datas[j], sizeof(struct spa_data)); + d->chunk = + SPA_MEMBER(bid->buf_ptr, offset + sizeof(struct spa_chunk) * j, + struct spa_chunk); + + if (d->type == proxy->remote->core->type.data.Id) { + struct mem_id *bmid = find_mem(proxy, SPA_PTR_TO_UINT32(d->data)); + void *map; + + d->type = proxy->remote->core->type.data.MemFd; + d->fd = bmid->fd; + map = mmap(NULL, d->maxsize + d->mapoffset, PROT_READ|PROT_WRITE, + MAP_SHARED, d->fd, 0); + d->data = SPA_MEMBER(map, d->mapoffset, uint8_t); + pw_log_debug(" data %d %u -> fd %d", j, bmid->id, bmid->fd); + } else if (d->type == proxy->remote->core->type.data.MemPtr) { + d->data = SPA_MEMBER(bid->buf_ptr, SPA_PTR_TO_INT(d->data), void); + d->fd = -1; + pw_log_debug(" data %d %u -> mem %p", j, bid->id, d->data); + } else { + pw_log_warn("unknown buffer data type %d", d->type); + } + } + bufs[i] = b; + } + + res = pw_port_use_buffers(port, bufs, n_buffers); + + if (n_buffers == 0) + clear_mems(proxy); + + done: + pw_client_node_proxy_done(data->node_proxy, seq, res); + +} + +static bool +handle_node_command(struct pw_proxy *proxy, uint32_t seq, const struct spa_command *command) +{ + struct node_data *data = proxy->user_data; + struct pw_remote *remote = proxy->remote; + int res; + + if (SPA_COMMAND_TYPE(command) == remote->core->type.command_node.Pause) { + pw_log_debug("node %p: pause %d", proxy, seq); + + pw_loop_update_io(remote->core->data_loop, + data->rtsocket_source, + SPA_IO_ERR | SPA_IO_HUP); + + if ((res = data->node->implementation->send_command(data->node, command)) < 0) + pw_log_warn("node %p: pause failed", proxy); + + pw_client_node_proxy_done(data->node_proxy, seq, res); + } + else if (SPA_COMMAND_TYPE(command) == remote->core->type.command_node.Start) { + + pw_log_debug("node %p: start %d", proxy, seq); + + pw_loop_update_io(remote->core->data_loop, + data->rtsocket_source, + SPA_IO_IN | SPA_IO_ERR | SPA_IO_HUP); + + if ((res = data->node->implementation->send_command(data->node, command)) < 0) + pw_log_warn("node %p: start failed", proxy); + + pw_client_node_proxy_done(data->node_proxy, seq, res); + } + else if (SPA_COMMAND_TYPE(command) == remote->core->type.command_node.ClockUpdate) { + struct spa_command_node_clock_update *cu = (__typeof__(cu)) command; + +#if 0 + if (cu->body.flags.value & SPA_COMMAND_NODE_CLOCK_UPDATE_FLAG_LIVE) { + pw_properties_set(stream->properties, "pipewire.latency.is-live", "1"); + pw_properties_setf(stream->properties, + "pipewire.latency.min", "%" PRId64, + cu->body.latency.value); + } + impl->last_ticks = cu->body.ticks.value; + impl->last_rate = cu->body.rate.value; + impl->last_monotonic = cu->body.monotonic_time.value; +#endif + } + else { + pw_log_warn("unhandled node command %d", SPA_COMMAND_TYPE(command)); + pw_client_node_proxy_done(data->node_proxy, seq, SPA_RESULT_NOT_IMPLEMENTED); + } + return true; +} + + + +static void client_node_node_command(void *object, uint32_t seq, const struct spa_command *command) +{ + struct pw_proxy *proxy = object; + handle_node_command(proxy, seq, command); +} + +static void +client_node_port_command(void *object, + uint32_t direction, + uint32_t port_id, + const struct spa_command *command) +{ + pw_log_warn("port command not supported"); +} + +static const struct pw_client_node_events client_node_events = { + PW_VERSION_CLIENT_NODE_EVENTS, + &client_node_transport, + &client_node_set_props, + &client_node_event, + &client_node_add_port, + &client_node_remove_port, + &client_node_set_format, + &client_node_set_param, + &client_node_add_mem, + &client_node_use_buffers, + &client_node_node_command, + &client_node_port_command, +}; + +static void node_need_input(struct pw_listener *listener, struct pw_node *node) +{ + struct node_data *data = SPA_CONTAINER_OF(listener, struct node_data, node_need_input); + struct pw_core *core = node->core; + uint64_t cmd = 1; + + pw_transport_add_event(data->trans, + &SPA_EVENT_INIT(core->type.event_transport.NeedInput)); + write(data->rtwritefd, &cmd, 8); +} + +static void node_have_output(struct pw_listener *listener, struct pw_node *node) +{ + struct node_data *data = SPA_CONTAINER_OF(listener, struct node_data, node_have_output); + struct pw_core *core = node->core; + uint64_t cmd = 1; + + pw_transport_add_event(data->trans, + &SPA_EVENT_INIT(core->type.event_transport.HaveOutput)); + write(data->rtwritefd, &cmd, 8); +} + +static void do_node_init(struct pw_proxy *proxy) +{ + struct node_data *data = proxy->user_data; + struct pw_port *port; + + pw_client_node_proxy_update(data->node_proxy, + PW_CLIENT_NODE_UPDATE_MAX_INPUTS | + PW_CLIENT_NODE_UPDATE_MAX_OUTPUTS | + PW_CLIENT_NODE_UPDATE_PROPS, + data->node->info.max_input_ports, + data->node->info.max_output_ports, + NULL); + + spa_list_for_each(port, &data->node->input_ports, link) { + add_port_update(proxy, port, + PW_CLIENT_NODE_PORT_UPDATE_POSSIBLE_FORMATS | + PW_CLIENT_NODE_PORT_UPDATE_INFO); + } + spa_list_for_each(port, &data->node->output_ports, link) { + add_port_update(proxy, port, + PW_CLIENT_NODE_PORT_UPDATE_POSSIBLE_FORMATS | + PW_CLIENT_NODE_PORT_UPDATE_INFO); + } + pw_client_node_proxy_done(data->node_proxy, 0, SPA_RESULT_OK); +} + +struct pw_proxy *pw_remote_export(struct pw_remote *remote, + struct pw_node *node) +{ + struct remote *impl = SPA_CONTAINER_OF(remote, struct remote, this); + struct pw_proxy *proxy; + struct node_data *data; + + proxy = pw_core_proxy_create_node(remote->core_proxy, + "client-node", + "client-node", + impl->type_client_node, + PW_VERSION_CLIENT_NODE, + &node->properties->dict, + sizeof(struct node_data), NULL); + if (proxy == NULL) + return NULL; + + data = proxy->user_data; + data->node = node; + data->node_proxy = (struct pw_client_node_proxy *)proxy; + pw_array_init(&data->mem_ids, 64); + pw_array_ensure_size(&data->mem_ids, sizeof(struct mem_id) * 64); + pw_array_init(&data->buffer_ids, 32); + pw_array_ensure_size(&data->buffer_ids, sizeof(struct buffer_id) * 64); + pw_signal_add(&node->need_input, &data->node_need_input, node_need_input); + pw_signal_add(&node->have_output, &data->node_have_output, node_have_output); + + pw_client_node_proxy_add_listener(data->node_proxy, proxy, &client_node_events); + do_node_init(proxy); + + return proxy; +} diff --git a/src/pipewire/remote.h b/src/pipewire/remote.h index b3b087e2e..78854b7df 100644 --- a/src/pipewire/remote.h +++ b/src/pipewire/remote.h @@ -136,6 +136,7 @@ struct pw_remote { struct spa_list proxy_list; /**< list of \ref pw_proxy objects */ struct spa_list stream_list; /**< list of \ref pw_stream objects */ + struct spa_list remote_node_list; /**< list of \ref pw_remote_node objects */ struct pw_protocol_connection *conn; /**< the protocol connection */ @@ -173,6 +174,9 @@ void pw_remote_disconnect(struct pw_remote *remote); /** Update the state of the remote, mostly used by protocols */ void pw_remote_update_state(struct pw_remote *remote, enum pw_remote_state state, const char *fmt, ...); +struct pw_proxy *pw_remote_export(struct pw_remote *remote, + struct pw_node *node); + #ifdef __cplusplus } #endif diff --git a/src/pipewire/stream.c b/src/pipewire/stream.c index b1f7a3762..9f82880b7 100644 --- a/src/pipewire/stream.c +++ b/src/pipewire/stream.c @@ -85,7 +85,6 @@ struct stream { struct pw_client_node_proxy *node_proxy; bool disconnecting; struct pw_listener node_proxy_destroy; - struct pw_listener node_proxy_sync_done; struct pw_transport *trans; @@ -852,7 +851,7 @@ static void client_node_transport(void *object, uint32_t node_id, if (impl->trans) pw_transport_destroy(impl->trans); - impl->trans = pw_transport_new_from_info(&info); + impl->trans = pw_transport_new_from_info(&info, 0); pw_log_info("stream %p: create client transport %p with fds %d %d for node %u", stream, impl->trans, readfd, writefd, node_id); diff --git a/src/pipewire/transport.c b/src/pipewire/transport.c index e1054029c..3a6653a4f 100644 --- a/src/pipewire/transport.c +++ b/src/pipewire/transport.c @@ -102,7 +102,7 @@ static void transport_reset_area(struct pw_transport *trans) * \return a newly allocated \ref pw_transport * \memberof pw_transport */ -struct pw_transport *pw_transport_new(uint32_t max_input_ports, uint32_t max_output_ports) +struct pw_transport *pw_transport_new(uint32_t max_input_ports, uint32_t max_output_ports, size_t user_data_size) { struct transport *impl; struct pw_transport *trans; @@ -113,14 +113,16 @@ struct pw_transport *pw_transport_new(uint32_t max_input_ports, uint32_t max_out area.max_output_ports = max_output_ports; area.n_output_ports = 0; - impl = calloc(1, sizeof(struct transport)); + impl = calloc(1, sizeof(struct transport) + user_data_size); if (impl == NULL) return NULL; - impl->offset = 0; - trans = &impl->trans; pw_signal_init(&trans->destroy_signal); + impl->offset = 0; + + if(user_data_size > 0) + trans->user_data = SPA_MEMBER(impl, sizeof(struct transport), void); pw_memblock_alloc(PW_MEMBLOCK_FLAG_WITH_FD | PW_MEMBLOCK_FLAG_MAP_READWRITE | @@ -133,19 +135,22 @@ struct pw_transport *pw_transport_new(uint32_t max_input_ports, uint32_t max_out return trans; } -struct pw_transport *pw_transport_new_from_info(struct pw_transport_info *info) +struct pw_transport *pw_transport_new_from_info(struct pw_transport_info *info, size_t user_data_size) { struct transport *impl; struct pw_transport *trans; void *tmp; - impl = calloc(1, sizeof(struct transport)); + impl = calloc(1, sizeof(struct transport) + user_data_size); if (impl == NULL) return NULL; trans = &impl->trans; pw_signal_init(&trans->destroy_signal); + if(user_data_size > 0) + trans->user_data = SPA_MEMBER(impl, sizeof(struct transport), void); + impl->mem.flags = PW_MEMBLOCK_FLAG_MAP_READWRITE | PW_MEMBLOCK_FLAG_WITH_FD; impl->mem.fd = info->memfd; impl->mem.offset = info->offset; diff --git a/src/pipewire/transport.h b/src/pipewire/transport.h index e47918b72..a3e33692c 100644 --- a/src/pipewire/transport.h +++ b/src/pipewire/transport.h @@ -66,13 +66,15 @@ struct pw_transport { struct spa_ringbuffer *input_buffer; /**< ringbuffer for input memory */ void *output_data; /**< output memory for ringbuffer */ struct spa_ringbuffer *output_buffer; /**< ringbuffer for output memory */ + + void *user_data; }; struct pw_transport * -pw_transport_new(uint32_t max_input_ports, uint32_t max_output_ports); +pw_transport_new(uint32_t max_input_ports, uint32_t max_output_ports, size_t user_data_size); struct pw_transport * -pw_transport_new_from_info(struct pw_transport_info *info); +pw_transport_new_from_info(struct pw_transport_info *info, size_t user_data_size); void pw_transport_destroy(struct pw_transport *trans);