diff --git a/src/examples/audio-dsp-filter.c b/src/examples/audio-dsp-filter.c new file mode 100644 index 000000000..3b9d0f703 --- /dev/null +++ b/src/examples/audio-dsp-filter.c @@ -0,0 +1,147 @@ +/* PipeWire + * + * Copyright © 2019 Wim Taymans + * + * Permission is hereby granted, free of charge, to any person obtaining a + * copy of this software and associated documentation files (the "Software"), + * to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, + * and/or sell copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice (including the next + * paragraph) shall be included in all copies or substantial portions of the + * Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL + * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER + * DEALINGS IN THE SOFTWARE. + */ + +#include +#include +#include +#include +#include + +#include +#include + +#include +#include + +struct data; + +struct port { + struct data *data; +}; + +struct data { + struct pw_main_loop *loop; + struct pw_filter *filter; + struct port *in_port; + struct port *out_port; +}; + +/* our data processing function is in general: + * + * struct pw_buffer *b; + * in = pw_filter_dequeue_buffer(filter, in_port); + * out = pw_filter_dequeue_buffer(filter, out_port); + * + * .. do stuff with buffers ... + * + * pw_filter_queue_buffer(filter, in_port, in); + * pw_filter_queue_buffer(filter, out_port, out); + * + * For DSP ports, there is a shortcut to directly dequeue, get + * the data and requeue the buffer with pw_filter_get_dsp_buffer(). + * + * + */ +static void on_process(void *userdata, struct spa_io_position *position) +{ + struct data *data = userdata; + float *in, *out; + uint32_t n_samples = position->clock.duration; + + pw_log_trace("do process %d", n_samples); + + in = pw_filter_get_dsp_buffer(data->in_port, n_samples); + out = pw_filter_get_dsp_buffer(data->out_port, n_samples); + + memcpy(out, in, n_samples * sizeof(float)); +} + +static const struct pw_filter_events filter_events = { + PW_VERSION_FILTER_EVENTS, + .process = on_process, +}; + +int main(int argc, char *argv[]) +{ + struct data data = { 0, }; + + pw_init(&argc, &argv); + + /* make a main loop. If you already have another main loop, you can add + * the fd of this pipewire mainloop to it. */ + data.loop = pw_main_loop_new(NULL); + + /* Create a simple filter, the simple filter manages the core and remote + * objects for you if you don't need to deal with them. + * + * Pass your events and a user_data pointer as the last arguments. This + * will inform you about the filter state. The most important event + * you need to listen to is the process event where you need to process + * the data. + */ + data.filter = pw_filter_new_simple( + pw_main_loop_get_loop(data.loop), + "audio-filter", + pw_properties_new( + PW_KEY_MEDIA_TYPE, "Audio", + PW_KEY_MEDIA_CATEGORY, "Filter", + PW_KEY_MEDIA_ROLE, "DSP", + NULL), + &filter_events, + &data); + + /* make an audio DSP input port */ + data.in_port = pw_filter_add_port(data.filter, + PW_DIRECTION_INPUT, + PW_FILTER_PORT_FLAG_MAP_BUFFERS, + sizeof(struct port), + pw_properties_new( + PW_KEY_FORMAT_DSP, "32 bit float mono audio", + PW_KEY_PORT_NAME, "input", + NULL), + NULL, 0); + + /* make an audio DSP output port */ + data.out_port = pw_filter_add_port(data.filter, + PW_DIRECTION_OUTPUT, + PW_FILTER_PORT_FLAG_MAP_BUFFERS, + sizeof(struct port), + pw_properties_new( + PW_KEY_FORMAT_DSP, "32 bit float mono audio", + PW_KEY_PORT_NAME, "output", + NULL), + NULL, 0); + + /* Now connect this filter. We ask that our process function is + * called in a realtime thread. */ + pw_filter_connect(data.filter, PW_FILTER_FLAG_RT_PROCESS, NULL, 0); + + /* and wait while we let things run */ + pw_main_loop_run(data.loop); + + pw_filter_destroy(data.filter); + pw_main_loop_destroy(data.loop); + + return 0; +} diff --git a/src/examples/meson.build b/src/examples/meson.build index 59af1042b..825fcd103 100644 --- a/src/examples/meson.build +++ b/src/examples/meson.build @@ -23,6 +23,13 @@ executable('export-source', dependencies : [pipewire_dep, mathlib], ) +executable('audio-dsp-filter', + 'audio-dsp-filter.c', + c_args : [ '-D_GNU_SOURCE' ], + install: false, + dependencies : [pipewire_dep, mathlib], +) + executable('export-spa', 'export-spa.c', c_args : [ '-D_GNU_SOURCE' ], @@ -58,6 +65,13 @@ if sdl_dep.found() install: false, dependencies : [pipewire_dep, sdl_dep, mathlib], ) + executable('video-dsp-play', + 'video-dsp-play.c', + c_args : [ '-D_GNU_SOURCE' ], + install: false, + dependencies : [pipewire_dep, sdl_dep, mathlib], + ) + executable('local-v4l2', 'local-v4l2.c', c_args : [ '-D_GNU_SOURCE' ], diff --git a/src/examples/sdl.h b/src/examples/sdl.h index c4cdb0dfa..9fd3b6aed 100644 --- a/src/examples/sdl.h +++ b/src/examples/sdl.h @@ -77,7 +77,7 @@ static struct { #endif }; -static uint32_t sdl_format_to_id(Uint32 format) +static inline uint32_t sdl_format_to_id(Uint32 format) { size_t i; for (i = 0; i < SPA_N_ELEMENTS(sdl_video_formats); i++) { @@ -87,7 +87,7 @@ static uint32_t sdl_format_to_id(Uint32 format) return SPA_VIDEO_FORMAT_UNKNOWN; } -static Uint32 id_to_sdl_format(uint32_t id) +static inline Uint32 id_to_sdl_format(uint32_t id) { size_t i; for (i = 0; i < SPA_N_ELEMENTS(sdl_video_formats); i++) { @@ -98,7 +98,7 @@ static Uint32 id_to_sdl_format(uint32_t id) } -static struct spa_pod *sdl_build_formats(SDL_RendererInfo *info, struct spa_pod_builder *b) +static inline struct spa_pod *sdl_build_formats(SDL_RendererInfo *info, struct spa_pod_builder *b) { uint32_t i, c; struct spa_pod_frame f[2]; diff --git a/src/examples/video-dsp-play.c b/src/examples/video-dsp-play.c new file mode 100644 index 000000000..f16311089 --- /dev/null +++ b/src/examples/video-dsp-play.c @@ -0,0 +1,316 @@ +/* PipeWire + * + * Copyright © 2019 Wim Taymans + * + * Permission is hereby granted, free of charge, to any person obtaining a + * copy of this software and associated documentation files (the "Software"), + * to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, + * and/or sell copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice (including the next + * paragraph) shall be included in all copies or substantial portions of the + * Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL + * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER + * DEALINGS IN THE SOFTWARE. + */ + +#include +#include +#include + +#include +#include +#include + +#include +#include + +#define WIDTH 640 +#define HEIGHT 480 +#define BPP 3 + +#define MAX_BUFFERS 64 + +#include "sdl.h" + +struct pixel { + float r, g, b, a; +}; + +struct data { + const char *path; + + SDL_Renderer *renderer; + SDL_Window *window; + SDL_Texture *texture; + SDL_Texture *cursor; + + struct pw_main_loop *loop; + + struct pw_filter *filter; + struct spa_hook filter_listener; + + void *in_port; + + struct spa_video_info_raw format; + int32_t stride; + + int counter; + SDL_Rect rect; + SDL_Rect cursor_rect; +}; + +static void handle_events(struct data *data) +{ + SDL_Event event; + while (SDL_PollEvent(&event)) { + switch (event.type) { + case SDL_QUIT: + pw_main_loop_quit(data->loop); + break; + } + } +} + +/* our data processing function is in general: + * + * struct pw_buffer *b; + * b = pw_filter_dequeue_buffer(port); + * + * .. do stuff with buffer ... + * + * pw_filter_queue_buffer(port, b); + */ +static void +on_process(void *_data, struct spa_io_position *position) +{ + struct data *data = _data; + struct pw_buffer *b; + struct spa_buffer *buf; + void *sdata, *ddata; + int sstride, dstride; + uint32_t i, j; + uint8_t *src, *dst; + + b = pw_filter_dequeue_buffer(data->in_port); + if (b == NULL) + return; + + buf = b->buffer; + + pw_log_trace("new buffer %p %dx%d", buf, data->format.size.width, data->format.size.height); + + handle_events(data); + + if ((sdata = buf->datas[0].data) == NULL) { + pw_log_error("no buffer data"); + goto done; + } + + if (SDL_LockTexture(data->texture, NULL, &ddata, &dstride) < 0) { + pw_log_error("Couldn't lock texture: %s", SDL_GetError()); + goto done; + } + + /* copy video image in texture */ + sstride = buf->datas[0].chunk->stride; + + src = sdata; + dst = ddata; + + for (i = 0; i < data->format.size.height; i++) { + struct pixel *p = (struct pixel *) src; + for (j = 0; j < data->format.size.width; j++) { + dst[j * 4 + 0] = SPA_CLAMP(lrintf(p[j].r * 255.0f), 0, 255); + dst[j * 4 + 1] = SPA_CLAMP(lrintf(p[j].g * 255.0f), 0, 255); + dst[j * 4 + 2] = SPA_CLAMP(lrintf(p[j].b * 255.0f), 0, 255); + dst[j * 4 + 3] = SPA_CLAMP(lrintf(p[j].a * 255.0f), 0, 255); + } + src += sstride; + dst += dstride; + } + SDL_UnlockTexture(data->texture); + + SDL_RenderClear(data->renderer); + SDL_RenderCopy(data->renderer, data->texture, &data->rect, NULL); + SDL_RenderPresent(data->renderer); + + done: + pw_filter_queue_buffer(data->in_port, b); +} + +static void on_filter_state_changed(void *_data, enum pw_filter_state old, + enum pw_filter_state state, const char *error) +{ + struct data *data = _data; + fprintf(stderr, "filter state: \"%s\"\n", pw_filter_state_as_string(state)); + switch (state) { + case PW_FILTER_STATE_UNCONNECTED: + pw_main_loop_quit(data->loop); + break; + case PW_FILTER_STATE_PAUSED: + /* because we started inactive, activate ourselves now */ + pw_filter_set_active(data->filter, true); + break; + default: + break; + } +} + +static void +on_filter_param_changed(void *_data, void *port_data, uint32_t id, const struct spa_pod *param) +{ + struct data *data = _data; + struct pw_filter *filter = data->filter; + uint8_t params_buffer[1024]; + struct spa_pod_builder b = SPA_POD_BUILDER_INIT(params_buffer, sizeof(params_buffer)); + const struct spa_pod *params[5]; + Uint32 sdl_format; + void *d; + + if (id != SPA_PARAM_Format) + return; + + /* NULL means to clear the format */ + if (param == NULL) { + pw_filter_update_params(filter, port_data, 0, NULL, 0); + return; + } + + fprintf(stderr, "got format:\n"); + spa_debug_format(2, NULL, param); + + /* call a helper function to parse the format for us. */ + spa_format_video_raw_parse(param, &data->format); + + if (data->format.format == SPA_VIDEO_FORMAT_RGBA_F32) + sdl_format = SDL_PIXELFORMAT_RGBA32; + else + sdl_format = SDL_PIXELFORMAT_UNKNOWN; + + if (sdl_format == SDL_PIXELFORMAT_UNKNOWN) { + pw_filter_update_params(filter, port_data, -EINVAL, NULL, 0); + return; + } + + data->texture = SDL_CreateTexture(data->renderer, + sdl_format, + SDL_TEXTUREACCESS_STREAMING, + data->format.size.width, + data->format.size.height); + SDL_LockTexture(data->texture, NULL, &d, &data->stride); + SDL_UnlockTexture(data->texture); + + data->rect.x = 0; + data->rect.y = 0; + data->rect.w = data->format.size.width; + data->rect.h = data->format.size.height; + + /* a SPA_TYPE_OBJECT_ParamBuffers object defines the acceptable size, + * number, stride etc of the buffers */ + params[0] = spa_pod_builder_add_object(&b, + SPA_TYPE_OBJECT_ParamBuffers, SPA_PARAM_Buffers, + SPA_PARAM_BUFFERS_buffers, SPA_POD_CHOICE_RANGE_Int(8, 2, MAX_BUFFERS), + SPA_PARAM_BUFFERS_blocks, SPA_POD_Int(1), + SPA_PARAM_BUFFERS_size, SPA_POD_Int(data->stride * data->format.size.height), + SPA_PARAM_BUFFERS_stride, SPA_POD_Int(data->stride), + SPA_PARAM_BUFFERS_align, SPA_POD_Int(16)); + + /* we are done */ + pw_filter_update_params(filter, port_data, 0, params, 1); +} + +/* these are the filter events we listen for */ +static const struct pw_filter_events filter_events = { + PW_VERSION_FILTER_EVENTS, + .state_changed = on_filter_state_changed, + .param_changed = on_filter_param_changed, + .process = on_process, +}; + +int main(int argc, char *argv[]) +{ + struct data data = { 0, }; + const struct spa_pod *params[1]; + + pw_init(&argc, &argv); + + /* create a main loop */ + data.loop = pw_main_loop_new(NULL); + + /* create a simple filter, the simple filter manages to core and remote + * objects for you if you don't need to deal with them + * + * If you plan to autoconnect your filter, you need to provide at least + * media, category and role properties + * + * Pass your events and a user_data pointer as the last arguments. This + * will inform you about the filter state. The most important event + * you need to listen to is the process event where you need to consume + * the data provided to you. + */ + data.filter = pw_filter_new_simple( + pw_main_loop_get_loop(data.loop), + "video-dsp-play", + pw_properties_new( + PW_KEY_MEDIA_TYPE, "Video", + PW_KEY_MEDIA_CATEGORY, "Capture", + PW_KEY_MEDIA_ROLE, "DSP", + NULL), + &filter_events, + &data); + + data.path = argc > 1 ? argv[1] : NULL; + + if (SDL_Init(SDL_INIT_VIDEO) < 0) { + fprintf(stderr, "can't initialize SDL: %s\n", SDL_GetError()); + return -1; + } + + if (SDL_CreateWindowAndRenderer + (WIDTH, HEIGHT, SDL_WINDOW_RESIZABLE, &data.window, &data.renderer)) { + fprintf(stderr, "can't create window: %s\n", SDL_GetError()); + return -1; + } + + /* build the extra parameters to connect with. To connect, we can provide + * a list of supported formats. We use a builder that writes the param + * object to the stack. */ + + data.in_port = pw_filter_add_port(data.filter, + PW_DIRECTION_INPUT, + PW_FILTER_PORT_FLAG_MAP_BUFFERS, + 0, + pw_properties_new( + PW_KEY_FORMAT_DSP, "32 bit float RGBA video", + PW_KEY_PORT_NAME, "input", + NULL), + params, 1); + + pw_filter_connect(data.filter, + 0, + //PW_FILTER_FLAG_RT_PROCESS, + NULL, 0); + + /* do things until we quit the mainloop */ + pw_main_loop_run(data.loop); + + pw_filter_destroy(data.filter); + pw_main_loop_destroy(data.loop); + + SDL_DestroyTexture(data.texture); + if (data.cursor) + SDL_DestroyTexture(data.cursor); + SDL_DestroyRenderer(data.renderer); + SDL_DestroyWindow(data.window); + + return 0; +} diff --git a/src/pipewire/filter.c b/src/pipewire/filter.c new file mode 100644 index 000000000..b4d9141ae --- /dev/null +++ b/src/pipewire/filter.c @@ -0,0 +1,1605 @@ +/* PipeWire + * + * Copyright © 2019 Wim Taymans + * + * Permission is hereby granted, free of charge, to any person obtaining a + * copy of this software and associated documentation files (the "Software"), + * to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, + * and/or sell copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice (including the next + * paragraph) shall be included in all copies or substantial portions of the + * Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL + * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER + * DEALINGS IN THE SOFTWARE. + */ + +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "pipewire/pipewire.h" +#include "pipewire/filter.h" +#include "pipewire/private.h" + +#define NAME "filter" + +#define MAX_SAMPLES 8192 +#define MAX_BUFFERS 64 +#define MIN_QUEUED 1 + +#define MASK_BUFFERS (MAX_BUFFERS-1) +#define MAX_PORTS 1024 + +static float empty[MAX_SAMPLES]; + +struct buffer { + struct pw_buffer this; + uint32_t id; +#define BUFFER_FLAG_MAPPED (1 << 0) +#define BUFFER_FLAG_QUEUED (1 << 1) + uint32_t flags; +}; + +struct queue { + uint32_t ids[MAX_BUFFERS]; + struct spa_ringbuffer ring; + uint64_t incount; + uint64_t outcount; +}; + +struct data { + struct pw_core *core; + struct pw_remote *remote; + struct spa_hook filter_listener; +}; + +struct param { + uint32_t id; + struct spa_list link; + struct spa_pod *param; +}; + +struct port { + struct spa_list link; + + struct filter *filter; + + enum spa_direction direction; + uint32_t id; + uint32_t flags; + struct pw_port *port; + + struct pw_properties *props; + + uint32_t change_mask_all; + struct spa_port_info info; + struct spa_list param_list; + struct spa_param_info params[5]; + + unsigned int alloc_buffers:1; + + struct spa_io_buffers *io; + + struct buffer buffers[MAX_BUFFERS]; + uint32_t n_buffers; + + struct queue dequeued; + struct queue queued; + + /* from here is what the caller gets as user_data */ + uint8_t user_data[0]; +}; + +struct filter { + struct pw_filter this; + + const char *path; + + struct pw_core *core; + + enum pw_filter_flags flags; + + struct spa_hook remote_listener; + + struct pw_node *node; + + struct spa_node impl_node; + struct spa_node_methods node_methods; + struct spa_hook_list hooks; + struct spa_callbacks callbacks; + struct spa_io_position *position; + + struct spa_list port_list;; + struct port *ports[2][MAX_PORTS]; + + struct spa_list param_list; + struct spa_param_info params[5]; + + uint32_t pending_seq; + + struct data data; + uintptr_t seq; + struct pw_time time; + + unsigned int async_connect:1; + unsigned int disconnecting:1; + unsigned int free_data:1; + unsigned int subscribe:1; + unsigned int draining:1; +}; + +static int get_param_index(uint32_t id) +{ + switch (id) { + case SPA_PARAM_EnumFormat: + return 0; + case SPA_PARAM_Meta: + return 1; + case SPA_PARAM_IO: + return 2; + case SPA_PARAM_Format: + return 3; + case SPA_PARAM_Buffers: + return 4; + default: + return -1; + } +} + +static struct param *add_param(struct filter *impl, struct port *port, + uint32_t id, const struct spa_pod *param) +{ + struct param *p; + int idx; + + if (param == NULL || !spa_pod_is_object(param)) { + errno = EINVAL; + return NULL; + } + if (id == SPA_ID_INVALID) + id = SPA_POD_OBJECT_ID(param); + + p = malloc(sizeof(struct param) + SPA_POD_SIZE(param)); + if (p == NULL) + return NULL; + + p->id = id; + p->param = SPA_MEMBER(p, sizeof(struct param), struct spa_pod); + memcpy(p->param, param, SPA_POD_SIZE(param)); + + idx = get_param_index(id); + + if (port) { + spa_list_append(&port->param_list, &p->link); + if (idx != -1) + port->params[idx].flags |= SPA_PARAM_INFO_READ; + } else { + spa_list_append(&impl->param_list, &p->link); + if (idx != -1) + impl->params[idx].flags |= SPA_PARAM_INFO_READ; + } + return p; +} + +static void clear_params(struct filter *impl, struct port *port, uint32_t id) +{ + struct param *p, *t; + struct spa_list *param_list; + + if (port) + param_list = &port->param_list; + else + param_list = &impl->param_list; + + spa_list_for_each_safe(p, t, param_list, link) { + if (id == SPA_ID_INVALID || p->id == id) { + spa_list_remove(&p->link); + free(p); + } + } +} + +static struct port *alloc_port(struct filter *filter, + enum spa_direction direction, uint32_t user_data_size) +{ + struct port *p; + int i; + + for (i = 0; i < MAX_PORTS; i++) { + if ((p = filter->ports[direction][i]) == NULL) + break; + } + if (i == MAX_PORTS) + return NULL; + + p = calloc(1, sizeof(struct port) + user_data_size); + p->filter = filter; + p->direction = direction; + p->id = i; + + spa_list_init(&p->param_list); + spa_ringbuffer_init(&p->dequeued.ring); + spa_ringbuffer_init(&p->queued.ring); + + filter->ports[direction][i] = p; + spa_list_append(&filter->port_list, &p->link); + + return p; +} + +static inline struct port *get_port(struct filter *filter, enum spa_direction direction, uint32_t port_id) +{ + return filter->ports[direction][port_id]; +} + +static inline int push_queue(struct port *port, struct queue *queue, struct buffer *buffer) +{ + uint32_t index; + + if (SPA_FLAG_IS_SET(buffer->flags, BUFFER_FLAG_QUEUED)) + return -EINVAL; + + SPA_FLAG_SET(buffer->flags, BUFFER_FLAG_QUEUED); + queue->incount += buffer->this.size; + + spa_ringbuffer_get_write_index(&queue->ring, &index); + queue->ids[index & MASK_BUFFERS] = buffer->id; + spa_ringbuffer_write_update(&queue->ring, index + 1); + + return 0; +} + +static inline struct buffer *pop_queue(struct port *port, struct queue *queue) +{ + int32_t avail; + uint32_t index, id; + struct buffer *buffer; + + if ((avail = spa_ringbuffer_get_read_index(&queue->ring, &index)) < MIN_QUEUED) { + errno = EPIPE; + return NULL; + } + + id = queue->ids[index & MASK_BUFFERS]; + spa_ringbuffer_read_update(&queue->ring, index + 1); + + buffer = &port->buffers[id]; + queue->outcount += buffer->this.size; + SPA_FLAG_CLEAR(buffer->flags, BUFFER_FLAG_QUEUED); + + return buffer; +} + +static inline void clear_queue(struct port *port, struct queue *queue) +{ + spa_ringbuffer_init(&queue->ring); + queue->incount = queue->outcount; +} + +static bool filter_set_state(struct pw_filter *filter, enum pw_filter_state state, const char *error) +{ + enum pw_filter_state old = filter->state; + bool res = old != state; + + if (res) { + free(filter->error); + filter->error = error ? strdup(error) : NULL; + + pw_log_debug(NAME" %p: update state from %s -> %s (%s)", filter, + pw_filter_state_as_string(old), + pw_filter_state_as_string(state), filter->error); + + filter->state = state; + pw_filter_emit_state_changed(filter, old, state, error); + } + return res; +} + +static int impl_set_io(void *object, uint32_t id, void *data, size_t size) +{ + struct filter *impl = object; + + pw_log_debug(NAME" %p: io %d %p/%zd", impl, id, data, size); + + switch(id) { + case SPA_IO_Position: + if (data && size >= sizeof(struct spa_io_position)) + impl->position = data; + else + impl->position = NULL; + break; + } + pw_filter_emit_io_changed(&impl->this, NULL, id, data, size); + + return 0; +} + +static int impl_send_command(void *object, const struct spa_command *command) +{ + struct filter *impl = object; + struct pw_filter *filter = &impl->this; + + switch (SPA_NODE_COMMAND_ID(command)) { + case SPA_NODE_COMMAND_Suspend: + case SPA_NODE_COMMAND_Pause: + pw_loop_invoke(impl->core->main_loop, + NULL, 0, NULL, 0, false, impl); + if (filter->state == PW_FILTER_STATE_STREAMING) { + pw_log_debug(NAME" %p: pause", filter); + filter_set_state(filter, PW_FILTER_STATE_PAUSED, NULL); + } + break; + case SPA_NODE_COMMAND_Start: + if (filter->state == PW_FILTER_STATE_PAUSED) { + pw_log_debug(NAME" %p: start", filter); + filter_set_state(filter, PW_FILTER_STATE_STREAMING, NULL); + } + break; + default: + pw_log_warn(NAME" %p: unhandled node command %d", filter, + SPA_NODE_COMMAND_ID(command)); + break; + } + return 0; +} + +static void emit_node_info(struct filter *d) +{ + struct spa_node_info info; + + info = SPA_NODE_INFO_INIT(); + info.max_input_ports = MAX_PORTS; + info.max_output_ports = MAX_PORTS; + info.change_mask |= SPA_NODE_CHANGE_MASK_FLAGS; + info.flags = SPA_NODE_FLAG_RT; + spa_node_emit_info(&d->hooks, &info); +} + +static void emit_port_info(struct filter *d, struct port *p, bool full) +{ + if (full) + p->info.change_mask = p->change_mask_all; + if (p->info.change_mask != 0) + spa_node_emit_port_info(&d->hooks, p->direction, p->id, &p->info); +} + +static int impl_add_listener(void *object, + struct spa_hook *listener, + const struct spa_node_events *events, + void *data) +{ + struct filter *d = object; + struct spa_hook_list save; + struct port *p; + + spa_hook_list_isolate(&d->hooks, &save, listener, events, data); + + emit_node_info(d); + + spa_list_for_each(p, &d->port_list, link) + emit_port_info(d, p, true); + + spa_hook_list_join(&d->hooks, &save); + + return 0; +} + +static int impl_set_callbacks(void *object, + const struct spa_node_callbacks *callbacks, void *data) +{ + struct filter *d = object; + + d->callbacks = SPA_CALLBACKS_INIT(callbacks, data); + + return 0; +} + +static int impl_port_set_io(void *object, enum spa_direction direction, uint32_t port_id, + uint32_t id, void *data, size_t size) +{ + struct filter *impl = object; + struct port *port; + + pw_log_debug(NAME" %p: set io %s %p %zd", impl, + spa_debug_type_find_name(spa_type_io, id), data, size); + + if ((port = get_port(impl, direction, port_id)) == NULL) + return -EINVAL; + + switch (id) { + case SPA_IO_Buffers: + if (data && size >= sizeof(struct spa_io_buffers)) + port->io = data; + else + port->io = NULL; + break; + } + + pw_filter_emit_io_changed(&impl->this, port->user_data, id, data, size); + + return 0; +} + +static int impl_port_enum_params(void *object, int seq, + enum spa_direction direction, uint32_t port_id, + uint32_t id, uint32_t start, uint32_t num, + const struct spa_pod *filter) +{ + struct filter *d = object; + struct port *port; + struct spa_result_node_params result; + uint8_t buffer[1024]; + struct spa_pod_builder b = { 0 }; + uint32_t idx = 0, count = 0; + struct param *p; + + spa_return_val_if_fail(num != 0, -EINVAL); + + result.id = id; + result.next = start; + + if ((port = get_port(d, direction, port_id)) == NULL) + return -EINVAL; + + pw_log_debug(NAME" %p: param id %d (%s) start:%d num:%d", d, id, + spa_debug_type_find_name(spa_type_param, id), + start, num); + + spa_list_for_each(p, &port->param_list, link) { + struct spa_pod *param; + + if (idx++ < start) + continue; + + result.index = result.next++; + + param = p->param; + if (param == NULL || p->id != id) + continue; + + spa_pod_builder_init(&b, buffer, sizeof(buffer)); + if (spa_pod_filter(&b, &result.param, param, filter) != 0) + continue; + + spa_node_emit_result(&d->hooks, seq, 0, SPA_RESULT_TYPE_NODE_PARAMS, &result); + + if (++count == num) + break; + } + return 0; +} + +static int port_set_param(struct filter *impl, struct port *port, + uint32_t id, uint32_t flags, const struct spa_pod *param) +{ + struct pw_filter *filter = &impl->this; + struct param *p; + int res, idx; + + pw_log_debug(NAME" %p: param changed: %p %d", impl, param, impl->disconnecting); + if (pw_log_level_enabled(SPA_LOG_LEVEL_DEBUG)) + spa_debug_pod(2, NULL, param); + + clear_params(impl, port, id); + if (param) { + p = add_param(impl, port, id, param); + if (p == NULL) { + res = -errno; + goto error_exit; + } + SPA_POD_OBJECT_ID(p->param) = id; + } + else + p = NULL; + + pw_filter_emit_param_changed(filter, port->user_data, id, p ? p->param : NULL); + + if (filter->state == PW_FILTER_STATE_ERROR) + return -EIO; + + idx = get_param_index(id); + if (idx != -1) { + impl->params[idx].flags |= SPA_PARAM_INFO_READ; + impl->params[idx].flags ^= SPA_PARAM_INFO_SERIAL; + emit_port_info(impl, port, false); + } + return 0; + +error_exit: + return res; +} + +static int impl_port_set_param(void *object, + enum spa_direction direction, uint32_t port_id, + uint32_t id, uint32_t flags, + const struct spa_pod *param) +{ + struct filter *impl = object; + struct port *port; + + if (impl->disconnecting) + return param == NULL ? 0 : -EIO; + + if ((port = get_port(impl, direction, port_id)) == NULL) + return -EINVAL; + + return port_set_param(impl, port, id, flags, param); +} + +static int map_data(struct filter *impl, struct spa_data *data, int prot) +{ + void *ptr; + struct pw_map_range range; + + pw_map_range_init(&range, data->mapoffset, data->maxsize, impl->core->sc_pagesize); + + ptr = mmap(NULL, range.size, prot, MAP_SHARED, data->fd, range.offset); + if (ptr == MAP_FAILED) { + pw_log_error(NAME" %p: failed to mmap buffer mem: %m", impl); + return -errno; + } + data->data = SPA_MEMBER(ptr, range.start, void); + pw_log_debug(NAME" %p: fd %"PRIi64" mapped %d %d %p", impl, data->fd, + range.offset, range.size, data->data); + + return 0; +} + +static int unmap_data(struct filter *impl, struct spa_data *data) +{ + struct pw_map_range range; + + pw_map_range_init(&range, data->mapoffset, data->maxsize, impl->core->sc_pagesize); + + if (munmap(SPA_MEMBER(data->data, -range.start, void), range.size) < 0) + pw_log_warn(NAME" %p: failed to unmap: %m", impl); + + pw_log_debug(NAME" %p: fd %"PRIi64" unmapped", impl, data->fd); + return 0; +} + +static void clear_buffers(struct port *port) +{ + uint32_t i, j; + struct filter *impl = port->filter; + + pw_log_debug(NAME" %p: clear buffers %d", impl, port->n_buffers); + + for (i = 0; i < port->n_buffers; i++) { + struct buffer *b = &port->buffers[i]; + + pw_filter_emit_remove_buffer(&impl->this, port->user_data, &b->this); + + if (SPA_FLAG_IS_SET(b->flags, BUFFER_FLAG_MAPPED)) { + for (j = 0; j < b->this.buffer->n_datas; j++) { + struct spa_data *d = &b->this.buffer->datas[j]; + pw_log_debug(NAME" %p: clear buffer %d mem", + impl, b->id); + unmap_data(impl, d); + } + } + } + port->n_buffers = 0; + clear_queue(port, &port->dequeued); + clear_queue(port, &port->queued); +} + +static int impl_port_use_buffers(void *object, + enum spa_direction direction, uint32_t port_id, + uint32_t flags, + struct spa_buffer **buffers, uint32_t n_buffers) +{ + struct filter *impl = object; + struct port *port; + struct pw_filter *filter = &impl->this; + uint32_t i, j, impl_flags; + int prot, res; + int size = 0; + + if (impl->disconnecting) + return n_buffers == 0 ? 0 : -EIO; + + if ((port = get_port(impl, direction, port_id)) == NULL) + return -EINVAL; + + impl_flags = port->flags; + prot = PROT_READ | (direction == SPA_DIRECTION_OUTPUT ? PROT_WRITE : 0); + + clear_buffers(port); + + for (i = 0; i < n_buffers; i++) { + int buf_size = 0; + struct buffer *b = &port->buffers[i]; + + b->flags = 0; + b->id = i; + + if (SPA_FLAG_IS_SET(impl_flags, PW_FILTER_PORT_FLAG_MAP_BUFFERS)) { + for (j = 0; j < buffers[i]->n_datas; j++) { + struct spa_data *d = &buffers[i]->datas[j]; + if (d->type == SPA_DATA_MemFd || + d->type == SPA_DATA_DmaBuf) { + if ((res = map_data(impl, d, prot)) < 0) + return res; + } + else if (d->data == NULL) { + pw_log_error(NAME" %p: invalid buffer mem", filter); + return -EINVAL; + } + buf_size += d->maxsize; + } + SPA_FLAG_SET(b->flags, BUFFER_FLAG_MAPPED); + + if (size > 0 && buf_size != size) { + pw_log_error(NAME" %p: invalid buffer size %d", filter, buf_size); + return -EINVAL; + } else + size = buf_size; + } + pw_log_debug(NAME" %p: got buffer %d %d datas, mapped size %d", filter, i, + buffers[i]->n_datas, size); + } + + for (i = 0; i < n_buffers; i++) { + struct buffer *b = &port->buffers[i]; + + b->flags = 0; + b->id = i; + b->this.buffer = buffers[i]; + + if (port->direction == SPA_DIRECTION_OUTPUT) { + pw_log_trace(NAME" %p: recycle buffer %d", filter, b->id); + push_queue(port, &port->dequeued, b); + } + + pw_filter_emit_add_buffer(filter, port->user_data, &b->this); + } + + port->n_buffers = n_buffers; + + return 0; +} + +static int impl_port_reuse_buffer(void *object, uint32_t port_id, uint32_t buffer_id) +{ + struct filter *impl = object; + struct port *port; + + if ((port = get_port(impl, SPA_DIRECTION_OUTPUT, port_id)) == NULL) + return -EINVAL; + + pw_log_trace(NAME" %p: recycle buffer %d", impl, buffer_id); + if (buffer_id < port->n_buffers) + push_queue(port, &port->queued, &port->buffers[buffer_id]); + + return 0; +} + +static inline void copy_position(struct filter *impl) +{ + struct spa_io_position *p = impl->position; + if (p != NULL) { + SEQ_WRITE(impl->seq); + impl->time.now = p->clock.nsec; + impl->time.rate = p->clock.rate; + impl->time.ticks = p->clock.position; + impl->time.delay = p->clock.delay; + SEQ_WRITE(impl->seq); + } +} + +static int +do_call_process(struct spa_loop *loop, + bool async, uint32_t seq, const void *data, size_t size, void *user_data) +{ + struct filter *impl = user_data; + struct pw_filter *filter = &impl->this; + pw_log_trace(NAME" %p: do process", filter); + pw_filter_emit_process(filter, impl->position); + return 0; +} + +static void call_process(struct filter *impl) +{ + pw_log_trace(NAME" %p: call process", impl); + if (SPA_FLAG_IS_SET(impl->flags, PW_FILTER_FLAG_RT_PROCESS)) { + do_call_process(NULL, false, 1, NULL, 0, impl); + } + else { + pw_loop_invoke(impl->core->main_loop, + do_call_process, 1, NULL, 0, false, impl); + } +} + +static int +do_call_drained(struct spa_loop *loop, + bool async, uint32_t seq, const void *data, size_t size, void *user_data) +{ + struct filter *impl = user_data; + struct pw_filter *filter = &impl->this; + pw_log_trace(NAME" %p: drained", filter); + pw_filter_emit_drained(filter); + impl->draining = false; + return 0; +} + +static void call_drained(struct filter *impl) +{ + pw_loop_invoke(impl->core->main_loop, + do_call_drained, 1, NULL, 0, false, impl); +} + +static int impl_node_process(void *object) +{ + struct filter *impl = object; + struct port *p; + struct buffer *b; + bool drained = true; + + pw_log_trace(NAME" %p: do process %p", impl, impl->position); + + /** first dequeue and recycle buffers */ + spa_list_for_each(p, &impl->port_list, link) { + struct spa_io_buffers *io = p->io; + + if (io == NULL || + io->buffer_id >= p->n_buffers) + continue; + + if (p->direction == SPA_DIRECTION_INPUT) { + if (io->status != SPA_STATUS_HAVE_DATA) + continue; + + /* push new buffer */ + b = &p->buffers[io->buffer_id]; + pw_log_trace(NAME" %p: dequeue buffer %d", impl, b->id); + push_queue(p, &p->dequeued, b); + drained = false; + } else { + if (io->status == SPA_STATUS_HAVE_DATA) + continue; + + /* recycle old buffer */ + b = &p->buffers[io->buffer_id]; + pw_log_trace(NAME" %p: recycle buffer %d", impl, b->id); + push_queue(p, &p->dequeued, b); + } + } + + copy_position(impl); + call_process(impl); + + /** recycle/push queued buffers */ + spa_list_for_each(p, &impl->port_list, link) { + struct spa_io_buffers *io = p->io; + + if (io == NULL) + continue; + + if (p->direction == SPA_DIRECTION_INPUT) { + if (io->status != SPA_STATUS_HAVE_DATA) + continue; + + /* pop buffer to recycle */ + if ((b = pop_queue(p, &p->queued)) != NULL) { + pw_log_trace(NAME" %p: recycle buffer %d", impl, b->id); + io->buffer_id = b->id; + } else { + io->buffer_id = SPA_ID_INVALID; + } + io->status = SPA_STATUS_NEED_DATA; + } else { + if (io->status == SPA_STATUS_HAVE_DATA) + continue; + + if ((b = pop_queue(p, &p->queued)) != NULL) { + pw_log_trace(NAME" %p: pop %d %p", impl, b->id, io); + io->buffer_id = b->id; + io->status = SPA_STATUS_HAVE_DATA; + drained = false; + } else { + io->buffer_id = SPA_ID_INVALID; + io->status = SPA_STATUS_NEED_DATA; + } + } + } + if (drained && impl->draining) + call_drained(impl); + + return SPA_STATUS_NEED_DATA | SPA_STATUS_HAVE_DATA; +} + +static const struct spa_node_methods impl_node = { + SPA_VERSION_NODE_METHODS, + .add_listener = impl_add_listener, + .set_callbacks = impl_set_callbacks, + .set_io = impl_set_io, + .send_command = impl_send_command, + .port_set_io = impl_port_set_io, + .port_enum_params = impl_port_enum_params, + .port_set_param = impl_port_set_param, + .port_use_buffers = impl_port_use_buffers, + .port_reuse_buffer = impl_port_reuse_buffer, + .process = impl_node_process, +}; + +static void proxy_destroy(void *_data) +{ + struct pw_filter *filter = _data; + filter->proxy = NULL; + spa_hook_remove(&filter->proxy_listener); + filter->node_id = SPA_ID_INVALID; + filter_set_state(filter, PW_FILTER_STATE_UNCONNECTED, NULL); +} + +static void proxy_error(void *_data, int seq, int res, const char *message) +{ + struct pw_filter *filter = _data; + filter_set_state(filter, PW_FILTER_STATE_ERROR, message); +} + +static const struct pw_proxy_events proxy_events = { + PW_VERSION_PROXY_EVENTS, + .destroy = proxy_destroy, + .error = proxy_error, +}; + +static void node_event_info(void *object, const struct pw_node_info *info) +{ + struct pw_filter *filter = object; + struct filter *impl = SPA_CONTAINER_OF(filter, struct filter, this); + uint32_t subscribe[info->n_params], n_subscribe = 0; + uint32_t i; + + if (info->change_mask & PW_NODE_CHANGE_MASK_PARAMS && !impl->subscribe) { + for (i = 0; i < info->n_params; i++) { + + switch (info->params[i].id) { + case SPA_PARAM_PropInfo: + case SPA_PARAM_Props: + subscribe[n_subscribe++] = info->params[i].id; + break; + default: + break; + } + } + if (n_subscribe > 0) { + pw_node_proxy_subscribe_params((struct pw_node_proxy*)filter->proxy, + subscribe, n_subscribe); + impl->subscribe = true; + } + } +} + +static const struct pw_node_proxy_events node_events = { + PW_VERSION_NODE_PROXY_EVENTS, + .info = node_event_info, +}; + +static int handle_connect(struct pw_filter *filter) +{ + struct filter *impl = SPA_CONTAINER_OF(filter, struct filter, this); + struct pw_properties *props; + int res; + + pw_log_debug(NAME" %p: creating node", filter); + props = pw_properties_copy(filter->properties); + + impl->node = pw_node_new(impl->core, props, 0); + if (impl->node == NULL) { + res = -errno; + goto error_node; + } + + impl->node->port_user_data_size = sizeof(struct port); + + pw_node_set_implementation(impl->node, &impl->impl_node); + + pw_log_debug(NAME" %p: export node %p", filter, impl->node); + filter->proxy = pw_remote_export(filter->remote, + PW_TYPE_INTERFACE_Node, NULL, impl->node, 0); + if (filter->proxy == NULL) { + res = -errno; + goto error_proxy; + } + + pw_proxy_add_listener(filter->proxy, &filter->proxy_listener, &proxy_events, filter); + pw_node_proxy_add_listener((struct pw_node_proxy*)filter->proxy, + &filter->node_listener, &node_events, filter); + + if (!SPA_FLAG_IS_SET(impl->flags, PW_FILTER_FLAG_INACTIVE)) + pw_node_set_active(impl->node, true); + + return 0; + +error_node: + pw_log_error(NAME" %p: can't make node: %s", filter, spa_strerror(res)); + return res; +error_proxy: + pw_log_error(NAME" %p: can't make proxy: %s", filter, spa_strerror(res)); + return res; +} + +static void on_remote_state_changed(void *_data, enum pw_remote_state old, + enum pw_remote_state state, const char *error) +{ + struct pw_filter *filter = _data; + struct filter *impl = SPA_CONTAINER_OF(filter, struct filter, this); + + pw_log_debug(NAME" %p: remote state %d", filter, state); + + switch (state) { + case PW_REMOTE_STATE_ERROR: + filter_set_state(filter, PW_FILTER_STATE_ERROR, error); + break; + case PW_REMOTE_STATE_UNCONNECTED: + filter_set_state(filter, PW_FILTER_STATE_UNCONNECTED, "remote unconnected"); + break; + + case PW_REMOTE_STATE_CONNECTED: + if (impl->async_connect) + handle_connect(filter); + break; + + default: + break; + } +} + +static void on_remote_exported(void *_data, uint32_t proxy_id, uint32_t global_id) +{ + struct pw_filter *filter = _data; + if (filter->proxy && filter->proxy->id == proxy_id) { + filter->node_id = global_id; + filter_set_state(filter, PW_FILTER_STATE_PAUSED, NULL); + } +} + +static const struct pw_remote_events remote_events = { + PW_VERSION_REMOTE_EVENTS, + .state_changed = on_remote_state_changed, + .exported = on_remote_exported, +}; + +SPA_EXPORT +struct pw_filter * pw_filter_new(struct pw_remote *remote, const char *name, + struct pw_properties *props) +{ + struct filter *impl; + struct pw_filter *this; + const char *str; + int res; + + impl = calloc(1, sizeof(struct filter)); + if (impl == NULL) { + res = -errno; + goto error_cleanup; + } + + this = &impl->this; + pw_log_debug(NAME" %p: new", impl); + + if (props == NULL) { + props = pw_properties_new(PW_KEY_MEDIA_NAME, name, NULL); + } else if (pw_properties_get(props, PW_KEY_MEDIA_NAME) == NULL) { + pw_properties_set(props, PW_KEY_MEDIA_NAME, name); + } + if (props == NULL) { + res = -errno; + goto error_properties; + } + + if (pw_properties_get(props, PW_KEY_NODE_NAME) == NULL) { + const struct pw_properties *p = pw_remote_get_properties(remote); + + if ((str = pw_properties_get(p, PW_KEY_APP_NAME)) != NULL) + pw_properties_set(props, PW_KEY_NODE_NAME, str); + else if ((str = pw_properties_get(p, PW_KEY_APP_PROCESS_BINARY)) != NULL) + pw_properties_set(props, PW_KEY_NODE_NAME, str); + else + pw_properties_set(props, PW_KEY_NODE_NAME, name); + } + + spa_hook_list_init(&impl->hooks); + this->properties = props; + + this->remote = remote; + this->name = name ? strdup(name) : NULL; + this->node_id = SPA_ID_INVALID; + + spa_list_init(&impl->param_list); + spa_list_init(&impl->port_list); + + spa_hook_list_init(&this->listener_list); + spa_list_init(&this->controls); + + this->state = PW_FILTER_STATE_UNCONNECTED; + + impl->core = remote->core; + impl->pending_seq = SPA_ID_INVALID; + + pw_remote_add_listener(remote, &impl->remote_listener, &remote_events, this); + + spa_list_append(&remote->filter_list, &this->link); + + return this; + +error_properties: + free(impl); +error_cleanup: + if (props) + pw_properties_free(props); + errno = -res; + return NULL; +} + +SPA_EXPORT +struct pw_filter * +pw_filter_new_simple(struct pw_loop *loop, + const char *name, + struct pw_properties *props, + const struct pw_filter_events *events, + void *data) +{ + struct pw_filter *filter; + struct filter *impl; + struct pw_core *core; + struct pw_remote *remote; + int res; + + core = pw_core_new(loop, NULL, 0); + remote = pw_remote_new(core, NULL, 0); + + filter = pw_filter_new(remote, name, props); + if (filter == NULL) { + res = -errno; + goto error_cleanup; + } + + impl = SPA_CONTAINER_OF(filter, struct filter, this); + + impl->free_data = true; + impl->data.core = core; + impl->data.remote = remote; + + pw_filter_add_listener(filter, &impl->data.filter_listener, events, data); + + return filter; + +error_cleanup: + pw_core_destroy(core); + errno = -res; + return NULL; +} + +SPA_EXPORT +const char *pw_filter_state_as_string(enum pw_filter_state state) +{ + switch (state) { + case PW_FILTER_STATE_ERROR: + return "error"; + case PW_FILTER_STATE_UNCONNECTED: + return "unconnected"; + case PW_FILTER_STATE_CONNECTING: + return "connecting"; + case PW_FILTER_STATE_PAUSED: + return "paused"; + case PW_FILTER_STATE_STREAMING: + return "streaming"; + } + return "invalid-state"; +} + +SPA_EXPORT +void pw_filter_destroy(struct pw_filter *filter) +{ + struct filter *impl = SPA_CONTAINER_OF(filter, struct filter, this); + + pw_log_debug(NAME" %p: destroy", filter); + + pw_filter_emit_destroy(filter); + + pw_filter_disconnect(filter); + + spa_hook_remove(&impl->remote_listener); + spa_list_remove(&filter->link); + + clear_params(impl, NULL, SPA_ID_INVALID); + + pw_log_debug(NAME" %p: free", filter); + free(filter->error); + + pw_properties_free(filter->properties); + + free(filter->name); + + if (impl->free_data) + pw_core_destroy(impl->data.core); + + free(impl); +} + +SPA_EXPORT +void pw_filter_add_listener(struct pw_filter *filter, + struct spa_hook *listener, + const struct pw_filter_events *events, + void *data) +{ + spa_hook_list_append(&filter->listener_list, listener, events, data); +} + +SPA_EXPORT +enum pw_filter_state pw_filter_get_state(struct pw_filter *filter, const char **error) +{ + if (error) + *error = filter->error; + return filter->state; +} + +SPA_EXPORT +struct pw_remote *pw_filter_get_remote(struct pw_filter *filter) +{ + return filter->remote; +} + +SPA_EXPORT +const char *pw_filter_get_name(struct pw_filter *filter) +{ + return filter->name; +} + +SPA_EXPORT +const struct pw_properties *pw_filter_get_properties(struct pw_filter *filter, void *port_data) +{ + struct filter *impl = SPA_CONTAINER_OF(filter, struct filter, this); + struct port *port = SPA_CONTAINER_OF(port_data, struct port, user_data); + + if (port_data) { + if (port->port) + return pw_port_get_properties(port->port); + } else { + if (impl->node) + return pw_node_get_properties(impl->node); + } + return NULL; +} + +SPA_EXPORT +int pw_filter_update_properties(struct pw_filter *filter, void *port_data, const struct spa_dict *dict) +{ + struct filter *impl = SPA_CONTAINER_OF(filter, struct filter, this); + struct port *port = SPA_CONTAINER_OF(port_data, struct port, user_data); + int changed = 0; + + if (port_data) { + if (port->port) + changed = pw_port_update_properties(port->port, dict); + } else { + if (impl->node) + changed = pw_node_update_properties(impl->node, dict); + } + return changed; +} + +SPA_EXPORT +int +pw_filter_connect(struct pw_filter *filter, + enum pw_filter_flags flags, + const struct spa_pod **params, + uint32_t n_params) +{ + struct filter *impl = SPA_CONTAINER_OF(filter, struct filter, this); + enum pw_remote_state state; + int res; + uint32_t i; + + pw_log_debug(NAME" %p: connect", filter); + impl->flags = flags; + impl->node_methods = impl_node; + + impl->impl_node.iface = SPA_INTERFACE_INIT( + SPA_TYPE_INTERFACE_Node, + SPA_VERSION_NODE, + &impl->node_methods, impl); + + clear_params(impl, NULL, SPA_ID_INVALID); + for (i = 0; i < n_params; i++) { + add_param(impl, NULL, SPA_ID_INVALID, params[i]); + } + + impl->disconnecting = false; + filter_set_state(filter, PW_FILTER_STATE_CONNECTING, NULL); + + state = pw_remote_get_state(filter->remote, NULL); + impl->async_connect = (state == PW_REMOTE_STATE_UNCONNECTED || + state == PW_REMOTE_STATE_ERROR); + + if (impl->async_connect) + res = pw_remote_connect(filter->remote); + else + res = handle_connect(filter); + + return res; +} + +SPA_EXPORT +uint32_t pw_filter_get_node_id(struct pw_filter *filter) +{ + return filter->node_id; +} + +SPA_EXPORT +int pw_filter_disconnect(struct pw_filter *filter) +{ + struct filter *impl = SPA_CONTAINER_OF(filter, struct filter, this); + + pw_log_debug(NAME" %p: disconnect", filter); + impl->disconnecting = true; + + if (impl->node) + pw_node_set_active(impl->node, false); + + if (filter->proxy) + pw_proxy_destroy(filter->proxy); + + if (impl->node) { + pw_node_destroy(impl->node); + impl->node = NULL; + } + + return 0; +} + +static void add_port_params(struct filter *impl, struct port *port) +{ + uint8_t buffer[4096]; + struct spa_pod_builder b; + + spa_pod_builder_init(&b, buffer, sizeof(buffer)); + add_param(impl, port, SPA_PARAM_IO, + spa_pod_builder_add_object(&b, + SPA_TYPE_OBJECT_ParamIO, SPA_PARAM_IO, + SPA_PARAM_IO_id, SPA_POD_Id(SPA_IO_Buffers), + SPA_PARAM_IO_size, SPA_POD_Int(sizeof(struct spa_io_buffers)))); +} + +static void add_audio_dsp_port_params(struct filter *impl, struct port *port) +{ + uint8_t buffer[4096]; + struct spa_pod_builder b; + + spa_pod_builder_init(&b, buffer, sizeof(buffer)); + add_param(impl, port, SPA_PARAM_EnumFormat, + spa_pod_builder_add_object(&b, + SPA_TYPE_OBJECT_Format, SPA_PARAM_EnumFormat, + SPA_FORMAT_mediaType, SPA_POD_Id(SPA_MEDIA_TYPE_audio), + SPA_FORMAT_mediaSubtype, SPA_POD_Id(SPA_MEDIA_SUBTYPE_raw), + SPA_FORMAT_AUDIO_format, SPA_POD_Id(SPA_AUDIO_FORMAT_F32P), + SPA_FORMAT_AUDIO_rate, SPA_POD_CHOICE_RANGE_Int( + 48000, 1, INT32_MAX), + SPA_FORMAT_AUDIO_channels, SPA_POD_Int(1))); + + spa_pod_builder_init(&b, buffer, sizeof(buffer)); + add_param(impl, port, SPA_PARAM_Buffers, + spa_pod_builder_add_object(&b, + SPA_TYPE_OBJECT_ParamBuffers, SPA_PARAM_Buffers, + SPA_PARAM_BUFFERS_buffers, SPA_POD_CHOICE_RANGE_Int(1, 1, MAX_BUFFERS), + SPA_PARAM_BUFFERS_blocks, SPA_POD_Int(1), + SPA_PARAM_BUFFERS_size, SPA_POD_CHOICE_STEP_Int( + MAX_SAMPLES * sizeof(float), + sizeof(float), + MAX_SAMPLES * sizeof(float), + sizeof(float)), + SPA_PARAM_BUFFERS_stride, SPA_POD_Int(4), + SPA_PARAM_BUFFERS_align, SPA_POD_Int(16))); +} + +static void add_video_dsp_port_params(struct filter *impl, struct port *port) +{ + uint8_t buffer[4096]; + struct spa_pod_builder b; + + spa_pod_builder_init(&b, buffer, sizeof(buffer)); + add_param(impl, port, SPA_PARAM_EnumFormat, + spa_pod_builder_add_object(&b, + SPA_TYPE_OBJECT_Format, SPA_PARAM_EnumFormat, + SPA_FORMAT_mediaType, SPA_POD_Id(SPA_MEDIA_TYPE_video), + SPA_FORMAT_mediaSubtype, SPA_POD_Id(SPA_MEDIA_SUBTYPE_raw), + SPA_FORMAT_VIDEO_format, SPA_POD_Id(SPA_VIDEO_FORMAT_RGBA_F32), + SPA_FORMAT_VIDEO_size, SPA_POD_CHOICE_RANGE_Rectangle( + &SPA_RECTANGLE(320, 240), + &SPA_RECTANGLE(1,1), + &SPA_RECTANGLE(INT32_MAX, INT32_MAX)), + SPA_FORMAT_VIDEO_framerate, SPA_POD_CHOICE_RANGE_Fraction( + &SPA_FRACTION(25,1), + &SPA_FRACTION(0,1), + &SPA_FRACTION(INT32_MAX,1)))); +} + +static int update_params(struct filter *impl, struct port *port, + const struct spa_pod **params, uint32_t n_params) +{ + uint32_t i; + int res = 0; + + for (i = 0; i < n_params; i++) { + if (!spa_pod_is_object(params[i])) + continue; + clear_params(impl, port, SPA_POD_OBJECT_ID(params[i])); + } + for (i = 0; i < n_params; i++) { + if (add_param(impl, port, SPA_ID_INVALID, params[i]) == NULL) { + res = -errno; + break; + } + } + return res; +} + +SPA_EXPORT +void *pw_filter_add_port(struct pw_filter *filter, + enum pw_direction direction, + enum pw_filter_port_flags flags, + size_t port_data_size, + struct pw_properties *props, + const struct spa_pod **params, uint32_t n_params) +{ + struct filter *impl = SPA_CONTAINER_OF(filter, struct filter, this); + struct port *p; + const char *str; + + if (props == NULL) + props = pw_properties_new(NULL, NULL); + if (props == NULL) + return NULL; + + if ((p = alloc_port(impl, direction, port_data_size)) == NULL) { + pw_properties_free(props); + return NULL; + } + + p->alloc_buffers = SPA_FLAG_IS_SET(flags, PW_FILTER_PORT_FLAG_ALLOC_BUFFERS); + p->props = props; + p->flags = flags; + + /* first configure default params */ + add_port_params(impl, p); + if ((str = pw_properties_get(props, PW_KEY_FORMAT_DSP)) != NULL) { + if (!strcmp(str, "32 bit float mono audio")) + add_audio_dsp_port_params(impl, p); + else if (!strcmp(str, "32 bit float RGBA video")) + add_video_dsp_port_params(impl, p); + } + /* then override with user provided if any */ + update_params(impl, p, params, n_params); + + p->change_mask_all = SPA_PORT_CHANGE_MASK_FLAGS | + SPA_PORT_CHANGE_MASK_PROPS; + p->info.change_mask = 0; + p->info.flags = 0; + if (p->alloc_buffers) + p->info.flags |= SPA_PORT_FLAG_CAN_ALLOC_BUFFERS; + p->info.props = &p->props->dict; + p->change_mask_all |= SPA_PORT_CHANGE_MASK_PARAMS; + p->params[0] = SPA_PARAM_INFO(SPA_PARAM_EnumFormat, SPA_PARAM_INFO_READ); + p->params[1] = SPA_PARAM_INFO(SPA_PARAM_Meta, 0); + p->params[2] = SPA_PARAM_INFO(SPA_PARAM_IO, 0); + p->params[3] = SPA_PARAM_INFO(SPA_PARAM_Format, SPA_PARAM_INFO_WRITE); + p->params[4] = SPA_PARAM_INFO(SPA_PARAM_Buffers, 0); + p->info.params = p->params; + p->info.n_params = 5; + + emit_port_info(impl, p, true); + + return p->user_data; +} + +SPA_EXPORT +int pw_filter_remove_port(void *port_data) +{ + struct port *port = SPA_CONTAINER_OF(port_data, struct port, user_data); + struct filter *impl = port->filter; + + spa_list_remove(&port->link); + impl->ports[port->direction][port->id] = NULL; + + clear_buffers(port); + clear_params(impl, port, SPA_ID_INVALID); + free(port); + + return 0; +} + +SPA_EXPORT +int pw_filter_update_params(struct pw_filter *filter, + void *port_data, + int res, + const struct spa_pod **params, + uint32_t n_params) +{ + struct filter *impl = SPA_CONTAINER_OF(filter, struct filter, this); + struct port *port; + + pw_log_debug(NAME" %p: update params %d %d", filter, res, impl->pending_seq); + + port = port_data ? SPA_CONTAINER_OF(port_data, struct port, user_data) : NULL; + + if (res < 0) { + pw_proxy_error(filter->proxy, res, "params failed"); + filter_set_state(filter, PW_FILTER_STATE_ERROR, "params error"); + return 0; + } + + res = update_params(impl, port, params, n_params); + + impl->pending_seq = SPA_ID_INVALID; + + return res; +} + +SPA_EXPORT +int pw_filter_set_active(struct pw_filter *filter, bool active) +{ + struct filter *impl = SPA_CONTAINER_OF(filter, struct filter, this); + pw_log_debug(NAME" %p: active:%d", filter, active); + if (impl->node) + pw_node_set_active(impl->node, active); + return 0; +} + +SPA_EXPORT +int pw_filter_get_time(struct pw_filter *filter, struct pw_time *time) +{ + struct filter *impl = SPA_CONTAINER_OF(filter, struct filter, this); + uintptr_t seq1, seq2; + + do { + seq1 = SEQ_READ(impl->seq); + *time = impl->time; + seq2 = SEQ_READ(impl->seq); + } while (!SEQ_READ_SUCCESS(seq1, seq2)); + + pw_log_trace(NAME" %p: %"PRIi64" %"PRIi64" %"PRIu64" %d/%d ", filter, + time->now, time->delay, time->ticks, + time->rate.num, time->rate.denom); + + return 0; +} + +static int +do_process(struct spa_loop *loop, + bool async, uint32_t seq, const void *data, size_t size, void *user_data) +{ + struct filter *impl = user_data; + int res = impl_node_process(impl); + return spa_node_call_ready(&impl->callbacks, res); +} + +static inline int call_trigger(struct filter *impl) +{ + int res = 0; + if (SPA_FLAG_IS_SET(impl->flags, PW_FILTER_FLAG_DRIVER)) { + res = pw_loop_invoke(impl->core->data_loop, + do_process, 1, NULL, 0, false, impl); + } + return res; +} + +SPA_EXPORT +struct pw_buffer *pw_filter_dequeue_buffer(void *port_data) +{ + struct port *p = SPA_CONTAINER_OF(port_data, struct port, user_data); + struct filter *impl = p->filter; + struct buffer *b; + int res; + + if ((b = pop_queue(p, &p->dequeued)) == NULL) { + res = -errno; + pw_log_trace(NAME" %p: no more buffers: %m", impl); + errno = -res; + return NULL; + } + pw_log_trace(NAME" %p: dequeue buffer %d", impl, b->id); + + return &b->this; +} + +SPA_EXPORT +int pw_filter_queue_buffer(void *port_data, struct pw_buffer *buffer) +{ + struct port *p = SPA_CONTAINER_OF(port_data, struct port, user_data); + struct filter *impl = p->filter; + struct buffer *b = SPA_CONTAINER_OF(buffer, struct buffer, this); + int res; + + pw_log_trace(NAME" %p: queue buffer %d", impl, b->id); + if ((res = push_queue(p, &p->queued, b)) < 0) + return res; + + return call_trigger(impl); +} + +SPA_EXPORT +void *pw_filter_get_dsp_buffer(void *port_data, uint32_t n_samples) +{ + struct port *p = SPA_CONTAINER_OF(port_data, struct port, user_data); + struct pw_buffer *buf; + struct spa_data *d; + + if ((buf = pw_filter_dequeue_buffer(port_data)) == NULL) + return empty; + + d = &buf->buffer->datas[0]; + + if (p->direction == SPA_DIRECTION_OUTPUT) { + d->chunk->offset = 0; + d->chunk->size = n_samples * sizeof(float); + d->chunk->stride = sizeof(float); + d->chunk->flags = 0; + } + + pw_filter_queue_buffer(port_data, buf); + + return d->data; +} + +static int +do_flush(struct spa_loop *loop, + bool async, uint32_t seq, const void *data, size_t size, void *user_data) +{ +#if 0 + struct filter *impl = user_data; + struct buffer *b; + + pw_log_trace(NAME" %p: flush", impl); + do { + b = pop_queue(impl, &impl->queued); + if (b != NULL) + push_queue(impl, &impl->dequeued, b); + } + while (b); + + impl->time.queued = impl->queued.outcount = impl->dequeued.incount = + impl->dequeued.outcount = impl->queued.incount; + +#endif + return 0; +} +static int +do_drain(struct spa_loop *loop, + bool async, uint32_t seq, const void *data, size_t size, void *user_data) +{ + struct filter *impl = user_data; + impl->draining = true; + return 0; +} + +SPA_EXPORT +int pw_filter_flush(struct pw_filter *filter, bool drain) +{ + struct filter *impl = SPA_CONTAINER_OF(filter, struct filter, this); + pw_loop_invoke(impl->core->data_loop, + drain ? do_drain : do_flush, 1, NULL, 0, true, impl); + return 0; +} diff --git a/src/pipewire/filter.h b/src/pipewire/filter.h new file mode 100644 index 000000000..bfe6c0255 --- /dev/null +++ b/src/pipewire/filter.h @@ -0,0 +1,235 @@ +/* PipeWire + * + * Copyright © 2019 Wim Taymans + * + * Permission is hereby granted, free of charge, to any person obtaining a + * copy of this software and associated documentation files (the "Software"), + * to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, + * and/or sell copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice (including the next + * paragraph) shall be included in all copies or substantial portions of the + * Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL + * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER + * DEALINGS IN THE SOFTWARE. + */ + +#ifndef PIPEWIRE_FILTER_H +#define PIPEWIRE_FILTER_H + +#ifdef __cplusplus +extern "C" { +#endif + +/** \class pw_filter + * + * \brief PipeWire filter object class + * + * The filter object provides a convenient way to implement + * processing filters. + * + * See also \ref page_filters and \ref page_core_api + */ +struct pw_filter; + +#include +#include +#include + +#include + +/** \enum pw_filter_state The state of a filter \memberof pw_filter */ +enum pw_filter_state { + PW_FILTER_STATE_ERROR = -1, /**< the strean is in error */ + PW_FILTER_STATE_UNCONNECTED = 0, /**< unconnected */ + PW_FILTER_STATE_CONNECTING = 1, /**< connection is in progress */ + PW_FILTER_STATE_PAUSED = 2, /**< filter is connected and paused */ + PW_FILTER_STATE_STREAMING = 3 /**< filter is streaming */ +}; + +#if 0 +struct pw_buffer { + struct spa_buffer *buffer; /**< the spa buffer */ + void *user_data; /**< user data attached to the buffer */ + uint64_t size; /**< For input ports, this field is set by pw_filter + * with the duration of the buffer in ticks. + * For output ports, this field is set by the user. + * This field is added for all queued buffers and + * returned in the time info. */ +}; +#endif + +/** Events for a filter. These events are always called from the mainloop + * unless explicitly documented otherwise. */ +struct pw_filter_events { +#define PW_VERSION_FILTER_EVENTS 0 + uint32_t version; + + void (*destroy) (void *data); + /** when the filter state changes */ + void (*state_changed) (void *data, enum pw_filter_state old, + enum pw_filter_state state, const char *error); + + /** when io changed on a port of the filter (when port_data is NULL). */ + void (*io_changed) (void *data, void *port_data, + uint32_t id, void *area, uint32_t size); + /** when a parameter changed on a port of the filter (when port_data is NULL). */ + void (*param_changed) (void *data, void *port_data, + uint32_t id, const struct spa_pod *param); + + /** when a new buffer was created for a port */ + void (*add_buffer) (void *data, void *port_data, struct pw_buffer *buffer); + /** when a buffer was destroyed for a port */ + void (*remove_buffer) (void *data, void *port_data, struct pw_buffer *buffer); + + /** do processing. This is normally called from the + * mainloop but can also be called directly from the realtime data + * thread if the user is prepared to deal with this. */ + void (*process) (void *data, struct spa_io_position *position); + + /** The filter is drained */ + void (*drained) (void *data); +}; + +/** Convert a filter state to a readable string \memberof pw_filter */ +const char * pw_filter_state_as_string(enum pw_filter_state state); + +/** \enum pw_filter_flags Extra flags that can be used in \ref pw_filter_connect() \memberof pw_filter */ +enum pw_filter_flags { + PW_FILTER_FLAG_NONE = 0, /**< no flags */ + PW_FILTER_FLAG_INACTIVE = (1 << 0), /**< start the filter inactive, + * pw_filter_set_active() needs to be + * called explicitly */ + PW_FILTER_FLAG_DRIVER = (1 << 1), /**< be a driver */ + PW_FILTER_FLAG_RT_PROCESS = (1 << 2), /**< call process from the realtime + * thread */ +}; + +enum pw_filter_port_flags { + PW_FILTER_PORT_FLAG_NONE = 0, /**< no flags */ + PW_FILTER_PORT_FLAG_MAP_BUFFERS = (1 << 0), /**< mmap the buffers */ + PW_FILTER_PORT_FLAG_ALLOC_BUFFERS = (1 << 1), /**< the application will allocate buffer + * memory. In the add_buffer event, the + * data of the buffer should be set */ +}; + +/** Create a new unconneced \ref pw_filter \memberof pw_filter + * \return a newly allocated \ref pw_filter */ +struct pw_filter * +pw_filter_new(struct pw_remote *remote, /**< a \ref pw_remote */ + const char *name, /**< a filter media name */ + struct pw_properties *props /**< filter properties, ownership is taken */); + +struct pw_filter * +pw_filter_new_simple(struct pw_loop *loop, /**< a \ref pw_loop to use */ + const char *name, /**< a filter media name */ + struct pw_properties *props, /**< filter properties, ownership is taken */ + const struct pw_filter_events *events, /**< filter events */ + void *data /**< data passed to events */); + +/** Destroy a filter \memberof pw_filter */ +void pw_filter_destroy(struct pw_filter *filter); + +void pw_filter_add_listener(struct pw_filter *filter, + struct spa_hook *listener, + const struct pw_filter_events *events, + void *data); + +enum pw_filter_state pw_filter_get_state(struct pw_filter *filter, const char **error); + +const char *pw_stream_get_name(struct pw_stream *stream); + +struct pw_remote *pw_filter_get_remote(struct pw_filter *filter); + +/** Connect a filter for processing. \memberof pw_filter + * \return 0 on success < 0 on error. + * + * You should connect to the process event and use pw_filter_dequeue_buffer() + * to get the latest metadata and data. */ +int +pw_filter_connect(struct pw_filter *filter, /**< a \ref pw_filter */ + enum pw_filter_flags flags, /**< filter flags */ + const struct spa_pod **params, /**< an array with params. */ + uint32_t n_params /**< number of items in \a params */); + +/** Get the node ID of the filter. \memberof pw_filter + * \return node ID. */ +uint32_t +pw_filter_get_node_id(struct pw_filter *filter); + +/** Disconnect \a filter \memberof pw_filter */ +int pw_filter_disconnect(struct pw_filter *filter); + +/** add a port to the filter, returns user data of port_data_size. */ +void *pw_filter_add_port(struct pw_filter *filter, + enum pw_direction direction, /**< port direction */ + enum pw_filter_port_flags flags, /**< port flags */ + size_t port_data_size, /**< allocated and given to the user as port_data */ + struct pw_properties *props, /**< port properties, ownership is taken */ + const struct spa_pod **params, /**< an array of params. The params should + * ideally contain the supported formats */ + uint32_t n_params /**< number of elements in \a params */); + +/** remove a port from the filter */ +int pw_filter_remove_port(void *port_data /**< data associated with port */); + +/** get properties, port_data of NULL will give global properties */ +const struct pw_properties *pw_filter_get_properties(struct pw_filter *filter, + void *port_data); + +/** Update properties, use NULL port_data for global filter properties */ +int pw_filter_update_properties(struct pw_filter *filter, + void *port_data, const struct spa_dict *dict); + +/** Update params, use NULL port_data for global filter params */ +int +pw_filter_update_params(struct pw_filter *filter, /**< a \ref pw_filter */ + void *port_data, /**< data associated with port */ + int res, /**< a result code */ + const struct spa_pod **params, /**< an array of params. */ + uint32_t n_params /**< number of elements in \a params */); + + +#if 0 +/** A time structure \memberof pw_filter */ +struct pw_time { + int64_t now; /**< the monotonic time */ + struct spa_fraction rate; /**< the rate of \a ticks and delay */ + uint64_t ticks; /**< the ticks at \a now. This is the current time that + * the remote end is reading/writing. */ +}; +#endif + +/** Query the time on the filter \memberof pw_filter */ +int pw_filter_get_time(struct pw_filter *filter, struct pw_time *time); + +/** Get a buffer that can be filled for output ports or consumed + * for input ports. */ +struct pw_buffer *pw_filter_dequeue_buffer(void *port_data); + +/** Submit a buffer for playback or recycle a buffer for capture. */ +int pw_filter_queue_buffer(void *port_data, struct pw_buffer *buffer); + +/** Get a data pointer to the buffer data */ +void *pw_filter_get_dsp_buffer(void *port_data, uint32_t n_samples); + +/** Activate or deactivate the filter \memberof pw_filter */ +int pw_filter_set_active(struct pw_filter *filter, bool active); + +/** Flush a filter. When \a drain is true, the drained callback will + * be called when all data is played or recorded */ +int pw_filter_flush(struct pw_filter *filter, bool drain); + +#ifdef __cplusplus +} +#endif + +#endif /* PIPEWIRE_FILTER_H */ diff --git a/src/pipewire/meson.build b/src/pipewire/meson.build index 93203eeb7..f929796ac 100644 --- a/src/pipewire/meson.build +++ b/src/pipewire/meson.build @@ -6,6 +6,7 @@ pipewire_headers = [ 'core.h', 'data-loop.h', 'device.h', + 'filter.h', 'global.h', 'interfaces.h', 'introspect.h', @@ -41,6 +42,7 @@ pipewire_sources = [ 'core.c', 'data-loop.c', 'device.c', + 'filter.c', 'global.c', 'introspect.c', 'link.c', diff --git a/src/pipewire/private.h b/src/pipewire/private.h index 90b398284..0ff5283e0 100644 --- a/src/pipewire/private.h +++ b/src/pipewire/private.h @@ -39,6 +39,7 @@ extern "C" { #include "pipewire/introspect.h" #include "pipewire/interfaces.h" #include "pipewire/stream.h" +#include "pipewire/filter.h" #include "pipewire/log.h" #include @@ -733,6 +734,7 @@ struct pw_remote { struct pw_client_proxy *client_proxy; /**< proxy for the client object */ struct spa_list stream_list; /**< list of \ref pw_stream objects */ + struct spa_list filter_list; /**< list of \ref pw_stream objects */ struct pw_protocol_client *conn; /**< the protocol client connection */ int recv_seq; /**< last received sequence number */ @@ -780,6 +782,40 @@ struct pw_stream { struct spa_list controls; }; +#define pw_filter_emit(s,m,v,...) spa_hook_list_call(&(s)->listener_list, struct pw_filter_events, m, v, ##__VA_ARGS__) +#define pw_filter_emit_destroy(s) pw_filter_emit(s, destroy, 0) +#define pw_filter_emit_state_changed(s,o,n,e) pw_filter_emit(s, state_changed,0,o,n,e) +#define pw_filter_emit_io_changed(s,p,i,d,t) pw_filter_emit(s, io_changed,0,p,i,d,t) +#define pw_filter_emit_param_changed(s,p,i,f) pw_filter_emit(s, param_changed,0,p,i,f) +#define pw_filter_emit_add_buffer(s,p,b) pw_filter_emit(s, add_buffer, 0, p, b) +#define pw_filter_emit_remove_buffer(s,p,b) pw_filter_emit(s, remove_buffer, 0, p, b) +#define pw_filter_emit_process(s,p) pw_filter_emit(s, process, 0, p) +#define pw_filter_emit_drained(s) pw_filter_emit(s, drained, 0) + + +struct pw_filter { + struct pw_remote *remote; /**< the owner remote */ + struct spa_list link; /**< link in the remote */ + + char *name; /**< the name of the filter */ + struct pw_properties *properties; /**< properties of the filter */ + + uint32_t node_id; /**< node id for remote node, available from + * CONFIGURE state and higher */ + enum pw_filter_state state; /**< filter state */ + char *error; /**< error reason when state is in error */ + + struct spa_hook_list listener_list; + + struct pw_proxy *proxy; + struct spa_hook proxy_listener; + + struct pw_node_proxy *node; + struct spa_hook node_listener; + + struct spa_list controls; +}; + #define pw_factory_emit(s,m,v,...) spa_hook_list_call(&s->listener_list, struct pw_factory_events, m, v, ##__VA_ARGS__) #define pw_factory_emit_destroy(s) pw_factory_emit(s, destroy, 0) diff --git a/src/pipewire/remote.c b/src/pipewire/remote.c index b705fe6fd..e75f80053 100644 --- a/src/pipewire/remote.c +++ b/src/pipewire/remote.c @@ -215,6 +215,7 @@ struct pw_remote *pw_remote_new(struct pw_core *core, pw_map_init(&this->objects, 64, 32); spa_list_init(&this->stream_list); + spa_list_init(&this->filter_list); spa_hook_list_init(&this->listener_list); @@ -272,6 +273,7 @@ void pw_remote_destroy(struct pw_remote *remote) { struct remote *impl = SPA_CONTAINER_OF(remote, struct remote, this); struct pw_stream *stream; + struct pw_filter *filter; pw_log_debug(NAME" %p: destroy", remote); pw_remote_emit_destroy(remote); @@ -281,6 +283,8 @@ void pw_remote_destroy(struct pw_remote *remote) spa_list_consume(stream, &remote->stream_list, link) pw_stream_destroy(stream); + spa_list_consume(filter, &remote->filter_list, link) + pw_filter_destroy(filter); pw_protocol_client_destroy(remote->conn);