From 0738f7fcf59609022ecef4f37f33c178c57a0c01 Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Fri, 7 Jul 2017 17:55:26 +0200 Subject: [PATCH] Rework node and ports Rework the node and port API so that other implementations can be used than the spa_node. The plan is to morph this into the stream and context API. Rework the graph API a little so that init + add is separated. --- pipewire/modules/meson.build | 7 +- .../modules/module-client-node/client-node.c | 16 +- pipewire/modules/module-jack.c | 119 ++-- pipewire/modules/module-jack/defs.h | 7 + pipewire/modules/module-jack/server.h | 1 + pipewire/modules/module-jack/shared.h | 373 +++++++++++-- pipewire/modules/module-jack/synchro.h | 13 +- pipewire/modules/module-mixer.c | 4 +- pipewire/modules/spa/meson.build | 2 +- pipewire/modules/spa/module-monitor.c | 1 - pipewire/modules/spa/module-node.c | 68 +-- pipewire/modules/spa/spa-monitor.c | 3 +- pipewire/modules/spa/spa-node.c | 512 ++++++++++++++++-- pipewire/modules/spa/spa-node.h | 27 +- pipewire/server/client.c | 7 +- pipewire/server/core.c | 19 +- pipewire/server/link.c | 82 ++- pipewire/server/meson.build | 2 + pipewire/server/node.c | 472 +++++----------- pipewire/server/node.h | 45 +- pipewire/server/port.c | 339 +++++++----- pipewire/server/port.h | 67 ++- spa/include/spa/graph-scheduler1.h | 39 +- spa/include/spa/graph-scheduler3.h | 55 +- spa/include/spa/graph.h | 64 ++- spa/tests/test-graph.c | 32 +- spa/tests/test-mixer.c | 46 +- spa/tests/test-perf.c | 20 +- 28 files changed, 1585 insertions(+), 857 deletions(-) diff --git a/pipewire/modules/meson.build b/pipewire/modules/meson.build index 675bdae6c..5682ba78b 100644 --- a/pipewire/modules/meson.build +++ b/pipewire/modules/meson.build @@ -23,7 +23,8 @@ pipewire_module_autolink = shared_library('pipewire-module-autolink', [ 'module- dependencies : [mathlib, dl_lib, pipewire_dep, pipewirecore_dep], ) -pipewire_module_mixer = shared_library('pipewire-module-mixer', [ 'module-mixer.c' ], +pipewire_module_mixer = shared_library('pipewire-module-mixer', + [ 'module-mixer.c', 'spa/spa-node.c' ], c_args : pipewire_module_c_args, include_directories : [configinc, spa_inc], link_with : spalib, @@ -36,6 +37,7 @@ pipewire_module_client_node = shared_library('pipewire-module-client-node', [ 'module-client-node.c', 'module-client-node/client-node.c', 'module-client-node/protocol-native.c', + 'spa/spa-node.c', 'extension-client-node.c', ], c_args : pipewire_module_c_args, include_directories : [configinc, spa_inc], @@ -69,7 +71,8 @@ pipewire_module_protocol_native = shared_library('pipewire-module-protocol-nativ if jack_dep.found() pipewire_module_jack = shared_library('pipewire-module-jack', [ 'module-jack.c', - 'module-jack/shm.c' ], + 'module-jack/shm.c', + 'module-jack/jack-node.c' ], c_args : pipewire_module_c_args, include_directories : [configinc, spa_inc], link_with : spalib, diff --git a/pipewire/modules/module-client-node/client-node.c b/pipewire/modules/module-client-node/client-node.c index 107f001a2..a50da5fba 100644 --- a/pipewire/modules/module-client-node/client-node.c +++ b/pipewire/modules/module-client-node/client-node.c @@ -37,6 +37,7 @@ #include "pipewire/client/transport.h" #include "pipewire/server/core.h" +#include "pipewire/modules/spa/spa-node.h" #include "client-node.h" /** \cond */ @@ -89,7 +90,6 @@ struct proxy { struct spa_type_map *map; struct spa_log *log; - struct spa_loop *main_loop; struct spa_loop *data_loop; const struct spa_node_callbacks *callbacks; @@ -1018,14 +1018,9 @@ proxy_init(struct proxy *this, for (i = 0; i < n_support; i++) { if (strcmp(support[i].type, SPA_TYPE__Log) == 0) this->log = support[i].data; - else if (strcmp(support[i].type, SPA_TYPE_LOOP__MainLoop) == 0) - this->main_loop = support[i].data; else if (strcmp(support[i].type, SPA_TYPE_LOOP__DataLoop) == 0) this->data_loop = support[i].data; } - if (this->main_loop == NULL) { - spa_log_error(this->log, "a main-loop is needed"); - } if (this->data_loop == NULL) { spa_log_error(this->log, "a data-loop is needed"); } @@ -1171,8 +1166,13 @@ struct pw_client_node *pw_client_node_new(struct pw_client *client, if (this->resource == NULL) goto error_no_resource; - this->node = pw_node_new(client->core, - this->resource, name, true, &impl->proxy.node, NULL, properties); + this->node = pw_spa_node_new(client->core, + this->resource, + name, + true, + &impl->proxy.node, + NULL, + properties); if (this->node == NULL) goto error_no_node; diff --git a/pipewire/modules/module-jack.c b/pipewire/modules/module-jack.c index 4f8bf0a63..2d4c648e8 100644 --- a/pipewire/modules/module-jack.c +++ b/pipewire/modules/module-jack.c @@ -61,7 +61,7 @@ #define LOCK_SUFFIX ".lock" #define LOCK_SUFFIXLEN 5 -static int segment_num = 0; +int segment_num = 0; typedef bool(*demarshal_func_t) (void *object, void *data, size_t size); @@ -223,8 +223,6 @@ handle_client_open(struct client *client) char name[JACK_CLIENT_NAME_SIZE+1]; int result, ref_num, shared_engine, shared_client, shared_graph; struct jack_client *jc; - size_t size; - jack_shm_info_t info; CheckSize(kClientOpen_size); CheckRead(&PID, sizeof(int)); @@ -241,26 +239,22 @@ handle_client_open(struct client *client) jc->owner = client; jc->ref_num = ref_num; - if (jack_synchro_alloc(&server->synchro_table[ref_num], - name, - server->engine_control->server_name, - 0, - false, - server->promiscuous) < 0) { + if (jack_synchro_init(&server->synchro_table[ref_num], + name, + server->engine_control->server_name, + 0, + false, + server->promiscuous) < 0) { result = -1; goto reply; } - size = sizeof(struct jack_client_control); - - if (jack_shm_alloc(size, &info, segment_num++) < 0) { + jc->control = jack_client_control_alloc(name, client->client->ucred.pid, ref_num, -1); + if (jc->control == NULL) { result = -1; goto reply; } - jc->control = (struct jack_client_control *)jack_shm_addr(&info); - jc->control->info = info; - server->client_table[ref_num] = jc; result = 0; @@ -481,6 +475,78 @@ static struct client *client_new(struct impl *impl, int fd) return NULL; } +static int +make_int_client(struct impl *impl, struct pw_node *node) +{ + struct jack_server *server = &impl->server; + struct jack_connection_manager *conn; + int ref_num; + struct jack_client *jc; + jack_port_id_t port_id; + + ref_num = jack_server_allocate_ref_num(server); + if (ref_num == -1) + return -1; + + if (jack_synchro_init(&server->synchro_table[ref_num], + "system", + server->engine_control->server_name, + 0, + false, + server->promiscuous) < 0) { + return -1; + } + + jc = calloc(1,sizeof(struct jack_client)); + jc->ref_num = ref_num; + jc->node = node; + jc->control = jack_client_control_alloc("system", -1, ref_num, -1); + jc->control->active = true; + + server->client_table[ref_num] = jc; + + impl->server.engine_control->driver_num++; + + conn = jack_graph_manager_next_start(server->graph_manager); + + jack_connection_manager_init_ref_num(conn, ref_num); + + port_id = jack_graph_manager_allocate_port(server->graph_manager, + ref_num, "system:playback_1", 2, + JackPortIsInput | JackPortIsPhysical | JackPortIsTerminal); + + jack_connection_manager_add_port(conn, true, ref_num, port_id); + + jack_graph_manager_next_stop(server->graph_manager); + + return 0; +} + +static bool init_nodes(struct impl *impl) +{ + struct pw_core *core = impl->core; + struct pw_node *n; + + spa_list_for_each(n, &core->node_list, link) { + const char *str; + + if (n->global == NULL) + continue; + + if (n->properties == NULL) + continue; + + if ((str = pw_properties_get(n->properties, "media.class")) == NULL) + continue; + + if (strcmp(str, "Audio/Sink") != 0) + continue; + + make_int_client(impl, n); + } + return true; +} + static struct socket *create_socket(void) { struct socket *s; @@ -591,8 +657,6 @@ static bool add_socket(struct impl *impl, struct socket *s) static int init_server(struct impl *impl, const char *name, bool promiscuous) { struct jack_server *server = &impl->server; - jack_shm_info_t info; - size_t size; int i; struct socket *s; @@ -604,28 +668,17 @@ static int init_server(struct impl *impl, const char *name, bool promiscuous) jack_cleanup_shm(); /* graph manager */ - size = sizeof(struct jack_graph_manager) + 2048 * sizeof(struct jack_port); - - if (jack_shm_alloc(size, &info, segment_num++) < 0) - return -1; - - server->graph_manager = (struct jack_graph_manager *)jack_shm_addr(&info); - server->graph_manager->info = info; + server->graph_manager = jack_graph_manager_alloc(2048); /* engine control */ - size = sizeof(struct jack_engine_control); - - if (jack_shm_alloc(size, &info, segment_num++) < 0) - return -1; - - server->engine_control = (struct jack_engine_control *)jack_shm_addr(&info); - server->engine_control->info = info; - - strcpy(server->engine_control->server_name, name); + server->engine_control = jack_engine_control_alloc(name); for (i = 0; i < CLIENT_NUM; i++) server->synchro_table[i] = JACK_SYNCHRO_INIT; + if (!init_nodes(impl)) + return -1; + s = create_socket(); if (!init_socket_name(s, name, promiscuous, 0)) diff --git a/pipewire/modules/module-jack/defs.h b/pipewire/modules/module-jack/defs.h index ed42e0555..5275d191b 100644 --- a/pipewire/modules/module-jack/defs.h +++ b/pipewire/modules/module-jack/defs.h @@ -42,6 +42,7 @@ #define CLIENT_NUM 256 #define JACK_ENGINE_ROLLING_COUNT 32 +#define JACK_ENGINE_ROLLING_INTERVAL 1024 #define TIME_POINTS 100000 #define FAILURE_TIME_POINTS 10000 @@ -55,6 +56,12 @@ #define JACK_SESSION_COMMAND_SIZE 256 +#define NO_PORT 0xFFFE +#define EMPTY 0xFFFD +#define FREE 0xFFFC + + + typedef enum { JACK_TIMER_SYSTEM_CLOCK, JACK_TIMER_HPET, diff --git a/pipewire/modules/module-jack/server.h b/pipewire/modules/module-jack/server.h index 16f83ba4c..c5f6c0c9d 100644 --- a/pipewire/modules/module-jack/server.h +++ b/pipewire/modules/module-jack/server.h @@ -21,6 +21,7 @@ struct jack_client { int ref_num; struct client *owner; struct jack_client_control *control; + struct pw_node *node; }; struct jack_server { diff --git a/pipewire/modules/module-jack/shared.h b/pipewire/modules/module-jack/shared.h index 0883a2a90..2d4cfaf83 100644 --- a/pipewire/modules/module-jack/shared.h +++ b/pipewire/modules/module-jack/shared.h @@ -17,6 +17,30 @@ * Boston, MA 02110-1301, USA. */ +#include + +extern int segment_num; + +static inline int jack_shm_alloc(size_t size, jack_shm_info_t *info, int num) +{ + char name[64]; + + snprintf(name, sizeof(name), "/jack_shared%d", num); + + if (jack_shmalloc(name, size, info)) { + pw_log_error("Cannot create shared memory segment of size = %zd (%s)", size, strerror(errno)); + return -1; + } + + if (jack_attach_shm(info)) { + jack_error("Cannot attach shared memory segment name = %s err = %s", name, strerror(errno)); + jack_destroy_shm(info); + return -1; + } + info->size = size; + return 0; +} + typedef uint16_t jack_int_t; // Internal type for ports and refnum typedef enum { @@ -56,41 +80,107 @@ struct jack_port { jack_default_audio_sample_t buffer[BUFFER_SIZE_MAX + 8]; } POST_PACKED_STRUCTURE; -#define MAKE_FIXED_ARRAY(size) \ -PRE_PACKED_STRUCTURE \ -struct { \ - jack_int_t table[size]; \ - uint32_t counter; \ +static inline void jack_port_init(struct jack_port *port, int ref_num, + const char* port_name, int type_id, enum JackPortFlags flags) +{ + port->type_id = type_id; + port->flags = flags; + strcpy(port->name, port_name); + port->alias1[0] = '\0'; + port->alias2[0] = '\0'; + port->ref_num = ref_num; + port->latency = 0; + port->total_latency = 0; + port->playback_latency.min = port->playback_latency.max = 0; + port->capture_latency.min = port->capture_latency.max = 0; + port->monitor_requests = 0; + port->in_use = true; + port->tied = NO_PORT; +} + +#define MAKE_FIXED_ARRAY(size) \ +PRE_PACKED_STRUCTURE \ +struct { \ + jack_int_t table[size]; \ + uint32_t counter; \ } POST_PACKED_STRUCTURE -#define MAKE_FIXED_ARRAY1(size) \ -PRE_PACKED_STRUCTURE \ -struct { \ - MAKE_FIXED_ARRAY(size) array; \ - bool used; \ +#define INIT_FIXED_ARRAY(arr) ({ \ + int i; \ + for (i = 0; i < SPA_N_ELEMENTS(arr.table); i++) \ + arr.table[i] = EMPTY; \ + arr.counter = 0; \ +}) + +#define ADD_FIXED_ARRAY(arr,item) ({ \ + int i,ret = -1; \ + for (i = 0; i < SPA_N_ELEMENTS(arr.table); i++) { \ + if (arr.table[i] == EMPTY) { \ + arr.table[i] = item; \ + arr.counter++; \ + ret = 0; \ + break; \ + } \ + } \ + ret; \ +}) + +#define MAKE_FIXED_ARRAY1(size) \ +PRE_PACKED_STRUCTURE \ +struct { \ + MAKE_FIXED_ARRAY(size) array; \ + bool used; \ } POST_PACKED_STRUCTURE -#define MAKE_FIXED_MATRIX(size) \ -PRE_PACKED_STRUCTURE \ -struct { \ - jack_int_t table[size][size]; \ +#define INIT_FIXED_ARRAY1(arr) ({ \ + INIT_FIXED_ARRAY(arr.array); \ + arr.used = false; \ +}) + +#define ADD_FIXED_ARRAY1(arr,item) ADD_FIXED_ARRAY(arr.array,item) + +#define MAKE_FIXED_MATRIX(size) \ +PRE_PACKED_STRUCTURE \ +struct { \ + jack_int_t table[size][size]; \ } POST_PACKED_STRUCTURE +#define INIT_FIXED_MATRIX(mat,idx) ({ \ + int i; \ + for (i = 0; i < SPA_N_ELEMENTS(mat.table[0]); i++){ \ + mat.table[idx][i] = 0; \ + mat.table[i][idx] = 0; \ + } \ +}) + PRE_PACKED_STRUCTURE struct jack_activation_count { int32_t value; int32_t count; } POST_PACKED_STRUCTURE; -#define MAKE_LOOP_FEEDBACK(size) \ -PRE_PACKED_STRUCTURE \ -struct { \ - int table[size][3]; \ +static inline void jack_activation_count_set_value(struct jack_activation_count *cnt, int32_t val) { + cnt->value = val; +} + +#define MAKE_LOOP_FEEDBACK(size) \ +PRE_PACKED_STRUCTURE \ +struct { \ + int table[size][3]; \ } POST_PACKED_STRUCTURE +#define INIT_LOOP_FEEDBACK(arr,size) ({ \ + int i; \ + for (i = 0; i < size; i++) { \ + arr.table[i][0] = EMPTY; \ + arr.table[i][1] = EMPTY; \ + arr.table[i][2] = 0; \ + } \ +}) + PRE_PACKED_STRUCTURE struct jack_connection_manager { - MAKE_FIXED_ARRAY(CONNECTION_NUM_FOR_PORT) connections[PORT_NUM_MAX]; + MAKE_FIXED_ARRAY(CONNECTION_NUM_FOR_PORT) connection[PORT_NUM_MAX]; MAKE_FIXED_ARRAY1(PORT_NUM_FOR_CLIENT) input_port[CLIENT_NUM]; MAKE_FIXED_ARRAY(PORT_NUM_FOR_CLIENT) output_port[CLIENT_NUM]; MAKE_FIXED_MATRIX(CLIENT_NUM) connection_ref; @@ -98,6 +188,38 @@ struct jack_connection_manager { MAKE_LOOP_FEEDBACK(CONNECTION_NUM_FOR_PORT) loop_feedback; } POST_PACKED_STRUCTURE; +static inline void +jack_connection_manager_init_ref_num(struct jack_connection_manager *conn, int ref_num) +{ + INIT_FIXED_ARRAY1(conn->input_port[ref_num]); + INIT_FIXED_ARRAY(conn->output_port[ref_num]); + INIT_FIXED_MATRIX(conn->connection_ref, ref_num); + jack_activation_count_set_value (&conn->input_counter[ref_num], 0); +} + +static inline void +jack_connection_manager_init(struct jack_connection_manager *conn) +{ + int i; + for (i = 0; i < PORT_NUM_MAX; i++) + INIT_FIXED_ARRAY(conn->connection[i]); + + INIT_LOOP_FEEDBACK(conn->loop_feedback, CONNECTION_NUM_FOR_PORT); + + for (i = 0; i < CLIENT_NUM; i++) + jack_connection_manager_init_ref_num(conn, i); +} + +static inline int +jack_connection_manager_add_port(struct jack_connection_manager *conn, bool input, + int ref_num, jack_port_id_t port_id) +{ + if (input) + return ADD_FIXED_ARRAY1(conn->input_port[ref_num], port_id); + else + return ADD_FIXED_ARRAY(conn->output_port[ref_num], port_id); +} + PRE_PACKED_STRUCTURE struct jack_atomic_counter { union { @@ -109,6 +231,13 @@ struct jack_atomic_counter { } info; } POST_PACKED_STRUCTURE; +#define Counter(e) (e).info.long_val +#define CurIndex(e) (e).info.scounter.short_val1 +#define NextIndex(e) (e).info.scounter.short_val2 + +#define CurArrayIndex(e) (CurIndex(e) & 0x0001) +#define NextArrayIndex(e) ((CurIndex(e) + 1) & 0x0001) + #define MAKE_ATOMIC_STATE(type) \ PRE_PACKED_STRUCTURE \ struct { \ @@ -143,6 +272,104 @@ struct jack_graph_manager { struct jack_port port_array[0]; } POST_PACKED_STRUCTURE; +static inline struct jack_graph_manager * +jack_graph_manager_alloc(int port_max) +{ + struct jack_graph_manager *mgr; + jack_shm_info_t info; + size_t i, size; + + size = sizeof(struct jack_graph_manager) + port_max * sizeof(struct jack_port); + + if (jack_shm_alloc(size, &info, segment_num++) < 0) + return NULL; + + mgr = (struct jack_graph_manager *)jack_shm_addr(&info); + mgr->info = info; + + jack_connection_manager_init(&mgr->state.state[0]); + jack_connection_manager_init(&mgr->state.state[1]); + mgr->port_max = port_max; + + for (i = 0; i < port_max; i++) { + mgr->port_array[i].in_use = false; + mgr->port_array[i].ref_num = -1; + } + return mgr; +} + +static inline jack_port_id_t +jack_graph_manager_allocate_port(struct jack_graph_manager *mgr, + int ref_num, const char* port_name, int type_id, + enum JackPortFlags flags) +{ + int i; + for (i = 1; i < mgr->port_max; i++) { + if (!mgr->port_array[i].in_use) { + jack_port_init(&mgr->port_array[i], ref_num, port_name, type_id, flags); + return i; + } + } + return NO_PORT; +} + + +static inline struct jack_connection_manager * +jack_graph_manager_next_start(struct jack_graph_manager *manager) +{ + uint32_t next_index; + + if (manager->state.call_write_counter++ == 0) { + struct jack_atomic_counter old_val; + struct jack_atomic_counter new_val; + uint32_t cur_index; + bool need_copy; + do { + old_val = manager->state.counter; + new_val = old_val; + cur_index = CurArrayIndex(new_val); + next_index = NextArrayIndex(new_val); + need_copy = (CurIndex(new_val) == NextIndex(new_val)); + NextIndex(new_val) = CurIndex(new_val); // Invalidate next index + } + while (!__atomic_compare_exchange_n((uint32_t*)&manager->state.counter, + (uint32_t*)&Counter(old_val), + Counter(new_val), + false, + __ATOMIC_SEQ_CST, + __ATOMIC_SEQ_CST)); + + if (need_copy) + memcpy(&manager->state.state[next_index], + &manager->state.state[cur_index], + sizeof(struct jack_connection_manager)); + } + else { + next_index = NextArrayIndex(manager->state.counter); + } + return &manager->state.state[next_index]; +} + +static inline void +jack_graph_manager_next_stop(struct jack_graph_manager *manager) +{ + if (--manager->state.call_write_counter == 0) { + struct jack_atomic_counter old_val; + struct jack_atomic_counter new_val; + do { + old_val = manager->state.counter; + new_val = old_val; + NextIndex(new_val)++; // Set next index + } + while (!__atomic_compare_exchange_n((uint32_t*)&manager->state.counter, + (uint32_t*)&Counter(old_val), + Counter(new_val), + false, + __ATOMIC_SEQ_CST, + __ATOMIC_SEQ_CST)); + } +} + typedef enum { TransportCommandNone = 0, TransportCommandStart = 1, @@ -223,7 +450,7 @@ struct jack_engine_control { jack_shm_info_t info; jack_nframes_t buffer_size; jack_nframes_t sample_rate; - bool sync_node; + bool sync_mode; bool temporary; jack_time_t period_usecs; jack_time_t timeout_usecs; @@ -241,7 +468,6 @@ struct jack_engine_control { int driver_num; bool verbose; - // CPU Load jack_time_t prev_cycle_time; jack_time_t cur_cycle_time; jack_time_t spare_usecs; @@ -252,12 +478,10 @@ struct jack_engine_control { int rolling_interval; float CPU_load; - // For OSX thread uint64_t period; uint64_t computation; uint64_t constraint; - // Timer struct jack_frame_timer frame_timer; #ifdef JACK_MONITOR @@ -265,6 +489,63 @@ struct jack_engine_control { #endif } POST_PACKED_STRUCTURE; +static inline void +jack_engine_control_reset_rolling_usecs(struct jack_engine_control *ctrl) +{ + memset(ctrl->rolling_client_usecs, 0, sizeof(ctrl->rolling_client_usecs)); + ctrl->rolling_client_usecs_index = 0; + ctrl->rolling_client_usecs_cnt = 0; + ctrl->spare_usecs = 0; + ctrl->rolling_interval = floor((JACK_ENGINE_ROLLING_INTERVAL * 1000.f) / ctrl->period_usecs); +} + +static inline struct jack_engine_control * +jack_engine_control_alloc(const char* name) +{ + struct jack_engine_control *ctrl; + jack_shm_info_t info; + size_t size; + + size = sizeof(struct jack_engine_control); + if (jack_shm_alloc(size, &info, segment_num++) < 0) + return NULL; + + ctrl = (struct jack_engine_control *)jack_shm_addr(&info); + ctrl->info = info; + + ctrl->buffer_size = 512; + ctrl->sample_rate = 48000; + ctrl->sync_mode = false; + ctrl->temporary = false; + ctrl->period_usecs = 1000000.f / ctrl->sample_rate * ctrl->buffer_size; + ctrl->timeout_usecs = 0; + ctrl->max_delayed_usecs = 0.f; + ctrl->xrun_delayed_usecs = 0.f; + ctrl->timeout = false; + ctrl->real_time = true; + ctrl->saved_real_time = false; + ctrl->server_priority = 20; + ctrl->client_priority = 15; + ctrl->max_client_priority = 19; + strcpy(ctrl->server_name, name); + ctrl->clock_source = 0; + ctrl->driver_num = 0; + ctrl->verbose = true; + + ctrl->prev_cycle_time = 0; + ctrl->cur_cycle_time = 0; + ctrl->spare_usecs = 0; + ctrl->max_usecs = 0; + jack_engine_control_reset_rolling_usecs(ctrl); + ctrl->CPU_load = 0.f; + + ctrl->period = 0; + ctrl->computation = 0; + ctrl->constraint = 0; + + return ctrl; +} + PRE_PACKED_STRUCTURE struct jack_client_control { jack_shm_info_t info; @@ -282,23 +563,39 @@ struct jack_client_control { jack_session_flags_t session_flags; } POST_PACKED_STRUCTURE; - -static inline int jack_shm_alloc(size_t size, jack_shm_info_t *info, int num) +static inline struct jack_client_control * +jack_client_control_alloc(const char* name, int pid, int ref_num, int uuid) { - char name[64]; + struct jack_client_control *ctrl; + jack_shm_info_t info; + size_t size; - snprintf(name, sizeof(name), "/jack_shared%d", num); + size = sizeof(struct jack_client_control); + if (jack_shm_alloc(size, &info, segment_num++) < 0) + return NULL; - if (jack_shmalloc(name, size, info)) { - pw_log_error("Cannot create shared memory segment of size = %zd (%s)", size, strerror(errno)); - return -1; - } + ctrl = (struct jack_client_control *)jack_shm_addr(&info); + ctrl->info = info; - if (jack_attach_shm(info)) { - jack_error("Cannot attach shared memory segment name = %s err = %s", name, strerror(errno)); - jack_destroy_shm(info); - return -1; - } - info->size = size; - return 0; + strcpy(ctrl->name, name); + for (int i = 0; i < jack_notify_max; i++) + ctrl->callback[i] = false; + + // Always activated + ctrl->callback[jack_notify_AddClient] = true; + ctrl->callback[jack_notify_RemoveClient] = true; + ctrl->callback[jack_notify_ActivateClient] = true; + ctrl->callback[jack_notify_LatencyCallback] = true; + // So that driver synchro are correctly setup in "flush" or "normal" mode + ctrl->callback[jack_notify_StartFreewheelCallback] = true; + ctrl->callback[jack_notify_StopFreewheelCallback] = true; + ctrl->ref_num = ref_num; + ctrl->PID = pid; + ctrl->transport_state = JackTransportStopped; + ctrl->transport_sync = false; + ctrl->transport_timebase = false; + ctrl->active = false; + ctrl->session_ID = uuid; + + return ctrl; } diff --git a/pipewire/modules/module-jack/synchro.h b/pipewire/modules/module-jack/synchro.h index c0563f26e..e58a368de 100644 --- a/pipewire/modules/module-jack/synchro.h +++ b/pipewire/modules/module-jack/synchro.h @@ -26,11 +26,11 @@ struct jack_synchro { #define JACK_SYNCHRO_INIT (struct jack_synchro) { { 0, }, false, NULL } static inline int -jack_synchro_alloc(struct jack_synchro *synchro, - const char *client_name, - const char *server_name, - int value, bool internal, - bool promiscuous) +jack_synchro_init(struct jack_synchro *synchro, + const char *client_name, + const char *server_name, + int value, bool internal, + bool promiscuous) { if (promiscuous) snprintf(synchro->name, sizeof(synchro->name), @@ -39,9 +39,10 @@ jack_synchro_alloc(struct jack_synchro *synchro, snprintf(synchro->name, sizeof(synchro->name), "jack_sem.%d_%s_%s", getuid(), server_name, client_name); + synchro->flush = false; if ((synchro->semaphore = sem_open(synchro->name, O_CREAT | O_RDWR, 0777, value)) == (sem_t*)SEM_FAILED) { pw_log_error("can't check in named semaphore name = %s err = %s", synchro->name, strerror(errno)); return -1; } - return true; + return 0; } diff --git a/pipewire/modules/module-mixer.c b/pipewire/modules/module-mixer.c index cf010f374..12b9b43db 100644 --- a/pipewire/modules/module-mixer.c +++ b/pipewire/modules/module-mixer.c @@ -26,6 +26,7 @@ #include "pipewire/server/core.h" #include "pipewire/server/module.h" +#include "pipewire/modules/spa/spa-node.h" #define AUDIOMIXER_LIB "audiomixer/libspa-audiomixer" @@ -108,7 +109,8 @@ static struct pw_node *make_node(struct impl *impl) } spa_clock = iface; - node = pw_node_new(impl->core, NULL, "audiomixer", false, spa_node, spa_clock, NULL); + node = pw_spa_node_new(impl->core, NULL, "audiomixer", false, spa_node, spa_clock, NULL); + return node; interface_failed: diff --git a/pipewire/modules/spa/meson.build b/pipewire/modules/spa/meson.build index bdada4b23..27a062eb3 100644 --- a/pipewire/modules/spa/meson.build +++ b/pipewire/modules/spa/meson.build @@ -4,7 +4,7 @@ pipewire_module_spa_c_args = [ ] pipewire_module_spa_monitor = shared_library('pipewire-module-spa-monitor', - [ 'module-monitor.c', 'spa-monitor.c' ], + [ 'module-monitor.c', 'spa-monitor.c', 'spa-node.c' ], c_args : pipewire_module_spa_c_args, include_directories : [configinc, spa_inc], link_with : spalib, diff --git a/pipewire/modules/spa/module-monitor.c b/pipewire/modules/spa/module-monitor.c index b1a5a4810..47df5ab9e 100644 --- a/pipewire/modules/spa/module-monitor.c +++ b/pipewire/modules/spa/module-monitor.c @@ -32,7 +32,6 @@ #include #include "spa-monitor.h" -#include "spa-node.h" bool pipewire__module_init(struct pw_module *module, const char *args) { diff --git a/pipewire/modules/spa/module-node.c b/pipewire/modules/spa/module-node.c index 0113060c8..0ef3054cb 100644 --- a/pipewire/modules/spa/module-node.c +++ b/pipewire/modules/spa/module-node.c @@ -34,71 +34,8 @@ #include "spa-monitor.h" #include "spa-node.h" -static int -setup_props(struct pw_core *core, struct spa_node *spa_node, struct pw_properties *pw_props) -{ - int res; - struct spa_props *props; - void *state = NULL; - const char *key; - - if ((res = spa_node_get_props(spa_node, &props)) != SPA_RESULT_OK) { - pw_log_debug("spa_node_get_props failed: %d", res); - return SPA_RESULT_ERROR; - } - - while ((key = pw_properties_iterate(pw_props, &state))) { - struct spa_pod_prop *prop; - uint32_t id; - - if (!spa_type_is_a(key, SPA_TYPE_PROPS_BASE)) - continue; - - id = spa_type_map_get_id(core->type.map, key); - if (id == SPA_ID_INVALID) - continue; - - if ((prop = spa_pod_object_find_prop(&props->object, id))) { - const char *value = pw_properties_get(pw_props, key); - - pw_log_info("configure prop %s", key); - - switch(prop->body.value.type) { - case SPA_POD_TYPE_ID: - SPA_POD_VALUE(struct spa_pod_id, &prop->body.value) = - spa_type_map_get_id(core->type.map, value); - break; - case SPA_POD_TYPE_INT: - SPA_POD_VALUE(struct spa_pod_int, &prop->body.value) = atoi(value); - break; - case SPA_POD_TYPE_LONG: - SPA_POD_VALUE(struct spa_pod_long, &prop->body.value) = atoi(value); - break; - case SPA_POD_TYPE_FLOAT: - SPA_POD_VALUE(struct spa_pod_float, &prop->body.value) = atof(value); - break; - case SPA_POD_TYPE_DOUBLE: - SPA_POD_VALUE(struct spa_pod_double, &prop->body.value) = atof(value); - break; - case SPA_POD_TYPE_STRING: - break; - default: - break; - } - } - } - - if ((res = spa_node_set_props(spa_node, props)) != SPA_RESULT_OK) { - pw_log_debug("spa_node_set_props failed: %d", res); - return SPA_RESULT_ERROR; - } - - return SPA_RESULT_OK; -} - bool pipewire__module_init(struct pw_module *module, const char *args) { - const char *dir; struct pw_properties *props = NULL; char **argv; int i, n_tokens; @@ -110,9 +47,6 @@ bool pipewire__module_init(struct pw_module *module, const char *args) if (n_tokens < 3) goto not_enough_arguments; - if ((dir = getenv("SPA_PLUGIN_DIR")) == NULL) - dir = PLUGINDIR; - props = pw_properties_new(NULL, NULL); for (i = 3; i < n_tokens; i++) { @@ -126,7 +60,7 @@ bool pipewire__module_init(struct pw_module *module, const char *args) pw_free_strv(prop); } - pw_spa_node_load(module->core, dir, argv[0], argv[1], argv[2], props, setup_props); + pw_spa_node_load(module->core, NULL, argv[0], argv[1], argv[2], props); pw_free_strv(argv); diff --git a/pipewire/modules/spa/spa-monitor.c b/pipewire/modules/spa/spa-monitor.c index a285206af..ff1ac1a67 100644 --- a/pipewire/modules/spa/spa-monitor.c +++ b/pipewire/modules/spa/spa-monitor.c @@ -33,6 +33,7 @@ #include #include "spa-monitor.h" +#include "spa-node.h" struct monitor_item { char *id; @@ -107,7 +108,7 @@ static void add_item(struct pw_spa_monitor *this, struct spa_monitor_item *item) mitem = calloc(1, sizeof(struct monitor_item)); mitem->id = strdup(id); - mitem->node = pw_node_new(impl->core, NULL, name, false, node_iface, clock_iface, props); + mitem->node = pw_spa_node_new(impl->core, NULL, name, false, node_iface, clock_iface, props); spa_list_insert(impl->item_list.prev, &mitem->link); } diff --git a/pipewire/modules/spa/spa-node.c b/pipewire/modules/spa/spa-node.c index f1b647180..d03f6ac4f 100644 --- a/pipewire/modules/spa/spa-node.c +++ b/pipewire/modules/spa/spa-node.c @@ -17,6 +17,12 @@ * Boston, MA 02110-1301, USA. */ +#ifdef HAVE_CONFIG_H +#include "config.h" +#endif + +#include + #include #include #include @@ -26,20 +32,466 @@ #include "spa-node.h" struct impl { - struct pw_spa_node this; - struct pw_core *core; + struct pw_node *this; + + bool async_init; void *hnd; + struct spa_handle *handle; + struct spa_node *node; /**< handle to SPA node */ + char *lib; + char *factory_name; }; -struct pw_spa_node *pw_spa_node_load(struct pw_core *core, - const char *dir, - const char *lib, - const char *factory_name, - const char *name, - struct pw_properties *properties, setup_node_t setup_func) +struct port { + struct pw_port *port; + + struct spa_node *node; +}; + + +static int port_impl_enum_formats(struct pw_port *port, + struct spa_format **format, + const struct spa_format *filter, + int32_t index) { - struct pw_spa_node *this; + struct port *p = port->user_data; + return spa_node_port_enum_formats(p->node, port->direction, port->port_id, format, filter, index); +} + +static int port_impl_set_format(struct pw_port *port, uint32_t flags, struct spa_format *format) +{ + struct port *p = port->user_data; + return spa_node_port_set_format(p->node, port->direction, port->port_id, flags, format); +} + +static int port_impl_get_format(struct pw_port *port, const struct spa_format **format) +{ + struct port *p = port->user_data; + return spa_node_port_get_format(p->node, port->direction, port->port_id, format); +} + +static int port_impl_get_info(struct pw_port *port, const struct spa_port_info **info) +{ + struct port *p = port->user_data; + return spa_node_port_get_info(p->node, port->direction, port->port_id, info); +} + +static int port_impl_enum_params(struct pw_port *port, uint32_t index, struct spa_param **param) +{ + struct port *p = port->user_data; + return spa_node_port_enum_params(p->node, port->direction, port->port_id, index, param); +} + +static int port_impl_set_param(struct pw_port *port, struct spa_param *param) +{ + struct port *p = port->user_data; + return spa_node_port_set_param(p->node, port->direction, port->port_id, param); +} + +static int port_impl_use_buffers(struct pw_port *port, struct spa_buffer **buffers, uint32_t n_buffers) +{ + struct port *p = port->user_data; + return spa_node_port_use_buffers(p->node, port->direction, port->port_id, buffers, n_buffers); +} + +static int port_impl_alloc_buffers(struct pw_port *port, + struct spa_param **params, uint32_t n_params, + struct spa_buffer **buffers, uint32_t *n_buffers) +{ + struct port *p = port->user_data; + return spa_node_port_alloc_buffers(p->node, port->direction, port->port_id, + params, n_params, buffers, n_buffers); +} + +static int port_impl_reuse_buffer(struct pw_port *port, uint32_t buffer_id) +{ + struct port *p = port->user_data; + return spa_node_port_reuse_buffer(p->node, port->port_id, buffer_id); +} + +static int port_impl_send_command(struct pw_port *port, struct spa_command *command) +{ + struct port *p = port->user_data; + return spa_node_port_send_command(p->node, + port->direction, + port->port_id, + command); +} + +const struct pw_port_implementation port_impl = { + PW_VERSION_PORT_IMPLEMENTATION, + port_impl_enum_formats, + port_impl_set_format, + port_impl_get_format, + port_impl_get_info, + port_impl_enum_params, + port_impl_set_param, + port_impl_use_buffers, + port_impl_alloc_buffers, + port_impl_reuse_buffer, + port_impl_send_command, +}; + +static struct pw_port * +make_port(struct pw_node *node, enum pw_direction direction, uint32_t port_id) +{ + struct impl *impl = node->user_data; + struct pw_port *port; + struct port *p; + + port = pw_port_new(direction, port_id, sizeof(struct port)); + if (port == NULL) + return NULL; + + p = port->user_data; + p->node = impl->node; + + port->implementation = &port_impl; + + spa_node_port_set_io(impl->node, direction, port_id, &port->io); + + pw_port_add(port, node); + + return port; +} + +static void update_port_ids(struct impl *impl) +{ + struct pw_node *this = impl->this; + uint32_t *input_port_ids, *output_port_ids; + uint32_t n_input_ports, n_output_ports, max_input_ports, max_output_ports; + uint32_t i; + struct spa_list *ports; + + spa_node_get_n_ports(impl->node, + &n_input_ports, &max_input_ports, &n_output_ports, &max_output_ports); + + this->info.max_input_ports = max_input_ports; + this->info.max_output_ports = max_output_ports; + + input_port_ids = alloca(sizeof(uint32_t) * n_input_ports); + output_port_ids = alloca(sizeof(uint32_t) * n_output_ports); + + spa_node_get_port_ids(impl->node, + max_input_ports, input_port_ids, max_output_ports, output_port_ids); + + pw_log_debug("node %p: update_port ids %u/%u, %u/%u", this, + n_input_ports, max_input_ports, n_output_ports, max_output_ports); + + i = 0; + ports = &this->input_ports; + while (true) { + struct pw_port *p = (ports == &this->input_ports) ? NULL : + SPA_CONTAINER_OF(ports, struct pw_port, link); + + if (p && i < n_input_ports && p->port_id == input_port_ids[i]) { + pw_log_debug("node %p: exiting input port %d", this, input_port_ids[i]); + i++; + ports = ports->next; + } else if ((p && i < n_input_ports && input_port_ids[i] < p->port_id) + || i < n_input_ports) { + struct pw_port *np; + pw_log_debug("node %p: input port added %d", this, input_port_ids[i]); + np = make_port(this, PW_DIRECTION_INPUT, input_port_ids[i]); + + ports = np->link.next; + i++; + } else if (p) { + ports = ports->next; + pw_log_debug("node %p: input port removed %d", this, p->port_id); + pw_port_destroy(p); + } else { + pw_log_debug("node %p: no more input ports", this); + break; + } + } + + i = 0; + ports = &this->output_ports; + while (true) { + struct pw_port *p = (ports == &this->output_ports) ? NULL : + SPA_CONTAINER_OF(ports, struct pw_port, link); + + if (p && i < n_output_ports && p->port_id == output_port_ids[i]) { + pw_log_debug("node %p: exiting output port %d", this, output_port_ids[i]); + i++; + ports = ports->next; + } else if ((p && i < n_output_ports && output_port_ids[i] < p->port_id) + || i < n_output_ports) { + struct pw_port *np; + pw_log_debug("node %p: output port added %d", this, output_port_ids[i]); + np = make_port(this, PW_DIRECTION_OUTPUT, output_port_ids[i]); + ports = np->link.next; + i++; + } else if (p) { + ports = ports->next; + pw_log_debug("node %p: output port removed %d", this, p->port_id); + pw_port_destroy(p); + } else { + pw_log_debug("node %p: no more output ports", this); + break; + } + } +} + + +static int node_impl_get_props(struct pw_node *node, struct spa_props **props) +{ + struct impl *impl = node->user_data; + return spa_node_get_props(impl->node, props); +} + +static int node_impl_set_props(struct pw_node *node, const struct spa_props *props) +{ + struct impl *impl = node->user_data; + return spa_node_set_props(impl->node, props); +} + +static int node_impl_send_command(struct pw_node *node, struct spa_command *command) +{ + struct impl *impl = node->user_data; + return spa_node_send_command(impl->node, command); +} + +static struct pw_port* +node_impl_add_port(struct pw_node *node, + enum pw_direction direction, + uint32_t port_id) +{ + struct impl *impl = node->user_data; + int res; + + if ((res = spa_node_add_port(impl->node, direction, port_id)) < 0) { + pw_log_error("node %p: could not add port %d %d", node, port_id, res); + return NULL; + } + + return make_port(node, direction, port_id); +} + +static const struct pw_node_implementation node_impl = { + PW_VERSION_NODE_IMPLEMENTATION, + node_impl_get_props, + node_impl_set_props, + node_impl_send_command, + node_impl_add_port, +}; + +static void pw_spa_node_destroy(void *object) +{ + struct pw_node *node = object; + struct impl *impl = node->user_data; + + pw_log_debug("spa-node %p: destroy", node); + + if (impl->handle) { + spa_handle_clear(impl->handle); + free(impl->handle); + } + free(impl->lib); + free(impl->factory_name); + if (impl->hnd) + dlclose(impl->hnd); +} + +static void complete_init(struct impl *impl) +{ + struct pw_node *this = impl->this; + update_port_ids(impl); + pw_node_export(this); +} + +static void on_node_done(struct spa_node *node, int seq, int res, void *user_data) +{ + struct impl *impl = user_data; + struct pw_node *this = impl->this; + + if (impl->async_init) { + complete_init(impl); + impl->async_init = false; + } + + pw_log_debug("spa-node %p: async complete event %d %d", this, seq, res); + pw_signal_emit(&this->async_complete, this, seq, res); +} + +static void on_node_event(struct spa_node *node, struct spa_event *event, void *user_data) +{ + struct impl *impl = user_data; + struct pw_node *this = impl->this; + + pw_signal_emit(&this->event, this, event); +} + +static void on_node_need_input(struct spa_node *node, void *user_data) +{ + struct impl *impl = user_data; + struct pw_node *this = impl->this; + + spa_graph_scheduler_pull(this->rt.sched, &this->rt.node); + while (spa_graph_scheduler_iterate(this->rt.sched)); +} + +static void on_node_have_output(struct spa_node *node, void *user_data) +{ + struct impl *impl = user_data; + struct pw_node *this = impl->this; + + spa_graph_scheduler_push(this->rt.sched, &this->rt.node); + while (spa_graph_scheduler_iterate(this->rt.sched)); +} + +static void +on_node_reuse_buffer(struct spa_node *node, uint32_t port_id, uint32_t buffer_id, void *user_data) +{ + struct impl *impl = user_data; + struct pw_node *this = impl->this; + struct spa_graph_port *p; + + spa_list_for_each(p, &this->rt.node.ports[SPA_DIRECTION_INPUT], link) { + if (p->port_id != port_id) + continue; + + if (p->peer && p->peer->node->methods->reuse_buffer) { + struct spa_graph_node *n = p->peer->node; + n->methods->reuse_buffer(p->peer, buffer_id, n->user_data); + } + break; + } +} + +static const struct spa_node_callbacks node_callbacks = { + SPA_VERSION_NODE_CALLBACKS, + &on_node_done, + &on_node_event, + &on_node_need_input, + &on_node_have_output, + &on_node_reuse_buffer, +}; + +struct pw_node * +pw_spa_node_new(struct pw_core *core, + struct pw_resource *owner, + const char *name, + bool async, + struct spa_node *node, + struct spa_clock *clock, + struct pw_properties *properties) +{ + struct pw_node *this; + struct impl *impl; + + if (node->info) { + uint32_t i; + + if (properties == NULL) + properties = pw_properties_new(NULL, NULL); + + if (properties) + return NULL; + + for (i = 0; i < node->info->n_items; i++) + pw_properties_set(properties, + node->info->items[i].key, + node->info->items[i].value); + } + + this = pw_node_new(core, owner, name, properties, sizeof(struct impl)); + if (this == NULL) + return NULL; + + this->destroy = pw_spa_node_destroy; + this->implementation = &node_impl; + this->rt.methods = spa_graph_scheduler_default; + this->rt.node.user_data = node; + this->clock = clock; + + impl = this->user_data; + impl->this = this; + impl->node = node; + impl->async_init = async; + + if (spa_node_set_callbacks(impl->node, &node_callbacks, impl) < 0) + pw_log_warn("spa-node %p: error setting callback", this); + + if (!async) { + complete_init(impl); + } + + return this; +} + +static int +setup_props(struct pw_core *core, struct spa_node *spa_node, struct pw_properties *pw_props) +{ + int res; + struct spa_props *props; + void *state = NULL; + const char *key; + + if ((res = spa_node_get_props(spa_node, &props)) != SPA_RESULT_OK) { + pw_log_debug("spa_node_get_props failed: %d", res); + return SPA_RESULT_ERROR; + } + + while ((key = pw_properties_iterate(pw_props, &state))) { + struct spa_pod_prop *prop; + uint32_t id; + + if (!spa_type_is_a(key, SPA_TYPE_PROPS_BASE)) + continue; + + id = spa_type_map_get_id(core->type.map, key); + if (id == SPA_ID_INVALID) + continue; + + if ((prop = spa_pod_object_find_prop(&props->object, id))) { + const char *value = pw_properties_get(pw_props, key); + + pw_log_info("configure prop %s", key); + + switch(prop->body.value.type) { + case SPA_POD_TYPE_ID: + SPA_POD_VALUE(struct spa_pod_id, &prop->body.value) = + spa_type_map_get_id(core->type.map, value); + break; + case SPA_POD_TYPE_INT: + SPA_POD_VALUE(struct spa_pod_int, &prop->body.value) = atoi(value); + break; + case SPA_POD_TYPE_LONG: + SPA_POD_VALUE(struct spa_pod_long, &prop->body.value) = atoi(value); + break; + case SPA_POD_TYPE_FLOAT: + SPA_POD_VALUE(struct spa_pod_float, &prop->body.value) = atof(value); + break; + case SPA_POD_TYPE_DOUBLE: + SPA_POD_VALUE(struct spa_pod_double, &prop->body.value) = atof(value); + break; + case SPA_POD_TYPE_STRING: + break; + default: + break; + } + } + } + + if ((res = spa_node_set_props(spa_node, props)) != SPA_RESULT_OK) { + pw_log_debug("spa_node_set_props failed: %d", res); + return SPA_RESULT_ERROR; + } + return SPA_RESULT_OK; +} + + +struct pw_node *pw_spa_node_load(struct pw_core *core, + struct pw_resource *owner, + const char *lib, + const char *factory_name, + const char *name, + struct pw_properties *properties) +{ + struct pw_node *this; struct impl *impl; struct spa_node *spa_node; struct spa_clock *spa_clock; @@ -51,6 +503,11 @@ struct pw_spa_node *pw_spa_node_load(struct pw_core *core, const struct spa_handle_factory *factory; void *iface; char *filename; + const char *dir; + bool async; + + if ((dir = getenv("SPA_PLUGIN_DIR")) == NULL) + dir = PLUGINDIR; asprintf(&filename, "%s/%s.so", dir, lib); @@ -79,6 +536,8 @@ struct pw_spa_node *pw_spa_node_load(struct pw_core *core, pw_log_error("can't make factory instance: %d", res); goto init_failed; } + async = SPA_RESULT_IS_ASYNC(res); + if ((res = spa_handle_get_interface(handle, core->type.spa_node, &iface)) < 0) { pw_log_error("can't get node interface %d", res); goto interface_failed; @@ -90,21 +549,17 @@ struct pw_spa_node *pw_spa_node_load(struct pw_core *core, } spa_clock = iface; - impl = calloc(1, sizeof(struct impl)); - impl->core = core; - impl->hnd = hnd; - this = &impl->this; - - if (setup_func != NULL) { - if (setup_func(core, spa_node, properties) != SPA_RESULT_OK) { + if (properties != NULL) { + if (setup_props(core, spa_node, properties) != SPA_RESULT_OK) { pw_log_debug("Unrecognized properties"); } } - this->node = pw_node_new(core, NULL, name, false, spa_node, spa_clock, properties); - this->lib = filename; - this->factory_name = strdup(factory_name); - this->handle = handle; + this = pw_spa_node_new(core, owner, name, async, spa_node, spa_clock, properties); + impl->hnd = hnd; + impl->handle = handle; + impl->lib = filename; + impl->factory_name = strdup(factory_name); return this; @@ -119,20 +574,3 @@ struct pw_spa_node *pw_spa_node_load(struct pw_core *core, free(filename); return NULL; } - -void pw_spa_node_destroy(struct pw_spa_node *node) -{ - struct impl *impl = SPA_CONTAINER_OF(node, struct impl, this); - - pw_log_debug("spa-node %p: destroy", impl); - pw_signal_emit(&node->destroy_signal, node); - - pw_node_destroy(node->node); - - spa_handle_clear(node->handle); - free(node->handle); - free(node->lib); - free(node->factory_name); - dlclose(impl->hnd); - free(impl); -} diff --git a/pipewire/modules/spa/spa-node.h b/pipewire/modules/spa/spa-node.h index de857ad12..e4cf12059 100644 --- a/pipewire/modules/spa/spa-node.h +++ b/pipewire/modules/spa/spa-node.h @@ -27,27 +27,22 @@ extern "C" { #endif -struct pw_spa_node { - struct pw_node *node; +struct pw_node * +pw_spa_node_new(struct pw_core *core, + struct pw_resource *owner, /**< optional owner */ + const char *name, + bool async, + struct spa_node *node, + struct spa_clock *clock, + struct pw_properties *properties); - char *lib; - char *factory_name; - struct spa_handle *handle; - PW_SIGNAL(destroy_signal, (struct pw_listener *listener, struct pw_spa_node *node)); -}; - -typedef int (*setup_node_t) (struct pw_core *core, - struct spa_node *spa_node, - struct pw_properties *pw_props); - -struct pw_spa_node * +struct pw_node * pw_spa_node_load(struct pw_core *core, - const char *dir, + struct pw_resource *owner, /**< optional owner */ const char *lib, const char *factory_name, const char *name, - struct pw_properties *properties, - setup_node_t setup_func); + struct pw_properties *properties); #ifdef __cplusplus } diff --git a/pipewire/server/client.c b/pipewire/server/client.c index 12424d6ff..21a20f9b6 100644 --- a/pipewire/server/client.c +++ b/pipewire/server/client.c @@ -146,15 +146,16 @@ void pw_client_destroy(struct pw_client *client) pw_map_for_each(&client->objects, destroy_resource, client); pw_log_debug("client %p: free", impl); + + if (client->destroy) + client->destroy(client); + pw_map_clear(&client->objects); pw_map_clear(&client->types); if (client->properties) pw_properties_free(client->properties); - if (client->destroy) - client->destroy(client); - free(impl); } diff --git a/pipewire/server/core.c b/pipewire/server/core.c index 9b8c981a8..0f030a860 100644 --- a/pipewire/server/core.c +++ b/pipewire/server/core.c @@ -621,19 +621,13 @@ struct spa_format *pw_core_find_format(struct pw_core *core, if (in_state == PW_PORT_STATE_CONFIGURE && out_state > PW_PORT_STATE_CONFIGURE) { /* only input needs format */ - if ((res = spa_node_port_get_format(output->node->node, - SPA_DIRECTION_OUTPUT, - output->port_id, - (const struct spa_format **) &format)) < 0) { + if ((res = pw_port_get_format(output, (const struct spa_format **) &format)) < 0) { asprintf(error, "error get output format: %d", res); goto error; } } else if (out_state == PW_PORT_STATE_CONFIGURE && in_state > PW_PORT_STATE_CONFIGURE) { /* only output needs format */ - if ((res = spa_node_port_get_format(input->node->node, - SPA_DIRECTION_INPUT, - input->port_id, - (const struct spa_format **) &format)) < 0) { + if ((res = pw_port_get_format(input, (const struct spa_format **) &format)) < 0) { asprintf(error, "error get input format: %d", res); goto error; } @@ -641,9 +635,7 @@ struct spa_format *pw_core_find_format(struct pw_core *core, again: /* both ports need a format */ pw_log_debug("core %p: finding best format", core); - if ((res = spa_node_port_enum_formats(input->node->node, - SPA_DIRECTION_INPUT, - input->port_id, &filter, NULL, iidx)) < 0) { + if ((res = pw_port_enum_formats(input, &filter, NULL, iidx)) < 0) { if (res == SPA_RESULT_ENUM_END && iidx != 0) { asprintf(error, "error input enum formats: %d", res); goto error; @@ -653,10 +645,7 @@ struct spa_format *pw_core_find_format(struct pw_core *core, if (pw_log_level_enabled(SPA_LOG_LEVEL_DEBUG)) spa_debug_format(filter); - if ((res = spa_node_port_enum_formats(output->node->node, - SPA_DIRECTION_OUTPUT, - output->port_id, - &format, filter, oidx)) < 0) { + if ((res = pw_port_enum_formats(output, &format, filter, oidx)) < 0) { if (res == SPA_RESULT_ENUM_END) { oidx = 0; iidx++; diff --git a/pipewire/server/link.c b/pipewire/server/link.c index e4e7523ad..065e3978d 100644 --- a/pipewire/server/link.c +++ b/pipewire/server/link.c @@ -124,10 +124,8 @@ static int do_negotiate(struct pw_link *this, uint32_t in_state, uint32_t out_st format = spa_format_copy(format); if (out_state > PW_PORT_STATE_CONFIGURE && this->output->node->info.state == PW_NODE_STATE_IDLE) { - if ((res = spa_node_port_get_format(this->output->node->node, - SPA_DIRECTION_OUTPUT, - this->output->port_id, - (const struct spa_format **) ¤t)) < 0) { + if ((res = pw_port_get_format(this->output, + (const struct spa_format **) ¤t)) < 0) { asprintf(&error, "error get output format: %d", res); goto error; } @@ -140,10 +138,8 @@ static int do_negotiate(struct pw_link *this, uint32_t in_state, uint32_t out_st pw_node_update_state(this->output->node, PW_NODE_STATE_RUNNING, NULL); } if (in_state > PW_PORT_STATE_CONFIGURE && this->input->node->info.state == PW_NODE_STATE_IDLE) { - if ((res = spa_node_port_get_format(this->input->node->node, - SPA_DIRECTION_INPUT, - this->input->port_id, - (const struct spa_format **) ¤t)) < 0) { + if ((res = pw_port_get_format(this->input, + (const struct spa_format **) ¤t)) < 0) { asprintf(&error, "error get input format: %d", res); goto error; } @@ -360,17 +356,17 @@ static struct spa_buffer **alloc_buffers(struct pw_link *this, } static int -spa_node_param_filter(struct pw_link *this, - struct spa_node *in_node, - uint32_t in_port, - struct spa_node *out_node, uint32_t out_port, struct spa_pod_builder *result) +param_filter(struct pw_link *this, + struct pw_port *in_port, + struct pw_port *out_port, + struct spa_pod_builder *result) { int res; struct spa_param *oparam, *iparam; int iidx, oidx, num = 0; for (iidx = 0;; iidx++) { - if (spa_node_port_enum_params(in_node, SPA_DIRECTION_INPUT, in_port, iidx, &iparam) + if (pw_port_enum_params(in_port, iidx, &iparam) < 0) break; @@ -381,8 +377,7 @@ spa_node_param_filter(struct pw_link *this, struct spa_pod_frame f; uint32_t offset; - if (spa_node_port_enum_params(out_node, SPA_DIRECTION_OUTPUT, - out_port, oidx, &oparam) < 0) + if (pw_port_enum_params(out_port, oidx, &oparam) < 0) break; if (pw_log_level_enabled(SPA_LOG_LEVEL_DEBUG)) @@ -397,8 +392,7 @@ spa_node_param_filter(struct pw_link *this, SPA_POD_CONTENTS(struct spa_param, iparam), SPA_POD_CONTENTS_SIZE(struct spa_param, iparam), SPA_POD_CONTENTS(struct spa_param, oparam), - SPA_POD_CONTENTS_SIZE(struct spa_param, - oparam))) < 0) { + SPA_POD_CONTENTS_SIZE(struct spa_param, oparam))) < 0) { result->offset = offset; result->stack = NULL; continue; @@ -426,14 +420,11 @@ static int do_allocation(struct pw_link *this, uint32_t in_state, uint32_t out_s pw_log_debug("link %p: doing alloc buffers %p %p", this, this->output->node, this->input->node); /* find out what's possible */ - if ((res = spa_node_port_get_info(this->output->node->node, - SPA_DIRECTION_OUTPUT, - this->output->port_id, &oinfo)) < 0) { + if ((res = pw_port_get_info(this->output, &oinfo)) < 0) { asprintf(&error, "error get output port info: %d", res); goto error; } - if ((res = spa_node_port_get_info(this->input->node->node, - SPA_DIRECTION_INPUT, this->input->port_id, &iinfo)) < 0) { + if ((res = pw_port_get_info(this->input, &iinfo)) < 0) { asprintf(&error, "error get input port info: %d", res); goto error; } @@ -493,11 +484,7 @@ static int do_allocation(struct pw_link *this, uint32_t in_state, uint32_t out_s uint32_t max_buffers; size_t minsize = 1024, stride = 0; - n_params = spa_node_param_filter(this, - this->input->node->node, - this->input->port_id, - this->output->node->node, - this->output->port_id, &b); + n_params = param_filter(this, this->input, this->output, &b); params = alloca(n_params * sizeof(struct spa_param *)); for (i = 0, offset = 0; i < n_params; i++) { @@ -775,8 +762,7 @@ do_remove_input(struct spa_loop *loop, bool async, uint32_t seq, size_t size, void *data, void *user_data) { struct pw_link *this = user_data; - struct pw_port *port = ((struct pw_port **) data)[0]; - spa_graph_port_remove(port->rt.graph, &this->rt.in_port); + spa_graph_port_remove(&this->rt.in_port); return SPA_RESULT_OK; } @@ -789,7 +775,7 @@ static void input_remove(struct pw_link *this, struct pw_port *port) pw_signal_remove(&impl->input_async_complete); pw_loop_invoke(port->node->data_loop->loop, - do_remove_input, 1, sizeof(struct pw_port*), &port, true, this); + do_remove_input, 1, 0, NULL, true, this); clear_port_buffers(this, this->input); } @@ -799,8 +785,7 @@ do_remove_output(struct spa_loop *loop, bool async, uint32_t seq, size_t size, void *data, void *user_data) { struct pw_link *this = user_data; - struct pw_port *port = ((struct pw_port **) data)[0]; - spa_graph_port_remove(port->rt.graph, &this->rt.out_port); + spa_graph_port_remove(&this->rt.out_port); return SPA_RESULT_OK; } @@ -813,7 +798,7 @@ static void output_remove(struct pw_link *this, struct pw_port *port) pw_signal_remove(&impl->output_async_complete); pw_loop_invoke(port->node->data_loop->loop, - do_remove_output, 1, sizeof(struct pw_port*), &port, true, this); + do_remove_output, 1, 0, NULL, true, this); clear_port_buffers(this, this->output); } @@ -866,7 +851,7 @@ do_activate_link(struct spa_loop *loop, bool async, uint32_t seq, size_t size, void *data, void *user_data) { struct pw_link *this = user_data; - spa_graph_port_link(this->output->node->rt.sched->graph, &this->rt.out_port, &this->rt.in_port); + spa_graph_port_link(&this->rt.out_port, &this->rt.in_port); return SPA_RESULT_OK; } @@ -897,7 +882,7 @@ do_deactivate_link(struct spa_loop *loop, bool async, uint32_t seq, size_t size, void *data, void *user_data) { struct pw_link *this = user_data; - spa_graph_port_unlink(this->output->node->rt.sched->graph, &this->rt.out_port); + spa_graph_port_unlink(&this->rt.out_port); return SPA_RESULT_OK; } @@ -990,21 +975,9 @@ do_add_link(struct spa_loop *loop, struct pw_port *port = ((struct pw_port **) data)[0]; if (port->direction == PW_DIRECTION_OUTPUT) { - spa_graph_port_add(port->rt.graph, - &port->rt.mix_node, - &this->rt.out_port, - PW_DIRECTION_OUTPUT, - this->rt.out_port.port_id, - 0, - &this->io); + spa_graph_port_add(&port->rt.mix_node, &this->rt.out_port); } else { - spa_graph_port_add(port->rt.graph, - &port->rt.mix_node, - &this->rt.in_port, - PW_DIRECTION_INPUT, - this->rt.in_port.port_id, - 0, - &this->io); + spa_graph_port_add(&port->rt.mix_node, &this->rt.in_port); } return SPA_RESULT_OK; @@ -1089,6 +1062,17 @@ struct pw_link *pw_link_new(struct pw_core *core, this->info.input_port_id = input ? input->port_id : -1; this->info.format = NULL; + spa_graph_port_init(&this->rt.out_port, + PW_DIRECTION_OUTPUT, + this->rt.out_port.port_id, + 0, + &this->io); + spa_graph_port_init(&this->rt.in_port, + PW_DIRECTION_INPUT, + this->rt.in_port.port_id, + 0, + &this->io); + pw_loop_invoke(output_node->data_loop->loop, do_add_link, SPA_ID_INVALID, sizeof(struct pw_port *), &output, false, this); diff --git a/pipewire/server/meson.build b/pipewire/server/meson.build index f03ff83d1..8fc7613ad 100644 --- a/pipewire/server/meson.build +++ b/pipewire/server/meson.build @@ -10,6 +10,7 @@ pipewirecore_headers = [ 'node-factory.h', 'port.h', 'resource.h', +# 'spa-node.h', 'work-queue.h', ] @@ -25,6 +26,7 @@ pipewirecore_sources = [ 'node-factory.c', 'port.c', 'resource.c', +# 'spa-node.c', 'work-queue.c', ] diff --git a/pipewire/server/node.c b/pipewire/server/node.c index 96ca23949..c93be3bcb 100644 --- a/pipewire/server/node.c +++ b/pipewire/server/node.c @@ -21,8 +21,6 @@ #include #include -#include - #include "pipewire/client/pipewire.h" #include "pipewire/client/interfaces.h" @@ -36,156 +34,37 @@ struct impl { struct pw_node this; struct pw_work_queue *work; + struct pw_listener on_async_complete; + struct pw_listener on_event; - bool async_init; + bool exported; }; /** \endcond */ -static void init_complete(struct pw_node *this); - -static void update_port_ids(struct pw_node *node) -{ - struct impl *impl = SPA_CONTAINER_OF(node, struct impl, this); - uint32_t *input_port_ids, *output_port_ids; - uint32_t n_input_ports, n_output_ports, max_input_ports, max_output_ports; - uint32_t i; - struct spa_list *ports; - int res; - - if (node->node == NULL) - return; - - spa_node_get_n_ports(node->node, - &n_input_ports, &max_input_ports, &n_output_ports, &max_output_ports); - - node->info.n_input_ports = n_input_ports; - node->info.max_input_ports = max_input_ports; - node->info.n_output_ports = n_output_ports; - node->info.max_output_ports = max_output_ports; - - node->input_port_map = calloc(max_input_ports, sizeof(struct pw_port *)); - node->output_port_map = calloc(max_output_ports, sizeof(struct pw_port *)); - - input_port_ids = alloca(sizeof(uint32_t) * n_input_ports); - output_port_ids = alloca(sizeof(uint32_t) * n_output_ports); - - spa_node_get_port_ids(node->node, - max_input_ports, input_port_ids, max_output_ports, output_port_ids); - - pw_log_debug("node %p: update_port ids %u/%u, %u/%u", node, - n_input_ports, max_input_ports, n_output_ports, max_output_ports); - - i = 0; - ports = &node->input_ports; - while (true) { - struct pw_port *p = (ports == &node->input_ports) ? NULL : - SPA_CONTAINER_OF(ports, struct pw_port, link); - - if (p && i < n_input_ports && p->port_id == input_port_ids[i]) { - node->input_port_map[p->port_id] = p; - pw_log_debug("node %p: exiting input port %d", node, input_port_ids[i]); - i++; - ports = ports->next; - } else if ((p && i < n_input_ports && input_port_ids[i] < p->port_id) - || i < n_input_ports) { - struct pw_port *np; - pw_log_debug("node %p: input port added %d", node, input_port_ids[i]); - - np = pw_port_new(node, PW_DIRECTION_INPUT, input_port_ids[i]); - if ((res = spa_node_port_set_io(node->node, SPA_DIRECTION_INPUT, - np->port_id, &np->io)) < 0) - pw_log_warn("node %p: can't set input IO %d", node, res); - - spa_list_insert(ports, &np->link); - ports = np->link.next; - node->input_port_map[np->port_id] = np; - - if (!impl->async_init) - pw_signal_emit(&node->port_added, node, np); - i++; - } else if (p) { - node->input_port_map[p->port_id] = NULL; - ports = ports->next; - if (!impl->async_init) - pw_signal_emit(&node->port_removed, node, p); - pw_log_debug("node %p: input port removed %d", node, p->port_id); - pw_port_destroy(p); - } else { - pw_log_debug("node %p: no more input ports", node); - break; - } - } - - i = 0; - ports = &node->output_ports; - while (true) { - struct pw_port *p = (ports == &node->output_ports) ? NULL : - SPA_CONTAINER_OF(ports, struct pw_port, link); - - if (p && i < n_output_ports && p->port_id == output_port_ids[i]) { - pw_log_debug("node %p: exiting output port %d", node, output_port_ids[i]); - i++; - ports = ports->next; - node->output_port_map[p->port_id] = p; - } else if ((p && i < n_output_ports && output_port_ids[i] < p->port_id) - || i < n_output_ports) { - struct pw_port *np; - pw_log_debug("node %p: output port added %d", node, output_port_ids[i]); - - np = pw_port_new(node, PW_DIRECTION_OUTPUT, output_port_ids[i]); - if ((res = spa_node_port_set_io(node->node, SPA_DIRECTION_OUTPUT, - np->port_id, &np->io)) < 0) - pw_log_warn("node %p: can't set output IO %d", node, res); - - spa_list_insert(ports, &np->link); - ports = np->link.next; - node->output_port_map[np->port_id] = np; - - if (!impl->async_init) - pw_signal_emit(&node->port_added, node, np); - i++; - } else if (p) { - node->output_port_map[p->port_id] = NULL; - ports = ports->next; - if (!impl->async_init) - pw_signal_emit(&node->port_removed, node, p); - pw_log_debug("node %p: output port removed %d", node, p->port_id); - pw_port_destroy(p); - } else { - pw_log_debug("node %p: no more output ports", node); - break; - } - } -} - static int pause_node(struct pw_node *this) { - int res; + int res = SPA_RESULT_OK; if (this->info.state <= PW_NODE_STATE_IDLE) return SPA_RESULT_OK; pw_log_debug("node %p: pause node", this); - - if ((res = spa_node_send_command(this->node, - &SPA_COMMAND_INIT(this->core->type.command_node.Pause))) < - 0) - pw_log_debug("got error %d", res); + if ((res = this->implementation->send_command(this, + &SPA_COMMAND_INIT(this->core->type.command_node.Pause))) < 0) + pw_log_debug("node %p: send command error %d", this, res); return res; } static int start_node(struct pw_node *this) { - int res; + int res = SPA_RESULT_OK; pw_log_debug("node %p: start node", this); - - if ((res = spa_node_send_command(this->node, - &SPA_COMMAND_INIT(this->core->type.command_node.Start))) < - 0) - pw_log_debug("got error %d", res); + if ((res = this->implementation->send_command(this, + &SPA_COMMAND_INIT(this->core->type.command_node.Start))) < 0) + pw_log_debug("node %p: send command error %d", this, res); return res; } @@ -211,23 +90,33 @@ static int suspend_node(struct pw_node *this) return res; } +static void on_async_complete(struct pw_listener *listener, + struct pw_node *node, uint32_t seq, int res) +{ + struct impl *impl = SPA_CONTAINER_OF(listener, struct impl, on_async_complete); + struct pw_node *this = &impl->this; + + pw_log_debug("node %p: async complete event %d %d", this, seq, res); + pw_work_queue_complete(impl->work, this, seq, res); +} + static void send_clock_update(struct pw_node *this) { int res; struct spa_command_node_clock_update cu = - SPA_COMMAND_NODE_CLOCK_UPDATE_INIT(this->core->type.command_node.ClockUpdate, - SPA_COMMAND_NODE_CLOCK_UPDATE_TIME | - SPA_COMMAND_NODE_CLOCK_UPDATE_SCALE | - SPA_COMMAND_NODE_CLOCK_UPDATE_STATE | - SPA_COMMAND_NODE_CLOCK_UPDATE_LATENCY, /* change_mask */ - 1, /* rate */ - 0, /* ticks */ - 0, /* monotonic_time */ - 0, /* offset */ - (1 << 16) | 1, /* scale */ - SPA_CLOCK_STATE_RUNNING, /* state */ - 0, /* flags */ - 0); /* latency */ + SPA_COMMAND_NODE_CLOCK_UPDATE_INIT(this->core->type.command_node.ClockUpdate, + SPA_COMMAND_NODE_CLOCK_UPDATE_TIME | + SPA_COMMAND_NODE_CLOCK_UPDATE_SCALE | + SPA_COMMAND_NODE_CLOCK_UPDATE_STATE | + SPA_COMMAND_NODE_CLOCK_UPDATE_LATENCY, /* change_mask */ + 1, /* rate */ + 0, /* ticks */ + 0, /* monotonic_time */ + 0, /* offset */ + (1 << 16) | 1, /* scale */ + SPA_CLOCK_STATE_RUNNING, /* state */ + 0, /* flags */ + 0); /* latency */ if (this->clock && this->live) { cu.body.flags.value = SPA_COMMAND_NODE_CLOCK_UPDATE_FLAG_LIVE; @@ -236,73 +125,20 @@ static void send_clock_update(struct pw_node *this) &cu.body.ticks.value, &cu.body.monotonic_time.value); } - - if ((res = spa_node_send_command(this->node, (struct spa_command *) &cu)) < 0) - pw_log_debug("got error %d", res); + if ((res = this->implementation->send_command(this, (struct spa_command *) &cu)) < 0) + pw_log_debug("node %p: send clock update error %d", this, res); } -static void on_node_done(struct spa_node *node, int seq, int res, void *user_data) +static void on_event(struct pw_listener *listener, + struct pw_node *node, struct spa_event *event) { - struct impl *impl = user_data; + struct impl *impl = SPA_CONTAINER_OF(listener, struct impl, on_event); struct pw_node *this = &impl->this; - pw_log_debug("node %p: async complete event %d %d", this, seq, res); - pw_work_queue_complete(impl->work, this, seq, res); - pw_signal_emit(&this->async_complete, this, seq, res); -} - -static void on_node_event(struct spa_node *node, struct spa_event *event, void *user_data) -{ - struct impl *impl = user_data; - struct pw_node *this = &impl->this; - - if (SPA_EVENT_TYPE(event) == this->core->type.event_node.RequestClockUpdate) { - send_clock_update(this); - } -} - -static void on_node_need_input(struct spa_node *node, void *user_data) -{ - struct impl *impl = user_data; - struct pw_node *this = &impl->this; - - spa_graph_scheduler_pull(this->rt.sched, &this->rt.node); - while (spa_graph_scheduler_iterate(this->rt.sched)); -} - -static void on_node_have_output(struct spa_node *node, void *user_data) -{ - struct impl *impl = user_data; - struct pw_node *this = &impl->this; - - spa_graph_scheduler_push(this->rt.sched, &this->rt.node); - while (spa_graph_scheduler_iterate(this->rt.sched)); -} - -static void -on_node_reuse_buffer(struct spa_node *node, uint32_t port_id, uint32_t buffer_id, void *user_data) -{ - -#if 0 - struct impl *impl = user_data; - struct pw_node *this = &impl->this; - struct pw_port *inport; - - pw_log_trace("node %p: reuse buffer %u", this, buffer_id); - - spa_list_for_each(inport, &this->input_ports, link) { - struct pw_link *link; - struct pw_port *outport; - - spa_list_for_each(link, &inport->rt.links, rt.input_link) { - if (link->rt.input == NULL || link->rt.output == NULL) - continue; - - outport = link->rt.output; - outport->io.buffer_id = buffer_id; - } - } -#endif + pw_log_trace("node %p: event %d", this, SPA_EVENT_TYPE(event)); + if (SPA_EVENT_TYPE(event) == this->core->type.event_node.RequestClockUpdate) { + send_clock_update(this); + } } static void node_unbind_func(void *data) @@ -316,32 +152,38 @@ update_info(struct pw_node *this) { this->info.id = this->global->id; this->info.input_formats = NULL; - for (this->info.n_input_formats = 0;; this->info.n_input_formats++) { - struct spa_format *fmt; - if (spa_node_port_enum_formats(this->node, - SPA_DIRECTION_INPUT, - 0, &fmt, NULL, this->info.n_input_formats) < 0) - break; + if (!spa_list_is_empty(&this->input_ports)) { + struct pw_port *port = spa_list_first(&this->input_ports, struct pw_port, link); - this->info.input_formats = - realloc(this->info.input_formats, - sizeof(struct spa_format *) * (this->info.n_input_formats + 1)); - this->info.input_formats[this->info.n_input_formats] = spa_format_copy(fmt); + for (this->info.n_input_formats = 0;; this->info.n_input_formats++) { + struct spa_format *fmt; + + if (pw_port_enum_formats(port, &fmt, NULL, this->info.n_input_formats) < 0) + break; + + this->info.input_formats = + realloc(this->info.input_formats, + sizeof(struct spa_format *) * (this->info.n_input_formats + 1)); + this->info.input_formats[this->info.n_input_formats] = spa_format_copy(fmt); + } } + this->info.output_formats = NULL; - for (this->info.n_output_formats = 0;; this->info.n_output_formats++) { - struct spa_format *fmt; + if (!spa_list_is_empty(&this->output_ports)) { + struct pw_port *port = spa_list_first(&this->output_ports, struct pw_port, link); - if (spa_node_port_enum_formats(this->node, - SPA_DIRECTION_OUTPUT, - 0, &fmt, NULL, this->info.n_output_formats) < 0) - break; + for (this->info.n_output_formats = 0;; this->info.n_output_formats++) { + struct spa_format *fmt; - this->info.output_formats = - realloc(this->info.output_formats, - sizeof(struct spa_format *) * (this->info.n_output_formats + 1)); - this->info.output_formats[this->info.n_output_formats] = spa_format_copy(fmt); + if (pw_port_enum_formats(port, &fmt, NULL, this->info.n_output_formats) < 0) + break; + + this->info.output_formats = + realloc(this->info.output_formats, + sizeof(struct spa_format *) * (this->info.n_output_formats + 1)); + this->info.output_formats[this->info.n_output_formats] = spa_format_copy(fmt); + } } this->info.props = this->properties ? &this->properties->dict : NULL; } @@ -396,18 +238,22 @@ node_bind_func(struct pw_global *global, struct pw_client *client, uint32_t vers return SPA_RESULT_NO_MEMORY; } -static void init_complete(struct pw_node *this) +static int +do_node_add(struct spa_loop *loop, + bool async, uint32_t seq, size_t size, void *data, void *user_data) +{ + struct pw_node *this = user_data; + + spa_graph_node_add(this->rt.sched->graph, &this->rt.node); + + return SPA_RESULT_OK; +} + +void pw_node_export(struct pw_node *this) { struct impl *impl = SPA_CONTAINER_OF(this, struct impl, this); - spa_graph_node_add(this->rt.sched->graph, - &this->rt.node, - spa_graph_scheduler_default, - this->node); - - update_port_ids(this); - pw_log_debug("node %p: init completed", this); - impl->async_init = false; + pw_log_debug("node %p: export", this); spa_list_insert(this->core->node_list.prev, &this->link); @@ -415,33 +261,27 @@ static void init_complete(struct pw_node *this) this->owner, this->core->type.node, 0, this, node_bind_func, &this->global); + pw_loop_invoke(this->data_loop->loop, do_node_add, 1, 0, NULL, false, this); + update_info(this); + impl->exported = true; + pw_signal_emit(&this->initialized, this); pw_node_update_state(this, PW_NODE_STATE_SUSPENDED, NULL); } -static const struct spa_node_callbacks node_callbacks = { - SPA_VERSION_NODE_CALLBACKS, - &on_node_done, - &on_node_event, - &on_node_need_input, - &on_node_have_output, - &on_node_reuse_buffer, -}; - struct pw_node *pw_node_new(struct pw_core *core, struct pw_resource *owner, const char *name, - bool async, - struct spa_node *node, - struct spa_clock *clock, struct pw_properties *properties) + struct pw_properties *properties, + size_t user_data_size) { struct impl *impl; struct pw_node *this; - impl = calloc(1, sizeof(struct impl)); + impl = calloc(1, sizeof(struct impl) + user_data_size); if (impl == NULL) return NULL; @@ -450,66 +290,45 @@ struct pw_node *pw_node_new(struct pw_core *core, this->owner = owner; pw_log_debug("node %p: new, owner %p", this, owner); + if (user_data_size > 0) + this->user_data = SPA_MEMBER(impl, sizeof(struct impl), void); + impl->work = pw_work_queue_new(this->core->main_loop->loop); this->info.name = strdup(name); this->properties = properties; - this->node = node; - this->clock = clock; this->data_loop = core->data_loop; this->rt.sched = &core->rt.sched; spa_list_init(&this->resource_list); - if (spa_node_set_callbacks(this->node, &node_callbacks, impl) < 0) - pw_log_warn("node %p: error setting callback", this); - - pw_signal_init(&this->destroy_signal); - pw_signal_init(&this->port_added); - pw_signal_init(&this->port_removed); pw_signal_init(&this->state_request); pw_signal_init(&this->state_changed); + pw_signal_init(&this->initialized); + pw_signal_init(&this->port_added); + pw_signal_init(&this->port_removed); + pw_signal_init(&this->destroy_signal); pw_signal_init(&this->free_signal); pw_signal_init(&this->async_complete); - pw_signal_init(&this->initialized); + pw_signal_init(&this->event); + + pw_signal_add(&this->async_complete, &impl->on_async_complete, on_async_complete); + pw_signal_add(&this->event, &impl->on_event, on_event); this->info.state = PW_NODE_STATE_CREATING; spa_list_init(&this->input_ports); + pw_map_init(&this->input_port_map, 64, 64); spa_list_init(&this->output_ports); + pw_map_init(&this->output_port_map, 64, 64); - if (this->node->info) { - uint32_t i; - - if (this->properties == NULL) - this->properties = pw_properties_new(NULL, NULL); - - if (this->properties) - goto no_mem; - - for (i = 0; i < this->node->info->n_items; i++) - pw_properties_set(this->properties, - this->node->info->items[i].key, - this->node->info->items[i].value); - } - - impl->async_init = async; - if (async) { - pw_work_queue_add(impl->work, - this, - SPA_RESULT_RETURN_ASYNC(0), (pw_work_func_t) init_complete, NULL); - } else { - init_complete(this); - } + spa_graph_node_init(&this->rt.node, + &this->rt.methods, + this); return this; - - no_mem: - free((char *)this->info.name); - free(impl); - return NULL; } static int @@ -520,7 +339,7 @@ do_node_remove(struct spa_loop *loop, pause_node(this); - spa_graph_node_remove(this->rt.sched->graph, &this->rt.node); + spa_graph_node_remove(&this->rt.node); return SPA_RESULT_OK; } @@ -542,9 +361,9 @@ void pw_node_destroy(struct pw_node *node) pw_log_debug("node %p: destroy", impl); pw_signal_emit(&node->destroy_signal, node); - if (!impl->async_init) { - pw_loop_invoke(node->data_loop->loop, do_node_remove, 1, 0, NULL, true, node); + pw_loop_invoke(node->data_loop->loop, do_node_remove, 1, 0, NULL, true, node); + if (impl->exported) { spa_list_remove(&node->link); pw_global_destroy(node->global); node->global = NULL; @@ -564,22 +383,23 @@ void pw_node_destroy(struct pw_node *node) pw_port_destroy(port); } - pw_log_debug("node %p: free", node); pw_signal_emit(&node->free_signal, node); + if (node->destroy) + node->destroy(node); + pw_work_queue_destroy(impl->work); - if (node->input_port_map) - free(node->input_port_map); - if (node->output_port_map) - free(node->output_port_map); + pw_map_clear(&node->input_port_map); + pw_map_clear(&node->output_port_map); if (node->properties) pw_properties_free(node->properties); - clear_info(node); - free(impl); + clear_info(node); + + free(impl); } /** @@ -594,68 +414,46 @@ void pw_node_destroy(struct pw_node *node) */ struct pw_port *pw_node_get_free_port(struct pw_node *node, enum pw_direction direction) { - uint32_t *n_ports, max_ports; - uint64_t change_mask; + uint32_t n_ports, max_ports; struct spa_list *ports; - struct pw_port *port = NULL, *p, **portmap; - int res; - int i; + struct pw_port *port = NULL, *p, *mixport = NULL; + struct pw_map *portmap; if (direction == PW_DIRECTION_INPUT) { max_ports = node->info.max_input_ports; - n_ports = &node->info.n_input_ports; + n_ports = node->info.n_input_ports; ports = &node->input_ports; - portmap = node->input_port_map; - change_mask = 1 << 1; + portmap = &node->input_port_map; } else { max_ports = node->info.max_output_ports; - n_ports = &node->info.n_output_ports; + n_ports = node->info.n_output_ports; ports = &node->output_ports; - portmap = node->output_port_map; - change_mask = 1 << 3; + portmap = &node->output_port_map; } - pw_log_debug("node %p: direction %d max %u, n %u", node, direction, max_ports, *n_ports); + pw_log_debug("node %p: direction %d max %u, n %u", node, direction, max_ports, n_ports); /* first try to find an unlinked port */ spa_list_for_each(p, ports, link) { if (spa_list_is_empty(&p->links)) return p; + /* for output we can reuse an existing port, for input only + * when there is a multiplex */ + if (direction == PW_DIRECTION_OUTPUT || p->mix != NULL) + mixport = p; } /* no port, can we create one ? */ - if (*n_ports < max_ports) { - for (i = 0; i < max_ports; i++) { - if (portmap[i] == NULL) { - pw_log_debug("node %p: creating port direction %d %u", node, direction, i); + if (max_ports == 0 || n_ports < max_ports) { + uint32_t port_id = pw_map_insert_new(portmap, NULL); - port = pw_port_new(node, direction, i); - if (port == NULL) - goto no_mem; + pw_log_debug("node %p: creating port direction %d %u", node, direction, port_id); - spa_list_insert(ports, &port->link); - - if ((res = spa_node_add_port(node->node, direction, i)) < 0) { - pw_log_error("node %p: could not add port %d", node, i); - pw_port_destroy(port); - continue; - } else { - spa_node_port_set_io(node->node, direction, i, &port->io); - } - (*n_ports)++; - node->info.change_mask |= change_mask; - portmap[i] = port; - break; - } - } + port = node->implementation->add_port(node, direction, port_id); + if (port == NULL) + goto no_mem; } else { - if (!spa_list_is_empty(ports)) { - port = spa_list_first(ports, struct pw_port, link); - /* for output we can reuse an existing port, for input only - * when there is a multiplex */ - if (direction == PW_DIRECTION_INPUT && port->mix == NULL) - port = NULL; - } + port = mixport; } return port; diff --git a/pipewire/server/node.h b/pipewire/server/node.h index 379eb3643..4ba59634e 100644 --- a/pipewire/server/node.h +++ b/pipewire/server/node.h @@ -39,6 +39,26 @@ extern "C" { #include #include +struct pw_node; + + +#define PW_VERSION_NODE_IMPLEMENTATION 0 + +struct pw_node_implementation { + uint32_t version; + + int (*get_props) (struct pw_node *node, struct spa_props **props); + + int (*set_props) (struct pw_node *node, const struct spa_props *props); + + int (*send_command) (struct pw_node *node, + struct spa_command *command); + + struct pw_port* (*add_port) (struct pw_node *node, + enum pw_direction direction, + uint32_t port_id); +}; + /** \page page_node Node * * \section page_node_overview Overview @@ -71,23 +91,24 @@ struct pw_node { struct pw_node *object, enum pw_node_state old, enum pw_node_state state)); - struct spa_handle *handle; /**< handle to SPA factory */ - struct spa_node *node; /**< handle to SPA node */ bool live; /**< if the node is live */ struct spa_clock *clock; /**< handle to SPA clock if any */ struct spa_list resource_list; /**< list of resources for this node */ + /** Implementation of core node functions */ + const struct pw_node_implementation *implementation; + /** Emited when the node is initialized */ PW_SIGNAL(initialized, (struct pw_listener *listener, struct pw_node *object)); struct spa_list input_ports; /**< list of input ports */ - struct pw_port **input_port_map; /**< map from port_id to port */ + struct pw_map input_port_map; /**< map from port_id to port */ uint32_t n_used_input_links; /**< number of active input links */ uint32_t idle_used_input_links; /**< number of active input to be idle */ struct spa_list output_ports; /**< list of output ports */ - struct pw_port **output_port_map; /**< map from port_id to port */ + struct pw_map output_port_map; /**< map from port_id to port */ uint32_t n_used_output_links; /**< number of active output links */ uint32_t idle_used_output_links; /**< number of active output to be idle */ @@ -107,13 +128,20 @@ struct pw_node { PW_SIGNAL(async_complete, (struct pw_listener *listener, struct pw_node *node, uint32_t seq, int res)); + /** an event is emited */ + PW_SIGNAL(event, (struct pw_listener *listener, + struct pw_node *node, struct spa_event *event)); + struct pw_data_loop *data_loop; /**< the data loop for this node */ struct { + struct spa_graph_node_methods methods; struct spa_graph_scheduler *sched; struct spa_graph_node node; } rt; + void *user_data; /**< extra user data */ + pw_destroy_t destroy; /**< function to clean up the object */ }; /** Create a new node \memberof pw_node */ @@ -121,10 +149,11 @@ struct pw_node * pw_node_new(struct pw_core *core, /**< the core */ struct pw_resource *owner, /**< optional owner */ const char *name, /**< node name */ - bool async, /**< if the node will initialize async */ - struct spa_node *node, /**< the node */ - struct spa_clock *clock, /**< optional clock */ - struct pw_properties *properties /**< extra properties */); + struct pw_properties *properties, /**< extra properties */ + size_t user_data_size /**< user data size */); + +/** Complete initialization of the node */ +void pw_node_export(struct pw_node *node); /** Destroy a node */ void pw_node_destroy(struct pw_node *node); diff --git a/pipewire/server/port.c b/pipewire/server/port.c index 2d8c81877..cdb555262 100644 --- a/pipewire/server/port.c +++ b/pipewire/server/port.c @@ -28,70 +28,148 @@ /** \cond */ struct impl { struct pw_port this; - - uint32_t seq; }; /** \endcond */ -static int schedule_tee(struct spa_graph_node *node) +static void port_update_state(struct pw_port *port, enum pw_port_state state) { - int res; - struct pw_port *this = node->user_data; - struct spa_graph_port *p; - struct spa_port_io *io = this->rt.mix_port.io; - - if (node->action == SPA_GRAPH_ACTION_IN) { - if (spa_list_is_empty(&node->ports[SPA_DIRECTION_OUTPUT])) { - io->status = SPA_RESULT_NEED_BUFFER; - res = SPA_RESULT_NEED_BUFFER; - } - else { - spa_list_for_each(p, &node->ports[SPA_DIRECTION_OUTPUT], link) - *p->io = *io; - io->status = SPA_RESULT_OK; - io->buffer_id = SPA_ID_INVALID; - res = SPA_RESULT_HAVE_BUFFER; - } + if (port->state != state) { + pw_log_debug("port %p: state %d -> %d", port, port->state, state); + port->state = state; + pw_signal_emit(&port->state_changed, port); } - else if (node->action == SPA_GRAPH_ACTION_OUT) { - spa_list_for_each(p, &node->ports[SPA_DIRECTION_OUTPUT], link) - *io = *p->io; - io->status = SPA_RESULT_NEED_BUFFER; - res = SPA_RESULT_NEED_BUFFER; - } - else - res = SPA_RESULT_ERROR; - - return res; } -static int schedule_mix(struct spa_graph_node *node) +static int schedule_tee_input(struct spa_graph_node *node, void *user_data) { int res; - struct pw_port *this = node->user_data; + struct pw_port *this = user_data; struct spa_graph_port *p; struct spa_port_io *io = this->rt.mix_port.io; - if (node->action == SPA_GRAPH_ACTION_IN) { - spa_list_for_each(p, &node->ports[SPA_DIRECTION_INPUT], link) { - *io = *p->io; - p->io->status = SPA_RESULT_OK; - p->io->buffer_id = SPA_ID_INVALID; - } + if (spa_list_is_empty(&node->ports[SPA_DIRECTION_OUTPUT])) { + io->status = SPA_RESULT_NEED_BUFFER; + res = SPA_RESULT_NEED_BUFFER; + } + else { + spa_list_for_each(p, &node->ports[SPA_DIRECTION_OUTPUT], link) + *p->io = *io; + io->status = SPA_RESULT_OK; + io->buffer_id = SPA_ID_INVALID; res = SPA_RESULT_HAVE_BUFFER; } - else if (node->action == SPA_GRAPH_ACTION_OUT) { - io->status = SPA_RESULT_NEED_BUFFER; - spa_list_for_each(p, &node->ports[SPA_DIRECTION_INPUT], link) - *p->io = *io; - io->buffer_id = SPA_ID_INVALID; - res = SPA_RESULT_NEED_BUFFER; - } - else - res = SPA_RESULT_ERROR; + return res; +} +static int schedule_tee_output(struct spa_graph_node *node, void *user_data) +{ + struct pw_port *this = user_data; + struct spa_graph_port *p; + struct spa_port_io *io = this->rt.mix_port.io; - return res; + spa_list_for_each(p, &node->ports[SPA_DIRECTION_OUTPUT], link) + *io = *p->io; + io->status = SPA_RESULT_NEED_BUFFER; + + return SPA_RESULT_NEED_BUFFER; +} + +static int schedule_tee_reuse_buffer(struct spa_graph_port *port, uint32_t buffer_id, void *user_data) +{ + return SPA_RESULT_OK; +} + +static const struct spa_graph_node_methods schedule_tee = { + SPA_VERSION_GRAPH_NODE_METHODS, + schedule_tee_input, + schedule_tee_output, + schedule_tee_reuse_buffer, +}; + +static int schedule_mix_input(struct spa_graph_node *node, void *user_data) +{ + struct pw_port *this = user_data; + struct spa_graph_port *p; + struct spa_port_io *io = this->rt.mix_port.io; + + spa_list_for_each(p, &node->ports[SPA_DIRECTION_INPUT], link) { + *io = *p->io; + p->io->status = SPA_RESULT_OK; + p->io->buffer_id = SPA_ID_INVALID; + } + return SPA_RESULT_HAVE_BUFFER; +} + +static int schedule_mix_output(struct spa_graph_node *node, void *user_data) +{ + struct pw_port *this = user_data; + struct spa_graph_port *p; + struct spa_port_io *io = this->rt.mix_port.io; + + io->status = SPA_RESULT_NEED_BUFFER; + spa_list_for_each(p, &node->ports[SPA_DIRECTION_INPUT], link) + *p->io = *io; + io->buffer_id = SPA_ID_INVALID; + + return SPA_RESULT_NEED_BUFFER; +} + +static int schedule_mix_reuse_buffer(struct spa_graph_port *port, uint32_t buffer_id, void *user_data) +{ + return SPA_RESULT_OK; +} + +static const struct spa_graph_node_methods schedule_mix = { + SPA_VERSION_GRAPH_NODE_METHODS, + schedule_mix_input, + schedule_mix_output, + schedule_mix_reuse_buffer, +}; + +struct pw_port *pw_port_new(enum pw_direction direction, + uint32_t port_id, + size_t user_data_size) +{ + struct impl *impl; + struct pw_port *this; + + impl = calloc(1, sizeof(struct impl) + user_data_size); + if (impl == NULL) + return NULL; + + this = &impl->this; + pw_log_debug("port %p: new", this); + + this->direction = direction; + this->port_id = port_id; + this->state = PW_PORT_STATE_INIT; + this->io.status = SPA_RESULT_OK; + this->io.buffer_id = SPA_ID_INVALID; + + if (user_data_size > 0) + this->user_data = SPA_MEMBER(impl, sizeof(struct impl), void); + + spa_list_init(&this->links); + + pw_signal_init(&this->state_changed); + pw_signal_init(&this->destroy_signal); + + spa_graph_port_init(&this->rt.port, + this->direction, + this->port_id, + 0, + &this->io); + spa_graph_node_init(&this->rt.mix_node, + this->direction == PW_DIRECTION_INPUT ? + &schedule_mix : + &schedule_tee, + this); + spa_graph_port_init(&this->rt.mix_port, + pw_direction_reverse(this->direction), + 0, + 0, + &this->io); + return this; } static int do_add_port(struct spa_loop *loop, @@ -99,57 +177,38 @@ static int do_add_port(struct spa_loop *loop, { struct pw_port *this = user_data; - spa_graph_port_add(this->rt.graph, - &this->node->rt.node, - &this->rt.port, - this->direction, - this->port_id, - 0, - &this->io); - spa_graph_node_add(this->rt.graph, - &this->rt.mix_node, - this->direction == PW_DIRECTION_INPUT ? schedule_mix : schedule_tee, - this); - spa_graph_port_add(this->rt.graph, - &this->rt.mix_node, - &this->rt.mix_port, - pw_direction_reverse(this->direction), - 0, - 0, - &this->io); - spa_graph_port_link(this->rt.graph, - &this->rt.port, - &this->rt.mix_port); + spa_graph_port_add(&this->node->rt.node, &this->rt.port); + spa_graph_node_add(this->rt.graph, &this->rt.mix_node); + spa_graph_port_add(&this->rt.mix_node, &this->rt.mix_port); + spa_graph_port_link(&this->rt.port, &this->rt.mix_port); return SPA_RESULT_OK; } -struct pw_port *pw_port_new(struct pw_node *node, enum pw_direction direction, uint32_t port_id) +void pw_port_add(struct pw_port *port, struct pw_node *node) { - struct impl *impl; - struct pw_port *this; + port->node = node; - impl = calloc(1, sizeof(struct impl)); - if (impl == NULL) - return NULL; + pw_log_debug("port %p: add to node %p", port, node); + if (port->direction == PW_DIRECTION_INPUT) { + spa_list_insert(&node->input_ports, &port->link); + pw_map_insert_at(&node->input_port_map, port->port_id, port); + node->info.n_input_ports++; + node->info.change_mask |= 1 << 1; + } + else { + spa_list_insert(&node->output_ports, &port->link); + pw_map_insert_at(&node->output_port_map, port->port_id, port); + node->info.n_output_ports++; + node->info.change_mask |= 1 << 3; + } - this = &impl->this; - this->node = node; - this->direction = direction; - this->port_id = port_id; - this->state = PW_PORT_STATE_CONFIGURE; - this->io.status = SPA_RESULT_OK; - this->io.buffer_id = SPA_ID_INVALID; + port->rt.graph = node->rt.sched->graph; + pw_loop_invoke(node->data_loop->loop, do_add_port, SPA_ID_INVALID, 0, NULL, false, port); - spa_list_init(&this->links); + port_update_state(port, PW_PORT_STATE_CONFIGURE); - pw_signal_init(&this->destroy_signal); - - this->rt.graph = node->rt.sched->graph; - - pw_loop_invoke(node->data_loop->loop, do_add_port, SPA_ID_INVALID, 0, NULL, false, this); - - return this; + pw_signal_emit(&node->port_added, node, port); } static int do_remove_port(struct spa_loop *loop, @@ -158,61 +217,70 @@ static int do_remove_port(struct spa_loop *loop, struct pw_port *this = user_data; struct spa_graph_port *p; - spa_graph_port_unlink(this->rt.graph, - &this->rt.port); - spa_graph_port_remove(this->rt.graph, - &this->rt.port); + spa_graph_port_unlink(&this->rt.port); + spa_graph_port_remove(&this->rt.port); spa_list_for_each(p, &this->rt.mix_node.ports[this->direction], link) - spa_graph_port_remove(this->rt.graph, p); + spa_graph_port_remove(p); - spa_graph_port_remove(this->rt.graph, - &this->rt.mix_port); - spa_graph_node_remove(this->rt.graph, - &this->rt.mix_node); + spa_graph_port_remove(&this->rt.mix_port); + spa_graph_node_remove(&this->rt.mix_node); return SPA_RESULT_OK; } void pw_port_destroy(struct pw_port *port) { + struct pw_node *node = port->node; + pw_log_debug("port %p: destroy", port); pw_signal_emit(&port->destroy_signal, port); - pw_loop_invoke(port->node->data_loop->loop, do_remove_port, SPA_ID_INVALID, 0, NULL, true, port); + if (node) { + pw_loop_invoke(port->node->data_loop->loop, do_remove_port, SPA_ID_INVALID, 0, NULL, true, port); - spa_list_remove(&port->link); + if (port->direction == PW_DIRECTION_INPUT) { + pw_map_remove(&node->input_port_map, port->port_id); + node->info.n_input_ports--; + } + else { + pw_map_remove(&node->output_port_map, port->port_id); + node->info.n_output_ports--; + } + spa_list_remove(&port->link); + pw_signal_emit(&node->port_removed, node, port); + } + + if (port->destroy) + port->destroy(port); free(port); } -static void port_update_state(struct pw_port *port, enum pw_port_state state) -{ - if (port->state != state) { - pw_log_debug("port %p: state %d -> %d", port, port->state, state); - port->state = state; - } -} - static int do_port_pause(struct spa_loop *loop, - bool async, uint32_t seq, size_t size, void *data, void *user_data) + bool async, uint32_t seq, size_t size, void *data, void *user_data) { - struct pw_port *port = user_data; + struct pw_port *port = user_data; - return spa_node_port_send_command(port->node->node, - port->direction, - port->port_id, - &SPA_COMMAND_INIT(port->node->core->type.command_node. - Pause)); + return port->implementation->send_command(port, + &SPA_COMMAND_INIT(port->node->core->type.command_node.Pause)); +} + +int pw_port_enum_formats(struct pw_port *port, + struct spa_format **format, + const struct spa_format *filter, + int32_t index) +{ + return port->implementation->enum_formats(port, format, filter, index); } int pw_port_set_format(struct pw_port *port, uint32_t flags, struct spa_format *format) { int res; - res = spa_node_port_set_format(port->node->node, port->direction, port->port_id, flags, format); + res = port->implementation->set_format(port, flags, format); pw_log_debug("port %p: set format %d", port, res); if (!SPA_RESULT_IS_ASYNC(res)) { @@ -231,9 +299,28 @@ int pw_port_set_format(struct pw_port *port, uint32_t flags, struct spa_format * return res; } +int pw_port_get_format(struct pw_port *port, const struct spa_format **format) +{ + return port->implementation->get_format(port, format); +} + +int pw_port_get_info(struct pw_port *port, const struct spa_port_info **info) +{ + return port->implementation->get_info(port, info); +} + +int pw_port_enum_params(struct pw_port *port, uint32_t index, struct spa_param **param) +{ + return port->implementation->enum_params(port, index, param); +} + +int pw_port_set_param(struct pw_port *port, struct spa_param *param) +{ + return port->implementation->set_param(port, param); +} + int pw_port_use_buffers(struct pw_port *port, struct spa_buffer **buffers, uint32_t n_buffers) { - struct impl *impl = SPA_CONTAINER_OF(port, struct impl, this); int res; if (n_buffers == 0 && port->state <= PW_PORT_STATE_READY) @@ -243,13 +330,14 @@ int pw_port_use_buffers(struct pw_port *port, struct spa_buffer **buffers, uint3 return SPA_RESULT_NO_FORMAT; if (port->state > PW_PORT_STATE_PAUSED) { - res = pw_loop_invoke(port->node->data_loop->loop, - do_port_pause, impl->seq++, 0, NULL, true, port); + pw_loop_invoke(port->node->data_loop->loop, + do_port_pause, 0, 0, NULL, true, port); port_update_state (port, PW_PORT_STATE_PAUSED); } pw_log_debug("port %p: use %d buffers", port, n_buffers); - res = spa_node_port_use_buffers(port->node->node, port->direction, port->port_id, buffers, n_buffers); + res = port->implementation->use_buffers(port, buffers, n_buffers); + port->buffers = buffers; port->n_buffers = n_buffers; if (port->allocated) @@ -268,21 +356,20 @@ int pw_port_alloc_buffers(struct pw_port *port, struct spa_param **params, uint32_t n_params, struct spa_buffer **buffers, uint32_t *n_buffers) { - struct impl *impl = SPA_CONTAINER_OF(port, struct impl, this); int res; if (port->state < PW_PORT_STATE_READY) return SPA_RESULT_NO_FORMAT; if (port->state > PW_PORT_STATE_PAUSED) { - res = pw_loop_invoke(port->node->data_loop->loop, - do_port_pause, impl->seq++, 0, NULL, true, port); + pw_loop_invoke(port->node->data_loop->loop, + do_port_pause, 0, 0, NULL, true, port); port_update_state (port, PW_PORT_STATE_PAUSED); } pw_log_debug("port %p: alloc %d buffers", port, *n_buffers); - res = spa_node_port_alloc_buffers(port->node->node, port->direction, port->port_id, - params, n_params, buffers, n_buffers); + res = port->implementation->alloc_buffers(port, params, n_params, buffers, n_buffers); + port->buffers = buffers; port->n_buffers = *n_buffers; port->allocated = true; diff --git a/pipewire/server/port.h b/pipewire/server/port.h index be548ebc1..faa374535 100644 --- a/pipewire/server/port.h +++ b/pipewire/server/port.h @@ -44,6 +44,38 @@ enum pw_port_state { PW_PORT_STATE_STREAMING = 4, }; +struct pw_port; + +#define PW_VERSION_PORT_IMPLEMENTATION 0 + +struct pw_port_implementation { + uint32_t version; + + int (*enum_formats) (struct pw_port *port, + struct spa_format **format, + const struct spa_format *filter, + int32_t index); + + int (*set_format) (struct pw_port *port, uint32_t flags, struct spa_format *format); + + int (*get_format) (struct pw_port *port, const struct spa_format **format); + + int (*get_info) (struct pw_port *port, const struct spa_port_info **info); + + int (*enum_params) (struct pw_port *port, uint32_t index, struct spa_param **param); + + int (*set_param) (struct pw_port *port, struct spa_param *param); + + int (*use_buffers) (struct pw_port *port, struct spa_buffer **buffers, uint32_t n_buffers); + + int (*alloc_buffers) (struct pw_port *port, + struct spa_param **params, uint32_t n_params, + struct spa_buffer **buffers, uint32_t *n_buffers); + int (*reuse_buffer) (struct pw_port *port, uint32_t buffer_id); + + int (*send_command) (struct pw_port *port, struct spa_command *command); +}; + /** \page page_port Port * * \section page_node_overview Overview @@ -60,9 +92,16 @@ struct pw_port { PW_SIGNAL(destroy_signal, (struct pw_listener *listener, struct pw_port *)); struct pw_node *node; /**< owner node */ + enum pw_direction direction; /**< port direction */ uint32_t port_id; /**< port id */ + enum pw_port_state state; /**< state of the port */ + /** Emited when the port state changes */ + PW_SIGNAL(state_changed, (struct pw_listener *listener, struct pw_port *port)); + + const struct pw_port_implementation *implementation; + struct spa_port_io io; /**< io area of the port */ bool allocated; /**< if buffers are allocated */ @@ -80,19 +119,45 @@ struct pw_port { struct spa_graph_port mix_port; struct spa_graph_node mix_node; } rt; /**< data only accessed from the data thread */ + + void *user_data; /**< extra user data */ + pw_destroy_t destroy; /**< function to clean up the object */ }; /** Create a new port \memberof pw_port * \return a newly allocated port */ struct pw_port * -pw_port_new(struct pw_node *node, enum pw_direction direction, uint32_t port_id); +pw_port_new(enum pw_direction direction, + uint32_t port_id, + size_t user_data_size); + +/** Add a port to a node \memberof pw_port */ +void pw_port_add(struct pw_port *port, struct pw_node *node); /** Destroy a port \memberof pw_port */ void pw_port_destroy(struct pw_port *port); +/** Get the current format on a port \memberof pw_port */ +int pw_port_enum_formats(struct pw_port *port, + struct spa_format **format, + const struct spa_format *filter, + int32_t index); + /** Set a format on a port \memberof pw_port */ int pw_port_set_format(struct pw_port *port, uint32_t flags, struct spa_format *format); +/** Get the current format on a port \memberof pw_port */ +int pw_port_get_format(struct pw_port *port, const struct spa_format **format); + +/** Get the info on a port \memberof pw_port */ +int pw_port_get_info(struct pw_port *port, const struct spa_port_info **info); + +/** Enumerate the port parameters \memberof pw_port */ +int pw_port_enum_params(struct pw_port *port, uint32_t index, struct spa_param **param); + +/** Set a port parameter \memberof pw_port */ +int pw_port_set_param(struct pw_port *port, struct spa_param *param); + /** Use buffers on a port \memberof pw_port */ int pw_port_use_buffers(struct pw_port *port, struct spa_buffer **buffers, uint32_t n_buffers); diff --git a/spa/include/spa/graph-scheduler1.h b/spa/include/spa/graph-scheduler1.h index 505d9f951..7c3636d51 100644 --- a/spa/include/spa/graph-scheduler1.h +++ b/spa/include/spa/graph-scheduler1.h @@ -42,21 +42,24 @@ static inline void spa_graph_scheduler_init(struct spa_graph_scheduler *sched, sched->node = NULL; } -static inline int spa_graph_scheduler_default(struct spa_graph_node *node) +static inline int spa_graph_scheduler_input(struct spa_graph_node *node, void *user_data) { - int res; struct spa_node *n = node->user_data; - - if (node->action == SPA_GRAPH_ACTION_IN) - res = spa_node_process_input(n); - else if (node->action == SPA_GRAPH_ACTION_OUT) - res = spa_node_process_output(n); - else - res = SPA_RESULT_ERROR; - - return res; + return spa_node_process_input(n); } +static inline int spa_graph_scheduler_output(struct spa_graph_node *node, void *user_data) +{ + struct spa_node *n = node->user_data; + return spa_node_process_output(n); +} + +static const struct spa_graph_node_methods spa_graph_scheduler_default = { + SPA_VERSION_GRAPH_NODE_METHODS, + spa_graph_scheduler_input, + spa_graph_scheduler_output, +}; + static inline void spa_scheduler_port_check(struct spa_graph_scheduler *sched, struct spa_graph_port *port) { struct spa_graph_node *node = port->node; @@ -93,15 +96,21 @@ static inline bool spa_graph_scheduler_iterate(struct spa_graph_scheduler *sched switch (n->action) { case SPA_GRAPH_ACTION_IN: - case SPA_GRAPH_ACTION_OUT: - n->state = n->schedule(n); - debug("node %p scheduled action %d state %d\n", n, n->action, n->state); - if (n->action == SPA_GRAPH_ACTION_IN && n == sched->node) + n->state = n->methods->schedule_input(n, n->user_data); + debug("node %p scheduled input state %d\n", n, n->state); + if (n == sched->node) break; n->action = SPA_GRAPH_ACTION_CHECK; spa_list_insert(sched->ready.prev, &n->ready_link); break; + case SPA_GRAPH_ACTION_OUT: + n->state = n->methods->schedule_output(n, n->user_data); + debug("node %p scheduled output state %d\n", n, n->state); + n->action = SPA_GRAPH_ACTION_CHECK; + spa_list_insert(sched->ready.prev, &n->ready_link); + break; + case SPA_GRAPH_ACTION_CHECK: if (n->state == SPA_RESULT_NEED_BUFFER) { n->ready_in = 0; diff --git a/spa/include/spa/graph-scheduler3.h b/spa/include/spa/graph-scheduler3.h index bff4fc24f..df97a032f 100644 --- a/spa/include/spa/graph-scheduler3.h +++ b/spa/include/spa/graph-scheduler3.h @@ -38,21 +38,34 @@ static inline void spa_graph_scheduler_init(struct spa_graph_scheduler *sched, sched->node = NULL; } -static inline int spa_graph_scheduler_default(struct spa_graph_node *node) +static inline int spa_graph_scheduler_input(struct spa_graph_node *node, void *user_data) { - int res; struct spa_node *n = node->user_data; - - if (node->action == SPA_GRAPH_ACTION_IN) - res = spa_node_process_input(n); - else if (node->action == SPA_GRAPH_ACTION_OUT) - res = spa_node_process_output(n); - else - res = SPA_RESULT_ERROR; - - return res; + return spa_node_process_input(n); } +static inline int spa_graph_scheduler_output(struct spa_graph_node *node, void *user_data) +{ + struct spa_node *n = node->user_data; + return spa_node_process_output(n); +} + +static inline int spa_graph_scheduler_reuse_buffer(struct spa_graph_port *port, + uint32_t buffer_id, void *user_data) +{ + printf("port %p reuse buffer %d\n", port, buffer_id); + struct spa_graph_node *node = port->node; + struct spa_node *n = node->user_data; + return spa_node_port_reuse_buffer(n, port->port_id, buffer_id); +} + +static const struct spa_graph_node_methods spa_graph_scheduler_default = { + SPA_VERSION_GRAPH_NODE_METHODS, + spa_graph_scheduler_input, + spa_graph_scheduler_output, + spa_graph_scheduler_reuse_buffer, +}; + static inline void spa_graph_scheduler_pull(struct spa_graph_scheduler *sched, struct spa_graph_node *node) { struct spa_graph_port *p; @@ -79,9 +92,8 @@ static inline void spa_graph_scheduler_pull(struct spa_graph_scheduler *sched, s } spa_list_for_each_safe(n, t, &ready, ready_link) { - n->action = SPA_GRAPH_ACTION_OUT; - n->state = n->schedule(n); - debug("peer %p scheduled %d %d\n", n, n->action, n->state); + n->state = n->methods->schedule_output(n, n->user_data); + debug("peer %p scheduled out %d\n", n, n->state); if (n->state == SPA_RESULT_NEED_BUFFER) spa_graph_scheduler_pull(sched, n); else { @@ -97,9 +109,8 @@ static inline void spa_graph_scheduler_pull(struct spa_graph_scheduler *sched, s debug("node %p %d %d\n", node, node->ready_in, node->required_in); if (node->required_in > 0 && node->ready_in == node->required_in) { - node->action = SPA_GRAPH_ACTION_IN; - node->state = node->schedule(node); - debug("node %p scheduled %d %d\n", node, node->action, node->state); + node->state = node->methods->schedule_input(node, node->user_data); + debug("node %p scheduled in %d\n", node, node->state); if (node->state == SPA_RESULT_HAVE_BUFFER) { spa_list_for_each(p, &node->ports[SPA_DIRECTION_OUTPUT], link) { if (p->io->status == SPA_RESULT_HAVE_BUFFER) @@ -143,9 +154,8 @@ static inline void spa_graph_scheduler_push(struct spa_graph_scheduler *sched, s } spa_list_for_each_safe(n, t, &ready, ready_link) { - n->action = SPA_GRAPH_ACTION_IN; - n->state = n->schedule(n); - debug("peer %p scheduled %d %d\n", n, n->action, n->state); + n->state = n->methods->schedule_input(n, n->user_data); + debug("peer %p scheduled in %d\n", n, n->state); if (n->state == SPA_RESULT_HAVE_BUFFER) spa_graph_scheduler_push(sched, n); else { @@ -159,9 +169,8 @@ static inline void spa_graph_scheduler_push(struct spa_graph_scheduler *sched, s n->ready_link.next = NULL; } - node->action = SPA_GRAPH_ACTION_OUT; - node->state = node->schedule(node); - debug("node %p scheduled %d %d\n", node, node->action, node->state); + node->state = node->methods->schedule_output(node, node->user_data); + debug("node %p scheduled out %d\n", node, node->state); if (node->state == SPA_RESULT_NEED_BUFFER) { node->ready_in = 0; spa_list_for_each(p, &node->ports[SPA_DIRECTION_INPUT], link) { diff --git a/spa/include/spa/graph.h b/spa/include/spa/graph.h index 9283cad05..58928fa8c 100644 --- a/spa/include/spa/graph.h +++ b/spa/include/spa/graph.h @@ -42,7 +42,14 @@ struct spa_graph { struct spa_list nodes; }; -typedef int (*spa_graph_node_func_t) (struct spa_graph_node * node); +#define SPA_VERSION_GRAPH_NODE_METHODS 0 +struct spa_graph_node_methods { + uint32_t version; + + int (*schedule_input) (struct spa_graph_node *node, void *user_data); + int (*schedule_output) (struct spa_graph_node *node, void *user_data); + int (*reuse_buffer) (struct spa_graph_port *port, uint32_t buffer_id, void *user_data); +}; struct spa_graph_node { struct spa_list link; @@ -55,7 +62,7 @@ struct spa_graph_node { #define SPA_GRAPH_ACTION_IN 1 #define SPA_GRAPH_ACTION_OUT 2 uint32_t action; - spa_graph_node_func_t schedule; + const struct spa_graph_node_methods *methods; void *user_data; uint32_t max_in; uint32_t required_in; @@ -78,44 +85,57 @@ static inline void spa_graph_init(struct spa_graph *graph) } static inline void -spa_graph_node_add(struct spa_graph *graph, struct spa_graph_node *node, - spa_graph_node_func_t schedule, void *user_data) +spa_graph_node_init(struct spa_graph_node *node, + const struct spa_graph_node_methods *methods, + void *user_data) { spa_list_init(&node->ports[SPA_DIRECTION_INPUT]); spa_list_init(&node->ports[SPA_DIRECTION_OUTPUT]); node->flags = 0; + node->methods = methods; + node->user_data = user_data; + node->max_in = node->required_in = node->ready_in = 0; + debug("node %p init\n", node); +} + +static inline void +spa_graph_node_add(struct spa_graph *graph, + struct spa_graph_node *node) +{ node->state = SPA_RESULT_NEED_BUFFER; node->action = SPA_GRAPH_ACTION_OUT; - node->schedule = schedule; - node->user_data = user_data; node->ready_link.next = NULL; spa_list_insert(graph->nodes.prev, &node->link); - node->max_in = node->required_in = node->ready_in = 0; debug("node %p add\n", node); } static inline void -spa_graph_port_add(struct spa_graph *graph, - struct spa_graph_node *node, - struct spa_graph_port *port, - enum spa_direction direction, - uint32_t port_id, - uint32_t flags, - struct spa_port_io *io) +spa_graph_port_init(struct spa_graph_port *port, + enum spa_direction direction, + uint32_t port_id, + uint32_t flags, + struct spa_port_io *io) { - debug("port %p add type %d id %d to node %p \n", port, direction, port_id, node); - port->node = node; + debug("port %p init type %d id %d\n", port, direction, port_id); port->direction = direction; port->port_id = port_id; port->flags = flags; port->io = io; - spa_list_insert(node->ports[direction].prev, &port->link); +} + +static inline void +spa_graph_port_add(struct spa_graph_node *node, + struct spa_graph_port *port) +{ + debug("port %p add to node %p\n", port, node); + port->node = node; + spa_list_insert(node->ports[port->direction].prev, &port->link); node->max_in++; - if (!(port->flags & SPA_PORT_INFO_FLAG_OPTIONAL) && direction == SPA_DIRECTION_INPUT) + if (!(port->flags & SPA_PORT_INFO_FLAG_OPTIONAL) && port->direction == SPA_DIRECTION_INPUT) node->required_in++; } -static inline void spa_graph_node_remove(struct spa_graph *graph, struct spa_graph_node *node) +static inline void spa_graph_node_remove(struct spa_graph_node *node) { debug("node %p remove\n", node); spa_list_remove(&node->link); @@ -123,7 +143,7 @@ static inline void spa_graph_node_remove(struct spa_graph *graph, struct spa_gra spa_list_remove(&node->ready_link); } -static inline void spa_graph_port_remove(struct spa_graph *graph, struct spa_graph_port *port) +static inline void spa_graph_port_remove(struct spa_graph_port *port) { debug("port %p remove\n", port); spa_list_remove(&port->link); @@ -132,7 +152,7 @@ static inline void spa_graph_port_remove(struct spa_graph *graph, struct spa_gra } static inline void -spa_graph_port_link(struct spa_graph *graph, struct spa_graph_port *out, struct spa_graph_port *in) +spa_graph_port_link(struct spa_graph_port *out, struct spa_graph_port *in) { debug("port %p link to %p \n", out, in); out->peer = in; @@ -140,7 +160,7 @@ spa_graph_port_link(struct spa_graph *graph, struct spa_graph_port *out, struct } static inline void -spa_graph_port_unlink(struct spa_graph *graph, struct spa_graph_port *port) +spa_graph_port_unlink(struct spa_graph_port *port) { debug("port %p unlink from %p \n", port, port->peer); if (port->peer) { diff --git a/spa/tests/test-graph.c b/spa/tests/test-graph.c index f215da860..138d180e4 100644 --- a/spa/tests/test-graph.c +++ b/spa/tests/test-graph.c @@ -340,27 +340,27 @@ static int make_nodes(struct data *data, const char *device) spa_node_port_set_io(data->volume, SPA_DIRECTION_OUTPUT, 0, &data->volume_sink_io[0]); spa_node_port_set_io(data->sink, SPA_DIRECTION_INPUT, 0, &data->volume_sink_io[0]); - spa_graph_node_add(&data->graph, &data->source_node, spa_graph_scheduler_default, - data->source); - spa_graph_port_add(&data->graph, &data->source_node, &data->source_out, - SPA_DIRECTION_OUTPUT, 0, 0, &data->source_volume_io[0]); + spa_graph_node_init(&data->source_node, &spa_graph_scheduler_default, data->source); + spa_graph_node_add(&data->graph, &data->source_node); + spa_graph_port_init(&data->source_out, SPA_DIRECTION_OUTPUT, 0, 0, &data->source_volume_io[0]); + spa_graph_port_add(&data->source_node, &data->source_out); - spa_graph_node_add(&data->graph, &data->volume_node, spa_graph_scheduler_default, - data->volume); - spa_graph_port_add(&data->graph, &data->volume_node, &data->volume_in, SPA_DIRECTION_INPUT, - 0, 0, &data->source_volume_io[0]); + spa_graph_node_init(&data->volume_node, &spa_graph_scheduler_default, data->volume); + spa_graph_node_add(&data->graph, &data->volume_node); + spa_graph_port_init(&data->volume_in, SPA_DIRECTION_INPUT, 0, 0, &data->source_volume_io[0]); + spa_graph_port_add(&data->volume_node, &data->volume_in); - spa_graph_port_link(&data->graph, &data->source_out, &data->volume_in); + spa_graph_port_link(&data->source_out, &data->volume_in); - spa_graph_port_add(&data->graph, &data->volume_node, - &data->volume_out, SPA_DIRECTION_OUTPUT, 0, 0, &data->volume_sink_io[0]); + spa_graph_port_init(&data->volume_out, SPA_DIRECTION_OUTPUT, 0, 0, &data->volume_sink_io[0]); + spa_graph_port_add(&data->volume_node, &data->volume_out); - spa_graph_node_add(&data->graph, &data->sink_node, spa_graph_scheduler_default, - data->sink); - spa_graph_port_add(&data->graph, &data->sink_node, &data->sink_in, SPA_DIRECTION_INPUT, 0, - 0, &data->volume_sink_io[0]); + spa_graph_node_init(&data->sink_node, &spa_graph_scheduler_default, data->sink); + spa_graph_node_add(&data->graph, &data->sink_node); + spa_graph_port_init(&data->sink_in, SPA_DIRECTION_INPUT, 0, 0, &data->volume_sink_io[0]); + spa_graph_port_add(&data->sink_node, &data->sink_in); - spa_graph_port_link(&data->graph, &data->volume_out, &data->sink_in); + spa_graph_port_link(&data->volume_out, &data->sink_in); return res; } diff --git a/spa/tests/test-mixer.c b/spa/tests/test-mixer.c index 136ec4a64..87f9186de 100644 --- a/spa/tests/test-mixer.c +++ b/spa/tests/test-mixer.c @@ -418,35 +418,37 @@ static int make_nodes(struct data *data, const char *device) spa_node_port_set_io(data->sink, SPA_DIRECTION_INPUT, 0, &data->mix_sink_io[0]); #ifdef USE_GRAPH - spa_graph_node_add(&data->graph, &data->source1_node, spa_graph_scheduler_default, - data->source1); - spa_graph_port_add(&data->graph, &data->source1_node, &data->source1_out, - SPA_DIRECTION_OUTPUT, 0, 0, &data->source1_mix_io[0]); + spa_graph_node_init(&data->source1_node, &spa_graph_scheduler_default, data->source1); + spa_graph_port_init(&data->source1_out, SPA_DIRECTION_OUTPUT, 0, 0, &data->source1_mix_io[0]); + spa_graph_port_add(&data->source1_node, &data->source1_out); + spa_graph_node_add(&data->graph, &data->source1_node); - spa_graph_node_add(&data->graph, &data->source2_node, spa_graph_scheduler_default, - data->source2); - spa_graph_port_add(&data->graph, &data->source2_node, &data->source2_out, - SPA_DIRECTION_OUTPUT, 0, 0, &data->source2_mix_io[0]); + spa_graph_node_init(&data->source2_node, &spa_graph_scheduler_default, data->source2); + spa_graph_port_init(&data->source2_out, SPA_DIRECTION_OUTPUT, 0, 0, &data->source2_mix_io[0]); + spa_graph_port_add(&data->source2_node, &data->source2_out); + spa_graph_node_add(&data->graph, &data->source2_node); - spa_graph_node_add(&data->graph, &data->mix_node, spa_graph_scheduler_default, - data->mix); - spa_graph_port_add(&data->graph, &data->mix_node, &data->mix_in[0], SPA_DIRECTION_INPUT, - data->mix_ports[0], 0, &data->source1_mix_io[0]); - spa_graph_port_add(&data->graph, &data->mix_node, &data->mix_in[1], SPA_DIRECTION_INPUT, + spa_graph_node_init(&data->mix_node, &spa_graph_scheduler_default, data->mix); + spa_graph_port_init(&data->mix_in[0], SPA_DIRECTION_INPUT, + data->mix_ports[0], 0, &data->source1_mix_io[0]); + spa_graph_port_add(&data->mix_node, &data->mix_in[0]); + spa_graph_port_init(&data->mix_in[1], SPA_DIRECTION_INPUT, data->mix_ports[1], 0, &data->source2_mix_io[0]); + spa_graph_port_add(&data->mix_node, &data->mix_in[1]); + spa_graph_node_add(&data->graph, &data->mix_node); - spa_graph_port_link(&data->graph, &data->source1_out, &data->mix_in[0]); - spa_graph_port_link(&data->graph, &data->source2_out, &data->mix_in[1]); + spa_graph_port_link(&data->source1_out, &data->mix_in[0]); + spa_graph_port_link(&data->source2_out, &data->mix_in[1]); - spa_graph_port_add(&data->graph, &data->mix_node, - &data->mix_out, SPA_DIRECTION_OUTPUT, 0, 0, &data->mix_sink_io[0]); + spa_graph_port_init(&data->mix_out, SPA_DIRECTION_OUTPUT, 0, 0, &data->mix_sink_io[0]); + spa_graph_port_add(&data->mix_node, &data->mix_out); - spa_graph_node_add(&data->graph, &data->sink_node, spa_graph_scheduler_default, - data->sink); - spa_graph_port_add(&data->graph, &data->sink_node, &data->sink_in, SPA_DIRECTION_INPUT, 0, - 0, &data->mix_sink_io[0]); + spa_graph_node_init(&data->sink_node, &spa_graph_scheduler_default, data->sink); + spa_graph_port_init(&data->sink_in, SPA_DIRECTION_INPUT, 0, 0, &data->mix_sink_io[0]); + spa_graph_port_add(&data->sink_node, &data->sink_in); + spa_graph_node_add(&data->graph, &data->sink_node); - spa_graph_port_link(&data->graph, &data->mix_out, &data->sink_in); + spa_graph_port_link(&data->mix_out, &data->sink_in); #endif return res; diff --git a/spa/tests/test-perf.c b/spa/tests/test-perf.c index e2b0ee022..44f39f4f4 100644 --- a/spa/tests/test-perf.c +++ b/spa/tests/test-perf.c @@ -366,19 +366,21 @@ static int make_nodes(struct data *data) spa_node_port_set_io(data->source, SPA_DIRECTION_OUTPUT, 0, &data->source_sink_io[0]); spa_node_port_set_io(data->sink, SPA_DIRECTION_INPUT, 0, &data->source_sink_io[0]); - spa_graph_node_add(&data->graph, &data->source_node, - spa_graph_scheduler_default, data->source); + spa_graph_node_init(&data->source_node, &spa_graph_scheduler_default, data->source); + spa_graph_node_add(&data->graph, &data->source_node); + data->source_node.flags = (data->mode & MODE_ASYNC_PUSH) ? SPA_GRAPH_NODE_FLAG_ASYNC : 0; - spa_graph_port_add(&data->graph, &data->source_node, - &data->source_out, SPA_DIRECTION_OUTPUT, 0, 0, &data->source_sink_io[0]); + spa_graph_port_init( &data->source_out, SPA_DIRECTION_OUTPUT, 0, 0, &data->source_sink_io[0]); + spa_graph_port_add(&data->source_node, &data->source_out); + + spa_graph_node_init(&data->sink_node, &spa_graph_scheduler_default, data->sink); + spa_graph_node_add(&data->graph, &data->sink_node); - spa_graph_node_add(&data->graph, &data->sink_node, spa_graph_scheduler_default, - data->sink); data->sink_node.flags = (data->mode & MODE_ASYNC_PULL) ? SPA_GRAPH_NODE_FLAG_ASYNC : 0; - spa_graph_port_add(&data->graph, &data->sink_node, - &data->sink_in, SPA_DIRECTION_INPUT, 0, 0, &data->source_sink_io[0]); + spa_graph_port_init(&data->sink_in, SPA_DIRECTION_INPUT, 0, 0, &data->source_sink_io[0]); + spa_graph_port_add(&data->sink_node, &data->sink_in); - spa_graph_port_link(&data->graph, &data->source_out, &data->sink_in); + spa_graph_port_link(&data->source_out, &data->sink_in); return res; }