Rework node and ports

Rework the node and port API so that other implementations can be used
than the spa_node. The plan is to morph this into the stream and
context API.
Rework the graph API a little so that init + add is separated.
This commit is contained in:
Wim Taymans 2017-07-07 17:55:26 +02:00
parent b0f5d34cf7
commit 0738f7fcf5
28 changed files with 1585 additions and 857 deletions

View file

@ -23,7 +23,8 @@ pipewire_module_autolink = shared_library('pipewire-module-autolink', [ 'module-
dependencies : [mathlib, dl_lib, pipewire_dep, pipewirecore_dep],
)
pipewire_module_mixer = shared_library('pipewire-module-mixer', [ 'module-mixer.c' ],
pipewire_module_mixer = shared_library('pipewire-module-mixer',
[ 'module-mixer.c', 'spa/spa-node.c' ],
c_args : pipewire_module_c_args,
include_directories : [configinc, spa_inc],
link_with : spalib,
@ -36,6 +37,7 @@ pipewire_module_client_node = shared_library('pipewire-module-client-node',
[ 'module-client-node.c',
'module-client-node/client-node.c',
'module-client-node/protocol-native.c',
'spa/spa-node.c',
'extension-client-node.c', ],
c_args : pipewire_module_c_args,
include_directories : [configinc, spa_inc],
@ -69,7 +71,8 @@ pipewire_module_protocol_native = shared_library('pipewire-module-protocol-nativ
if jack_dep.found()
pipewire_module_jack = shared_library('pipewire-module-jack',
[ 'module-jack.c',
'module-jack/shm.c' ],
'module-jack/shm.c',
'module-jack/jack-node.c' ],
c_args : pipewire_module_c_args,
include_directories : [configinc, spa_inc],
link_with : spalib,

View file

@ -37,6 +37,7 @@
#include "pipewire/client/transport.h"
#include "pipewire/server/core.h"
#include "pipewire/modules/spa/spa-node.h"
#include "client-node.h"
/** \cond */
@ -89,7 +90,6 @@ struct proxy {
struct spa_type_map *map;
struct spa_log *log;
struct spa_loop *main_loop;
struct spa_loop *data_loop;
const struct spa_node_callbacks *callbacks;
@ -1018,14 +1018,9 @@ proxy_init(struct proxy *this,
for (i = 0; i < n_support; i++) {
if (strcmp(support[i].type, SPA_TYPE__Log) == 0)
this->log = support[i].data;
else if (strcmp(support[i].type, SPA_TYPE_LOOP__MainLoop) == 0)
this->main_loop = support[i].data;
else if (strcmp(support[i].type, SPA_TYPE_LOOP__DataLoop) == 0)
this->data_loop = support[i].data;
}
if (this->main_loop == NULL) {
spa_log_error(this->log, "a main-loop is needed");
}
if (this->data_loop == NULL) {
spa_log_error(this->log, "a data-loop is needed");
}
@ -1171,8 +1166,13 @@ struct pw_client_node *pw_client_node_new(struct pw_client *client,
if (this->resource == NULL)
goto error_no_resource;
this->node = pw_node_new(client->core,
this->resource, name, true, &impl->proxy.node, NULL, properties);
this->node = pw_spa_node_new(client->core,
this->resource,
name,
true,
&impl->proxy.node,
NULL,
properties);
if (this->node == NULL)
goto error_no_node;

View file

@ -61,7 +61,7 @@
#define LOCK_SUFFIX ".lock"
#define LOCK_SUFFIXLEN 5
static int segment_num = 0;
int segment_num = 0;
typedef bool(*demarshal_func_t) (void *object, void *data, size_t size);
@ -223,8 +223,6 @@ handle_client_open(struct client *client)
char name[JACK_CLIENT_NAME_SIZE+1];
int result, ref_num, shared_engine, shared_client, shared_graph;
struct jack_client *jc;
size_t size;
jack_shm_info_t info;
CheckSize(kClientOpen_size);
CheckRead(&PID, sizeof(int));
@ -241,26 +239,22 @@ handle_client_open(struct client *client)
jc->owner = client;
jc->ref_num = ref_num;
if (jack_synchro_alloc(&server->synchro_table[ref_num],
name,
server->engine_control->server_name,
0,
false,
server->promiscuous) < 0) {
if (jack_synchro_init(&server->synchro_table[ref_num],
name,
server->engine_control->server_name,
0,
false,
server->promiscuous) < 0) {
result = -1;
goto reply;
}
size = sizeof(struct jack_client_control);
if (jack_shm_alloc(size, &info, segment_num++) < 0) {
jc->control = jack_client_control_alloc(name, client->client->ucred.pid, ref_num, -1);
if (jc->control == NULL) {
result = -1;
goto reply;
}
jc->control = (struct jack_client_control *)jack_shm_addr(&info);
jc->control->info = info;
server->client_table[ref_num] = jc;
result = 0;
@ -481,6 +475,78 @@ static struct client *client_new(struct impl *impl, int fd)
return NULL;
}
static int
make_int_client(struct impl *impl, struct pw_node *node)
{
struct jack_server *server = &impl->server;
struct jack_connection_manager *conn;
int ref_num;
struct jack_client *jc;
jack_port_id_t port_id;
ref_num = jack_server_allocate_ref_num(server);
if (ref_num == -1)
return -1;
if (jack_synchro_init(&server->synchro_table[ref_num],
"system",
server->engine_control->server_name,
0,
false,
server->promiscuous) < 0) {
return -1;
}
jc = calloc(1,sizeof(struct jack_client));
jc->ref_num = ref_num;
jc->node = node;
jc->control = jack_client_control_alloc("system", -1, ref_num, -1);
jc->control->active = true;
server->client_table[ref_num] = jc;
impl->server.engine_control->driver_num++;
conn = jack_graph_manager_next_start(server->graph_manager);
jack_connection_manager_init_ref_num(conn, ref_num);
port_id = jack_graph_manager_allocate_port(server->graph_manager,
ref_num, "system:playback_1", 2,
JackPortIsInput | JackPortIsPhysical | JackPortIsTerminal);
jack_connection_manager_add_port(conn, true, ref_num, port_id);
jack_graph_manager_next_stop(server->graph_manager);
return 0;
}
static bool init_nodes(struct impl *impl)
{
struct pw_core *core = impl->core;
struct pw_node *n;
spa_list_for_each(n, &core->node_list, link) {
const char *str;
if (n->global == NULL)
continue;
if (n->properties == NULL)
continue;
if ((str = pw_properties_get(n->properties, "media.class")) == NULL)
continue;
if (strcmp(str, "Audio/Sink") != 0)
continue;
make_int_client(impl, n);
}
return true;
}
static struct socket *create_socket(void)
{
struct socket *s;
@ -591,8 +657,6 @@ static bool add_socket(struct impl *impl, struct socket *s)
static int init_server(struct impl *impl, const char *name, bool promiscuous)
{
struct jack_server *server = &impl->server;
jack_shm_info_t info;
size_t size;
int i;
struct socket *s;
@ -604,28 +668,17 @@ static int init_server(struct impl *impl, const char *name, bool promiscuous)
jack_cleanup_shm();
/* graph manager */
size = sizeof(struct jack_graph_manager) + 2048 * sizeof(struct jack_port);
if (jack_shm_alloc(size, &info, segment_num++) < 0)
return -1;
server->graph_manager = (struct jack_graph_manager *)jack_shm_addr(&info);
server->graph_manager->info = info;
server->graph_manager = jack_graph_manager_alloc(2048);
/* engine control */
size = sizeof(struct jack_engine_control);
if (jack_shm_alloc(size, &info, segment_num++) < 0)
return -1;
server->engine_control = (struct jack_engine_control *)jack_shm_addr(&info);
server->engine_control->info = info;
strcpy(server->engine_control->server_name, name);
server->engine_control = jack_engine_control_alloc(name);
for (i = 0; i < CLIENT_NUM; i++)
server->synchro_table[i] = JACK_SYNCHRO_INIT;
if (!init_nodes(impl))
return -1;
s = create_socket();
if (!init_socket_name(s, name, promiscuous, 0))

View file

@ -42,6 +42,7 @@
#define CLIENT_NUM 256
#define JACK_ENGINE_ROLLING_COUNT 32
#define JACK_ENGINE_ROLLING_INTERVAL 1024
#define TIME_POINTS 100000
#define FAILURE_TIME_POINTS 10000
@ -55,6 +56,12 @@
#define JACK_SESSION_COMMAND_SIZE 256
#define NO_PORT 0xFFFE
#define EMPTY 0xFFFD
#define FREE 0xFFFC
typedef enum {
JACK_TIMER_SYSTEM_CLOCK,
JACK_TIMER_HPET,

View file

@ -21,6 +21,7 @@ struct jack_client {
int ref_num;
struct client *owner;
struct jack_client_control *control;
struct pw_node *node;
};
struct jack_server {

View file

@ -17,6 +17,30 @@
* Boston, MA 02110-1301, USA.
*/
#include <math.h>
extern int segment_num;
static inline int jack_shm_alloc(size_t size, jack_shm_info_t *info, int num)
{
char name[64];
snprintf(name, sizeof(name), "/jack_shared%d", num);
if (jack_shmalloc(name, size, info)) {
pw_log_error("Cannot create shared memory segment of size = %zd (%s)", size, strerror(errno));
return -1;
}
if (jack_attach_shm(info)) {
jack_error("Cannot attach shared memory segment name = %s err = %s", name, strerror(errno));
jack_destroy_shm(info);
return -1;
}
info->size = size;
return 0;
}
typedef uint16_t jack_int_t; // Internal type for ports and refnum
typedef enum {
@ -56,41 +80,107 @@ struct jack_port {
jack_default_audio_sample_t buffer[BUFFER_SIZE_MAX + 8];
} POST_PACKED_STRUCTURE;
#define MAKE_FIXED_ARRAY(size) \
PRE_PACKED_STRUCTURE \
struct { \
jack_int_t table[size]; \
uint32_t counter; \
static inline void jack_port_init(struct jack_port *port, int ref_num,
const char* port_name, int type_id, enum JackPortFlags flags)
{
port->type_id = type_id;
port->flags = flags;
strcpy(port->name, port_name);
port->alias1[0] = '\0';
port->alias2[0] = '\0';
port->ref_num = ref_num;
port->latency = 0;
port->total_latency = 0;
port->playback_latency.min = port->playback_latency.max = 0;
port->capture_latency.min = port->capture_latency.max = 0;
port->monitor_requests = 0;
port->in_use = true;
port->tied = NO_PORT;
}
#define MAKE_FIXED_ARRAY(size) \
PRE_PACKED_STRUCTURE \
struct { \
jack_int_t table[size]; \
uint32_t counter; \
} POST_PACKED_STRUCTURE
#define MAKE_FIXED_ARRAY1(size) \
PRE_PACKED_STRUCTURE \
struct { \
MAKE_FIXED_ARRAY(size) array; \
bool used; \
#define INIT_FIXED_ARRAY(arr) ({ \
int i; \
for (i = 0; i < SPA_N_ELEMENTS(arr.table); i++) \
arr.table[i] = EMPTY; \
arr.counter = 0; \
})
#define ADD_FIXED_ARRAY(arr,item) ({ \
int i,ret = -1; \
for (i = 0; i < SPA_N_ELEMENTS(arr.table); i++) { \
if (arr.table[i] == EMPTY) { \
arr.table[i] = item; \
arr.counter++; \
ret = 0; \
break; \
} \
} \
ret; \
})
#define MAKE_FIXED_ARRAY1(size) \
PRE_PACKED_STRUCTURE \
struct { \
MAKE_FIXED_ARRAY(size) array; \
bool used; \
} POST_PACKED_STRUCTURE
#define MAKE_FIXED_MATRIX(size) \
PRE_PACKED_STRUCTURE \
struct { \
jack_int_t table[size][size]; \
#define INIT_FIXED_ARRAY1(arr) ({ \
INIT_FIXED_ARRAY(arr.array); \
arr.used = false; \
})
#define ADD_FIXED_ARRAY1(arr,item) ADD_FIXED_ARRAY(arr.array,item)
#define MAKE_FIXED_MATRIX(size) \
PRE_PACKED_STRUCTURE \
struct { \
jack_int_t table[size][size]; \
} POST_PACKED_STRUCTURE
#define INIT_FIXED_MATRIX(mat,idx) ({ \
int i; \
for (i = 0; i < SPA_N_ELEMENTS(mat.table[0]); i++){ \
mat.table[idx][i] = 0; \
mat.table[i][idx] = 0; \
} \
})
PRE_PACKED_STRUCTURE
struct jack_activation_count {
int32_t value;
int32_t count;
} POST_PACKED_STRUCTURE;
#define MAKE_LOOP_FEEDBACK(size) \
PRE_PACKED_STRUCTURE \
struct { \
int table[size][3]; \
static inline void jack_activation_count_set_value(struct jack_activation_count *cnt, int32_t val) {
cnt->value = val;
}
#define MAKE_LOOP_FEEDBACK(size) \
PRE_PACKED_STRUCTURE \
struct { \
int table[size][3]; \
} POST_PACKED_STRUCTURE
#define INIT_LOOP_FEEDBACK(arr,size) ({ \
int i; \
for (i = 0; i < size; i++) { \
arr.table[i][0] = EMPTY; \
arr.table[i][1] = EMPTY; \
arr.table[i][2] = 0; \
} \
})
PRE_PACKED_STRUCTURE
struct jack_connection_manager {
MAKE_FIXED_ARRAY(CONNECTION_NUM_FOR_PORT) connections[PORT_NUM_MAX];
MAKE_FIXED_ARRAY(CONNECTION_NUM_FOR_PORT) connection[PORT_NUM_MAX];
MAKE_FIXED_ARRAY1(PORT_NUM_FOR_CLIENT) input_port[CLIENT_NUM];
MAKE_FIXED_ARRAY(PORT_NUM_FOR_CLIENT) output_port[CLIENT_NUM];
MAKE_FIXED_MATRIX(CLIENT_NUM) connection_ref;
@ -98,6 +188,38 @@ struct jack_connection_manager {
MAKE_LOOP_FEEDBACK(CONNECTION_NUM_FOR_PORT) loop_feedback;
} POST_PACKED_STRUCTURE;
static inline void
jack_connection_manager_init_ref_num(struct jack_connection_manager *conn, int ref_num)
{
INIT_FIXED_ARRAY1(conn->input_port[ref_num]);
INIT_FIXED_ARRAY(conn->output_port[ref_num]);
INIT_FIXED_MATRIX(conn->connection_ref, ref_num);
jack_activation_count_set_value (&conn->input_counter[ref_num], 0);
}
static inline void
jack_connection_manager_init(struct jack_connection_manager *conn)
{
int i;
for (i = 0; i < PORT_NUM_MAX; i++)
INIT_FIXED_ARRAY(conn->connection[i]);
INIT_LOOP_FEEDBACK(conn->loop_feedback, CONNECTION_NUM_FOR_PORT);
for (i = 0; i < CLIENT_NUM; i++)
jack_connection_manager_init_ref_num(conn, i);
}
static inline int
jack_connection_manager_add_port(struct jack_connection_manager *conn, bool input,
int ref_num, jack_port_id_t port_id)
{
if (input)
return ADD_FIXED_ARRAY1(conn->input_port[ref_num], port_id);
else
return ADD_FIXED_ARRAY(conn->output_port[ref_num], port_id);
}
PRE_PACKED_STRUCTURE
struct jack_atomic_counter {
union {
@ -109,6 +231,13 @@ struct jack_atomic_counter {
} info;
} POST_PACKED_STRUCTURE;
#define Counter(e) (e).info.long_val
#define CurIndex(e) (e).info.scounter.short_val1
#define NextIndex(e) (e).info.scounter.short_val2
#define CurArrayIndex(e) (CurIndex(e) & 0x0001)
#define NextArrayIndex(e) ((CurIndex(e) + 1) & 0x0001)
#define MAKE_ATOMIC_STATE(type) \
PRE_PACKED_STRUCTURE \
struct { \
@ -143,6 +272,104 @@ struct jack_graph_manager {
struct jack_port port_array[0];
} POST_PACKED_STRUCTURE;
static inline struct jack_graph_manager *
jack_graph_manager_alloc(int port_max)
{
struct jack_graph_manager *mgr;
jack_shm_info_t info;
size_t i, size;
size = sizeof(struct jack_graph_manager) + port_max * sizeof(struct jack_port);
if (jack_shm_alloc(size, &info, segment_num++) < 0)
return NULL;
mgr = (struct jack_graph_manager *)jack_shm_addr(&info);
mgr->info = info;
jack_connection_manager_init(&mgr->state.state[0]);
jack_connection_manager_init(&mgr->state.state[1]);
mgr->port_max = port_max;
for (i = 0; i < port_max; i++) {
mgr->port_array[i].in_use = false;
mgr->port_array[i].ref_num = -1;
}
return mgr;
}
static inline jack_port_id_t
jack_graph_manager_allocate_port(struct jack_graph_manager *mgr,
int ref_num, const char* port_name, int type_id,
enum JackPortFlags flags)
{
int i;
for (i = 1; i < mgr->port_max; i++) {
if (!mgr->port_array[i].in_use) {
jack_port_init(&mgr->port_array[i], ref_num, port_name, type_id, flags);
return i;
}
}
return NO_PORT;
}
static inline struct jack_connection_manager *
jack_graph_manager_next_start(struct jack_graph_manager *manager)
{
uint32_t next_index;
if (manager->state.call_write_counter++ == 0) {
struct jack_atomic_counter old_val;
struct jack_atomic_counter new_val;
uint32_t cur_index;
bool need_copy;
do {
old_val = manager->state.counter;
new_val = old_val;
cur_index = CurArrayIndex(new_val);
next_index = NextArrayIndex(new_val);
need_copy = (CurIndex(new_val) == NextIndex(new_val));
NextIndex(new_val) = CurIndex(new_val); // Invalidate next index
}
while (!__atomic_compare_exchange_n((uint32_t*)&manager->state.counter,
(uint32_t*)&Counter(old_val),
Counter(new_val),
false,
__ATOMIC_SEQ_CST,
__ATOMIC_SEQ_CST));
if (need_copy)
memcpy(&manager->state.state[next_index],
&manager->state.state[cur_index],
sizeof(struct jack_connection_manager));
}
else {
next_index = NextArrayIndex(manager->state.counter);
}
return &manager->state.state[next_index];
}
static inline void
jack_graph_manager_next_stop(struct jack_graph_manager *manager)
{
if (--manager->state.call_write_counter == 0) {
struct jack_atomic_counter old_val;
struct jack_atomic_counter new_val;
do {
old_val = manager->state.counter;
new_val = old_val;
NextIndex(new_val)++; // Set next index
}
while (!__atomic_compare_exchange_n((uint32_t*)&manager->state.counter,
(uint32_t*)&Counter(old_val),
Counter(new_val),
false,
__ATOMIC_SEQ_CST,
__ATOMIC_SEQ_CST));
}
}
typedef enum {
TransportCommandNone = 0,
TransportCommandStart = 1,
@ -223,7 +450,7 @@ struct jack_engine_control {
jack_shm_info_t info;
jack_nframes_t buffer_size;
jack_nframes_t sample_rate;
bool sync_node;
bool sync_mode;
bool temporary;
jack_time_t period_usecs;
jack_time_t timeout_usecs;
@ -241,7 +468,6 @@ struct jack_engine_control {
int driver_num;
bool verbose;
// CPU Load
jack_time_t prev_cycle_time;
jack_time_t cur_cycle_time;
jack_time_t spare_usecs;
@ -252,12 +478,10 @@ struct jack_engine_control {
int rolling_interval;
float CPU_load;
// For OSX thread
uint64_t period;
uint64_t computation;
uint64_t constraint;
// Timer
struct jack_frame_timer frame_timer;
#ifdef JACK_MONITOR
@ -265,6 +489,63 @@ struct jack_engine_control {
#endif
} POST_PACKED_STRUCTURE;
static inline void
jack_engine_control_reset_rolling_usecs(struct jack_engine_control *ctrl)
{
memset(ctrl->rolling_client_usecs, 0, sizeof(ctrl->rolling_client_usecs));
ctrl->rolling_client_usecs_index = 0;
ctrl->rolling_client_usecs_cnt = 0;
ctrl->spare_usecs = 0;
ctrl->rolling_interval = floor((JACK_ENGINE_ROLLING_INTERVAL * 1000.f) / ctrl->period_usecs);
}
static inline struct jack_engine_control *
jack_engine_control_alloc(const char* name)
{
struct jack_engine_control *ctrl;
jack_shm_info_t info;
size_t size;
size = sizeof(struct jack_engine_control);
if (jack_shm_alloc(size, &info, segment_num++) < 0)
return NULL;
ctrl = (struct jack_engine_control *)jack_shm_addr(&info);
ctrl->info = info;
ctrl->buffer_size = 512;
ctrl->sample_rate = 48000;
ctrl->sync_mode = false;
ctrl->temporary = false;
ctrl->period_usecs = 1000000.f / ctrl->sample_rate * ctrl->buffer_size;
ctrl->timeout_usecs = 0;
ctrl->max_delayed_usecs = 0.f;
ctrl->xrun_delayed_usecs = 0.f;
ctrl->timeout = false;
ctrl->real_time = true;
ctrl->saved_real_time = false;
ctrl->server_priority = 20;
ctrl->client_priority = 15;
ctrl->max_client_priority = 19;
strcpy(ctrl->server_name, name);
ctrl->clock_source = 0;
ctrl->driver_num = 0;
ctrl->verbose = true;
ctrl->prev_cycle_time = 0;
ctrl->cur_cycle_time = 0;
ctrl->spare_usecs = 0;
ctrl->max_usecs = 0;
jack_engine_control_reset_rolling_usecs(ctrl);
ctrl->CPU_load = 0.f;
ctrl->period = 0;
ctrl->computation = 0;
ctrl->constraint = 0;
return ctrl;
}
PRE_PACKED_STRUCTURE
struct jack_client_control {
jack_shm_info_t info;
@ -282,23 +563,39 @@ struct jack_client_control {
jack_session_flags_t session_flags;
} POST_PACKED_STRUCTURE;
static inline int jack_shm_alloc(size_t size, jack_shm_info_t *info, int num)
static inline struct jack_client_control *
jack_client_control_alloc(const char* name, int pid, int ref_num, int uuid)
{
char name[64];
struct jack_client_control *ctrl;
jack_shm_info_t info;
size_t size;
snprintf(name, sizeof(name), "/jack_shared%d", num);
size = sizeof(struct jack_client_control);
if (jack_shm_alloc(size, &info, segment_num++) < 0)
return NULL;
if (jack_shmalloc(name, size, info)) {
pw_log_error("Cannot create shared memory segment of size = %zd (%s)", size, strerror(errno));
return -1;
}
ctrl = (struct jack_client_control *)jack_shm_addr(&info);
ctrl->info = info;
if (jack_attach_shm(info)) {
jack_error("Cannot attach shared memory segment name = %s err = %s", name, strerror(errno));
jack_destroy_shm(info);
return -1;
}
info->size = size;
return 0;
strcpy(ctrl->name, name);
for (int i = 0; i < jack_notify_max; i++)
ctrl->callback[i] = false;
// Always activated
ctrl->callback[jack_notify_AddClient] = true;
ctrl->callback[jack_notify_RemoveClient] = true;
ctrl->callback[jack_notify_ActivateClient] = true;
ctrl->callback[jack_notify_LatencyCallback] = true;
// So that driver synchro are correctly setup in "flush" or "normal" mode
ctrl->callback[jack_notify_StartFreewheelCallback] = true;
ctrl->callback[jack_notify_StopFreewheelCallback] = true;
ctrl->ref_num = ref_num;
ctrl->PID = pid;
ctrl->transport_state = JackTransportStopped;
ctrl->transport_sync = false;
ctrl->transport_timebase = false;
ctrl->active = false;
ctrl->session_ID = uuid;
return ctrl;
}

View file

@ -26,11 +26,11 @@ struct jack_synchro {
#define JACK_SYNCHRO_INIT (struct jack_synchro) { { 0, }, false, NULL }
static inline int
jack_synchro_alloc(struct jack_synchro *synchro,
const char *client_name,
const char *server_name,
int value, bool internal,
bool promiscuous)
jack_synchro_init(struct jack_synchro *synchro,
const char *client_name,
const char *server_name,
int value, bool internal,
bool promiscuous)
{
if (promiscuous)
snprintf(synchro->name, sizeof(synchro->name),
@ -39,9 +39,10 @@ jack_synchro_alloc(struct jack_synchro *synchro,
snprintf(synchro->name, sizeof(synchro->name),
"jack_sem.%d_%s_%s", getuid(), server_name, client_name);
synchro->flush = false;
if ((synchro->semaphore = sem_open(synchro->name, O_CREAT | O_RDWR, 0777, value)) == (sem_t*)SEM_FAILED) {
pw_log_error("can't check in named semaphore name = %s err = %s", synchro->name, strerror(errno));
return -1;
}
return true;
return 0;
}

View file

@ -26,6 +26,7 @@
#include "pipewire/server/core.h"
#include "pipewire/server/module.h"
#include "pipewire/modules/spa/spa-node.h"
#define AUDIOMIXER_LIB "audiomixer/libspa-audiomixer"
@ -108,7 +109,8 @@ static struct pw_node *make_node(struct impl *impl)
}
spa_clock = iface;
node = pw_node_new(impl->core, NULL, "audiomixer", false, spa_node, spa_clock, NULL);
node = pw_spa_node_new(impl->core, NULL, "audiomixer", false, spa_node, spa_clock, NULL);
return node;
interface_failed:

View file

@ -4,7 +4,7 @@ pipewire_module_spa_c_args = [
]
pipewire_module_spa_monitor = shared_library('pipewire-module-spa-monitor',
[ 'module-monitor.c', 'spa-monitor.c' ],
[ 'module-monitor.c', 'spa-monitor.c', 'spa-node.c' ],
c_args : pipewire_module_spa_c_args,
include_directories : [configinc, spa_inc],
link_with : spalib,

View file

@ -32,7 +32,6 @@
#include <pipewire/server/module.h>
#include "spa-monitor.h"
#include "spa-node.h"
bool pipewire__module_init(struct pw_module *module, const char *args)
{

View file

@ -34,71 +34,8 @@
#include "spa-monitor.h"
#include "spa-node.h"
static int
setup_props(struct pw_core *core, struct spa_node *spa_node, struct pw_properties *pw_props)
{
int res;
struct spa_props *props;
void *state = NULL;
const char *key;
if ((res = spa_node_get_props(spa_node, &props)) != SPA_RESULT_OK) {
pw_log_debug("spa_node_get_props failed: %d", res);
return SPA_RESULT_ERROR;
}
while ((key = pw_properties_iterate(pw_props, &state))) {
struct spa_pod_prop *prop;
uint32_t id;
if (!spa_type_is_a(key, SPA_TYPE_PROPS_BASE))
continue;
id = spa_type_map_get_id(core->type.map, key);
if (id == SPA_ID_INVALID)
continue;
if ((prop = spa_pod_object_find_prop(&props->object, id))) {
const char *value = pw_properties_get(pw_props, key);
pw_log_info("configure prop %s", key);
switch(prop->body.value.type) {
case SPA_POD_TYPE_ID:
SPA_POD_VALUE(struct spa_pod_id, &prop->body.value) =
spa_type_map_get_id(core->type.map, value);
break;
case SPA_POD_TYPE_INT:
SPA_POD_VALUE(struct spa_pod_int, &prop->body.value) = atoi(value);
break;
case SPA_POD_TYPE_LONG:
SPA_POD_VALUE(struct spa_pod_long, &prop->body.value) = atoi(value);
break;
case SPA_POD_TYPE_FLOAT:
SPA_POD_VALUE(struct spa_pod_float, &prop->body.value) = atof(value);
break;
case SPA_POD_TYPE_DOUBLE:
SPA_POD_VALUE(struct spa_pod_double, &prop->body.value) = atof(value);
break;
case SPA_POD_TYPE_STRING:
break;
default:
break;
}
}
}
if ((res = spa_node_set_props(spa_node, props)) != SPA_RESULT_OK) {
pw_log_debug("spa_node_set_props failed: %d", res);
return SPA_RESULT_ERROR;
}
return SPA_RESULT_OK;
}
bool pipewire__module_init(struct pw_module *module, const char *args)
{
const char *dir;
struct pw_properties *props = NULL;
char **argv;
int i, n_tokens;
@ -110,9 +47,6 @@ bool pipewire__module_init(struct pw_module *module, const char *args)
if (n_tokens < 3)
goto not_enough_arguments;
if ((dir = getenv("SPA_PLUGIN_DIR")) == NULL)
dir = PLUGINDIR;
props = pw_properties_new(NULL, NULL);
for (i = 3; i < n_tokens; i++) {
@ -126,7 +60,7 @@ bool pipewire__module_init(struct pw_module *module, const char *args)
pw_free_strv(prop);
}
pw_spa_node_load(module->core, dir, argv[0], argv[1], argv[2], props, setup_props);
pw_spa_node_load(module->core, NULL, argv[0], argv[1], argv[2], props);
pw_free_strv(argv);

View file

@ -33,6 +33,7 @@
#include <pipewire/server/node.h>
#include "spa-monitor.h"
#include "spa-node.h"
struct monitor_item {
char *id;
@ -107,7 +108,7 @@ static void add_item(struct pw_spa_monitor *this, struct spa_monitor_item *item)
mitem = calloc(1, sizeof(struct monitor_item));
mitem->id = strdup(id);
mitem->node = pw_node_new(impl->core, NULL, name, false, node_iface, clock_iface, props);
mitem->node = pw_spa_node_new(impl->core, NULL, name, false, node_iface, clock_iface, props);
spa_list_insert(impl->item_list.prev, &mitem->link);
}

View file

@ -17,6 +17,12 @@
* Boston, MA 02110-1301, USA.
*/
#ifdef HAVE_CONFIG_H
#include "config.h"
#endif
#include <spa/graph-scheduler3.h>
#include <string.h>
#include <stdio.h>
#include <dlfcn.h>
@ -26,20 +32,466 @@
#include "spa-node.h"
struct impl {
struct pw_spa_node this;
struct pw_core *core;
struct pw_node *this;
bool async_init;
void *hnd;
struct spa_handle *handle;
struct spa_node *node; /**< handle to SPA node */
char *lib;
char *factory_name;
};
struct pw_spa_node *pw_spa_node_load(struct pw_core *core,
const char *dir,
const char *lib,
const char *factory_name,
const char *name,
struct pw_properties *properties, setup_node_t setup_func)
struct port {
struct pw_port *port;
struct spa_node *node;
};
static int port_impl_enum_formats(struct pw_port *port,
struct spa_format **format,
const struct spa_format *filter,
int32_t index)
{
struct pw_spa_node *this;
struct port *p = port->user_data;
return spa_node_port_enum_formats(p->node, port->direction, port->port_id, format, filter, index);
}
static int port_impl_set_format(struct pw_port *port, uint32_t flags, struct spa_format *format)
{
struct port *p = port->user_data;
return spa_node_port_set_format(p->node, port->direction, port->port_id, flags, format);
}
static int port_impl_get_format(struct pw_port *port, const struct spa_format **format)
{
struct port *p = port->user_data;
return spa_node_port_get_format(p->node, port->direction, port->port_id, format);
}
static int port_impl_get_info(struct pw_port *port, const struct spa_port_info **info)
{
struct port *p = port->user_data;
return spa_node_port_get_info(p->node, port->direction, port->port_id, info);
}
static int port_impl_enum_params(struct pw_port *port, uint32_t index, struct spa_param **param)
{
struct port *p = port->user_data;
return spa_node_port_enum_params(p->node, port->direction, port->port_id, index, param);
}
static int port_impl_set_param(struct pw_port *port, struct spa_param *param)
{
struct port *p = port->user_data;
return spa_node_port_set_param(p->node, port->direction, port->port_id, param);
}
static int port_impl_use_buffers(struct pw_port *port, struct spa_buffer **buffers, uint32_t n_buffers)
{
struct port *p = port->user_data;
return spa_node_port_use_buffers(p->node, port->direction, port->port_id, buffers, n_buffers);
}
static int port_impl_alloc_buffers(struct pw_port *port,
struct spa_param **params, uint32_t n_params,
struct spa_buffer **buffers, uint32_t *n_buffers)
{
struct port *p = port->user_data;
return spa_node_port_alloc_buffers(p->node, port->direction, port->port_id,
params, n_params, buffers, n_buffers);
}
static int port_impl_reuse_buffer(struct pw_port *port, uint32_t buffer_id)
{
struct port *p = port->user_data;
return spa_node_port_reuse_buffer(p->node, port->port_id, buffer_id);
}
static int port_impl_send_command(struct pw_port *port, struct spa_command *command)
{
struct port *p = port->user_data;
return spa_node_port_send_command(p->node,
port->direction,
port->port_id,
command);
}
const struct pw_port_implementation port_impl = {
PW_VERSION_PORT_IMPLEMENTATION,
port_impl_enum_formats,
port_impl_set_format,
port_impl_get_format,
port_impl_get_info,
port_impl_enum_params,
port_impl_set_param,
port_impl_use_buffers,
port_impl_alloc_buffers,
port_impl_reuse_buffer,
port_impl_send_command,
};
static struct pw_port *
make_port(struct pw_node *node, enum pw_direction direction, uint32_t port_id)
{
struct impl *impl = node->user_data;
struct pw_port *port;
struct port *p;
port = pw_port_new(direction, port_id, sizeof(struct port));
if (port == NULL)
return NULL;
p = port->user_data;
p->node = impl->node;
port->implementation = &port_impl;
spa_node_port_set_io(impl->node, direction, port_id, &port->io);
pw_port_add(port, node);
return port;
}
static void update_port_ids(struct impl *impl)
{
struct pw_node *this = impl->this;
uint32_t *input_port_ids, *output_port_ids;
uint32_t n_input_ports, n_output_ports, max_input_ports, max_output_ports;
uint32_t i;
struct spa_list *ports;
spa_node_get_n_ports(impl->node,
&n_input_ports, &max_input_ports, &n_output_ports, &max_output_ports);
this->info.max_input_ports = max_input_ports;
this->info.max_output_ports = max_output_ports;
input_port_ids = alloca(sizeof(uint32_t) * n_input_ports);
output_port_ids = alloca(sizeof(uint32_t) * n_output_ports);
spa_node_get_port_ids(impl->node,
max_input_ports, input_port_ids, max_output_ports, output_port_ids);
pw_log_debug("node %p: update_port ids %u/%u, %u/%u", this,
n_input_ports, max_input_ports, n_output_ports, max_output_ports);
i = 0;
ports = &this->input_ports;
while (true) {
struct pw_port *p = (ports == &this->input_ports) ? NULL :
SPA_CONTAINER_OF(ports, struct pw_port, link);
if (p && i < n_input_ports && p->port_id == input_port_ids[i]) {
pw_log_debug("node %p: exiting input port %d", this, input_port_ids[i]);
i++;
ports = ports->next;
} else if ((p && i < n_input_ports && input_port_ids[i] < p->port_id)
|| i < n_input_ports) {
struct pw_port *np;
pw_log_debug("node %p: input port added %d", this, input_port_ids[i]);
np = make_port(this, PW_DIRECTION_INPUT, input_port_ids[i]);
ports = np->link.next;
i++;
} else if (p) {
ports = ports->next;
pw_log_debug("node %p: input port removed %d", this, p->port_id);
pw_port_destroy(p);
} else {
pw_log_debug("node %p: no more input ports", this);
break;
}
}
i = 0;
ports = &this->output_ports;
while (true) {
struct pw_port *p = (ports == &this->output_ports) ? NULL :
SPA_CONTAINER_OF(ports, struct pw_port, link);
if (p && i < n_output_ports && p->port_id == output_port_ids[i]) {
pw_log_debug("node %p: exiting output port %d", this, output_port_ids[i]);
i++;
ports = ports->next;
} else if ((p && i < n_output_ports && output_port_ids[i] < p->port_id)
|| i < n_output_ports) {
struct pw_port *np;
pw_log_debug("node %p: output port added %d", this, output_port_ids[i]);
np = make_port(this, PW_DIRECTION_OUTPUT, output_port_ids[i]);
ports = np->link.next;
i++;
} else if (p) {
ports = ports->next;
pw_log_debug("node %p: output port removed %d", this, p->port_id);
pw_port_destroy(p);
} else {
pw_log_debug("node %p: no more output ports", this);
break;
}
}
}
static int node_impl_get_props(struct pw_node *node, struct spa_props **props)
{
struct impl *impl = node->user_data;
return spa_node_get_props(impl->node, props);
}
static int node_impl_set_props(struct pw_node *node, const struct spa_props *props)
{
struct impl *impl = node->user_data;
return spa_node_set_props(impl->node, props);
}
static int node_impl_send_command(struct pw_node *node, struct spa_command *command)
{
struct impl *impl = node->user_data;
return spa_node_send_command(impl->node, command);
}
static struct pw_port*
node_impl_add_port(struct pw_node *node,
enum pw_direction direction,
uint32_t port_id)
{
struct impl *impl = node->user_data;
int res;
if ((res = spa_node_add_port(impl->node, direction, port_id)) < 0) {
pw_log_error("node %p: could not add port %d %d", node, port_id, res);
return NULL;
}
return make_port(node, direction, port_id);
}
static const struct pw_node_implementation node_impl = {
PW_VERSION_NODE_IMPLEMENTATION,
node_impl_get_props,
node_impl_set_props,
node_impl_send_command,
node_impl_add_port,
};
static void pw_spa_node_destroy(void *object)
{
struct pw_node *node = object;
struct impl *impl = node->user_data;
pw_log_debug("spa-node %p: destroy", node);
if (impl->handle) {
spa_handle_clear(impl->handle);
free(impl->handle);
}
free(impl->lib);
free(impl->factory_name);
if (impl->hnd)
dlclose(impl->hnd);
}
static void complete_init(struct impl *impl)
{
struct pw_node *this = impl->this;
update_port_ids(impl);
pw_node_export(this);
}
static void on_node_done(struct spa_node *node, int seq, int res, void *user_data)
{
struct impl *impl = user_data;
struct pw_node *this = impl->this;
if (impl->async_init) {
complete_init(impl);
impl->async_init = false;
}
pw_log_debug("spa-node %p: async complete event %d %d", this, seq, res);
pw_signal_emit(&this->async_complete, this, seq, res);
}
static void on_node_event(struct spa_node *node, struct spa_event *event, void *user_data)
{
struct impl *impl = user_data;
struct pw_node *this = impl->this;
pw_signal_emit(&this->event, this, event);
}
static void on_node_need_input(struct spa_node *node, void *user_data)
{
struct impl *impl = user_data;
struct pw_node *this = impl->this;
spa_graph_scheduler_pull(this->rt.sched, &this->rt.node);
while (spa_graph_scheduler_iterate(this->rt.sched));
}
static void on_node_have_output(struct spa_node *node, void *user_data)
{
struct impl *impl = user_data;
struct pw_node *this = impl->this;
spa_graph_scheduler_push(this->rt.sched, &this->rt.node);
while (spa_graph_scheduler_iterate(this->rt.sched));
}
static void
on_node_reuse_buffer(struct spa_node *node, uint32_t port_id, uint32_t buffer_id, void *user_data)
{
struct impl *impl = user_data;
struct pw_node *this = impl->this;
struct spa_graph_port *p;
spa_list_for_each(p, &this->rt.node.ports[SPA_DIRECTION_INPUT], link) {
if (p->port_id != port_id)
continue;
if (p->peer && p->peer->node->methods->reuse_buffer) {
struct spa_graph_node *n = p->peer->node;
n->methods->reuse_buffer(p->peer, buffer_id, n->user_data);
}
break;
}
}
static const struct spa_node_callbacks node_callbacks = {
SPA_VERSION_NODE_CALLBACKS,
&on_node_done,
&on_node_event,
&on_node_need_input,
&on_node_have_output,
&on_node_reuse_buffer,
};
struct pw_node *
pw_spa_node_new(struct pw_core *core,
struct pw_resource *owner,
const char *name,
bool async,
struct spa_node *node,
struct spa_clock *clock,
struct pw_properties *properties)
{
struct pw_node *this;
struct impl *impl;
if (node->info) {
uint32_t i;
if (properties == NULL)
properties = pw_properties_new(NULL, NULL);
if (properties)
return NULL;
for (i = 0; i < node->info->n_items; i++)
pw_properties_set(properties,
node->info->items[i].key,
node->info->items[i].value);
}
this = pw_node_new(core, owner, name, properties, sizeof(struct impl));
if (this == NULL)
return NULL;
this->destroy = pw_spa_node_destroy;
this->implementation = &node_impl;
this->rt.methods = spa_graph_scheduler_default;
this->rt.node.user_data = node;
this->clock = clock;
impl = this->user_data;
impl->this = this;
impl->node = node;
impl->async_init = async;
if (spa_node_set_callbacks(impl->node, &node_callbacks, impl) < 0)
pw_log_warn("spa-node %p: error setting callback", this);
if (!async) {
complete_init(impl);
}
return this;
}
static int
setup_props(struct pw_core *core, struct spa_node *spa_node, struct pw_properties *pw_props)
{
int res;
struct spa_props *props;
void *state = NULL;
const char *key;
if ((res = spa_node_get_props(spa_node, &props)) != SPA_RESULT_OK) {
pw_log_debug("spa_node_get_props failed: %d", res);
return SPA_RESULT_ERROR;
}
while ((key = pw_properties_iterate(pw_props, &state))) {
struct spa_pod_prop *prop;
uint32_t id;
if (!spa_type_is_a(key, SPA_TYPE_PROPS_BASE))
continue;
id = spa_type_map_get_id(core->type.map, key);
if (id == SPA_ID_INVALID)
continue;
if ((prop = spa_pod_object_find_prop(&props->object, id))) {
const char *value = pw_properties_get(pw_props, key);
pw_log_info("configure prop %s", key);
switch(prop->body.value.type) {
case SPA_POD_TYPE_ID:
SPA_POD_VALUE(struct spa_pod_id, &prop->body.value) =
spa_type_map_get_id(core->type.map, value);
break;
case SPA_POD_TYPE_INT:
SPA_POD_VALUE(struct spa_pod_int, &prop->body.value) = atoi(value);
break;
case SPA_POD_TYPE_LONG:
SPA_POD_VALUE(struct spa_pod_long, &prop->body.value) = atoi(value);
break;
case SPA_POD_TYPE_FLOAT:
SPA_POD_VALUE(struct spa_pod_float, &prop->body.value) = atof(value);
break;
case SPA_POD_TYPE_DOUBLE:
SPA_POD_VALUE(struct spa_pod_double, &prop->body.value) = atof(value);
break;
case SPA_POD_TYPE_STRING:
break;
default:
break;
}
}
}
if ((res = spa_node_set_props(spa_node, props)) != SPA_RESULT_OK) {
pw_log_debug("spa_node_set_props failed: %d", res);
return SPA_RESULT_ERROR;
}
return SPA_RESULT_OK;
}
struct pw_node *pw_spa_node_load(struct pw_core *core,
struct pw_resource *owner,
const char *lib,
const char *factory_name,
const char *name,
struct pw_properties *properties)
{
struct pw_node *this;
struct impl *impl;
struct spa_node *spa_node;
struct spa_clock *spa_clock;
@ -51,6 +503,11 @@ struct pw_spa_node *pw_spa_node_load(struct pw_core *core,
const struct spa_handle_factory *factory;
void *iface;
char *filename;
const char *dir;
bool async;
if ((dir = getenv("SPA_PLUGIN_DIR")) == NULL)
dir = PLUGINDIR;
asprintf(&filename, "%s/%s.so", dir, lib);
@ -79,6 +536,8 @@ struct pw_spa_node *pw_spa_node_load(struct pw_core *core,
pw_log_error("can't make factory instance: %d", res);
goto init_failed;
}
async = SPA_RESULT_IS_ASYNC(res);
if ((res = spa_handle_get_interface(handle, core->type.spa_node, &iface)) < 0) {
pw_log_error("can't get node interface %d", res);
goto interface_failed;
@ -90,21 +549,17 @@ struct pw_spa_node *pw_spa_node_load(struct pw_core *core,
}
spa_clock = iface;
impl = calloc(1, sizeof(struct impl));
impl->core = core;
impl->hnd = hnd;
this = &impl->this;
if (setup_func != NULL) {
if (setup_func(core, spa_node, properties) != SPA_RESULT_OK) {
if (properties != NULL) {
if (setup_props(core, spa_node, properties) != SPA_RESULT_OK) {
pw_log_debug("Unrecognized properties");
}
}
this->node = pw_node_new(core, NULL, name, false, spa_node, spa_clock, properties);
this->lib = filename;
this->factory_name = strdup(factory_name);
this->handle = handle;
this = pw_spa_node_new(core, owner, name, async, spa_node, spa_clock, properties);
impl->hnd = hnd;
impl->handle = handle;
impl->lib = filename;
impl->factory_name = strdup(factory_name);
return this;
@ -119,20 +574,3 @@ struct pw_spa_node *pw_spa_node_load(struct pw_core *core,
free(filename);
return NULL;
}
void pw_spa_node_destroy(struct pw_spa_node *node)
{
struct impl *impl = SPA_CONTAINER_OF(node, struct impl, this);
pw_log_debug("spa-node %p: destroy", impl);
pw_signal_emit(&node->destroy_signal, node);
pw_node_destroy(node->node);
spa_handle_clear(node->handle);
free(node->handle);
free(node->lib);
free(node->factory_name);
dlclose(impl->hnd);
free(impl);
}

View file

@ -27,27 +27,22 @@
extern "C" {
#endif
struct pw_spa_node {
struct pw_node *node;
struct pw_node *
pw_spa_node_new(struct pw_core *core,
struct pw_resource *owner, /**< optional owner */
const char *name,
bool async,
struct spa_node *node,
struct spa_clock *clock,
struct pw_properties *properties);
char *lib;
char *factory_name;
struct spa_handle *handle;
PW_SIGNAL(destroy_signal, (struct pw_listener *listener, struct pw_spa_node *node));
};
typedef int (*setup_node_t) (struct pw_core *core,
struct spa_node *spa_node,
struct pw_properties *pw_props);
struct pw_spa_node *
struct pw_node *
pw_spa_node_load(struct pw_core *core,
const char *dir,
struct pw_resource *owner, /**< optional owner */
const char *lib,
const char *factory_name,
const char *name,
struct pw_properties *properties,
setup_node_t setup_func);
struct pw_properties *properties);
#ifdef __cplusplus
}

View file

@ -146,15 +146,16 @@ void pw_client_destroy(struct pw_client *client)
pw_map_for_each(&client->objects, destroy_resource, client);
pw_log_debug("client %p: free", impl);
if (client->destroy)
client->destroy(client);
pw_map_clear(&client->objects);
pw_map_clear(&client->types);
if (client->properties)
pw_properties_free(client->properties);
if (client->destroy)
client->destroy(client);
free(impl);
}

View file

@ -621,19 +621,13 @@ struct spa_format *pw_core_find_format(struct pw_core *core,
if (in_state == PW_PORT_STATE_CONFIGURE && out_state > PW_PORT_STATE_CONFIGURE) {
/* only input needs format */
if ((res = spa_node_port_get_format(output->node->node,
SPA_DIRECTION_OUTPUT,
output->port_id,
(const struct spa_format **) &format)) < 0) {
if ((res = pw_port_get_format(output, (const struct spa_format **) &format)) < 0) {
asprintf(error, "error get output format: %d", res);
goto error;
}
} else if (out_state == PW_PORT_STATE_CONFIGURE && in_state > PW_PORT_STATE_CONFIGURE) {
/* only output needs format */
if ((res = spa_node_port_get_format(input->node->node,
SPA_DIRECTION_INPUT,
input->port_id,
(const struct spa_format **) &format)) < 0) {
if ((res = pw_port_get_format(input, (const struct spa_format **) &format)) < 0) {
asprintf(error, "error get input format: %d", res);
goto error;
}
@ -641,9 +635,7 @@ struct spa_format *pw_core_find_format(struct pw_core *core,
again:
/* both ports need a format */
pw_log_debug("core %p: finding best format", core);
if ((res = spa_node_port_enum_formats(input->node->node,
SPA_DIRECTION_INPUT,
input->port_id, &filter, NULL, iidx)) < 0) {
if ((res = pw_port_enum_formats(input, &filter, NULL, iidx)) < 0) {
if (res == SPA_RESULT_ENUM_END && iidx != 0) {
asprintf(error, "error input enum formats: %d", res);
goto error;
@ -653,10 +645,7 @@ struct spa_format *pw_core_find_format(struct pw_core *core,
if (pw_log_level_enabled(SPA_LOG_LEVEL_DEBUG))
spa_debug_format(filter);
if ((res = spa_node_port_enum_formats(output->node->node,
SPA_DIRECTION_OUTPUT,
output->port_id,
&format, filter, oidx)) < 0) {
if ((res = pw_port_enum_formats(output, &format, filter, oidx)) < 0) {
if (res == SPA_RESULT_ENUM_END) {
oidx = 0;
iidx++;

View file

@ -124,10 +124,8 @@ static int do_negotiate(struct pw_link *this, uint32_t in_state, uint32_t out_st
format = spa_format_copy(format);
if (out_state > PW_PORT_STATE_CONFIGURE && this->output->node->info.state == PW_NODE_STATE_IDLE) {
if ((res = spa_node_port_get_format(this->output->node->node,
SPA_DIRECTION_OUTPUT,
this->output->port_id,
(const struct spa_format **) &current)) < 0) {
if ((res = pw_port_get_format(this->output,
(const struct spa_format **) &current)) < 0) {
asprintf(&error, "error get output format: %d", res);
goto error;
}
@ -140,10 +138,8 @@ static int do_negotiate(struct pw_link *this, uint32_t in_state, uint32_t out_st
pw_node_update_state(this->output->node, PW_NODE_STATE_RUNNING, NULL);
}
if (in_state > PW_PORT_STATE_CONFIGURE && this->input->node->info.state == PW_NODE_STATE_IDLE) {
if ((res = spa_node_port_get_format(this->input->node->node,
SPA_DIRECTION_INPUT,
this->input->port_id,
(const struct spa_format **) &current)) < 0) {
if ((res = pw_port_get_format(this->input,
(const struct spa_format **) &current)) < 0) {
asprintf(&error, "error get input format: %d", res);
goto error;
}
@ -360,17 +356,17 @@ static struct spa_buffer **alloc_buffers(struct pw_link *this,
}
static int
spa_node_param_filter(struct pw_link *this,
struct spa_node *in_node,
uint32_t in_port,
struct spa_node *out_node, uint32_t out_port, struct spa_pod_builder *result)
param_filter(struct pw_link *this,
struct pw_port *in_port,
struct pw_port *out_port,
struct spa_pod_builder *result)
{
int res;
struct spa_param *oparam, *iparam;
int iidx, oidx, num = 0;
for (iidx = 0;; iidx++) {
if (spa_node_port_enum_params(in_node, SPA_DIRECTION_INPUT, in_port, iidx, &iparam)
if (pw_port_enum_params(in_port, iidx, &iparam)
< 0)
break;
@ -381,8 +377,7 @@ spa_node_param_filter(struct pw_link *this,
struct spa_pod_frame f;
uint32_t offset;
if (spa_node_port_enum_params(out_node, SPA_DIRECTION_OUTPUT,
out_port, oidx, &oparam) < 0)
if (pw_port_enum_params(out_port, oidx, &oparam) < 0)
break;
if (pw_log_level_enabled(SPA_LOG_LEVEL_DEBUG))
@ -397,8 +392,7 @@ spa_node_param_filter(struct pw_link *this,
SPA_POD_CONTENTS(struct spa_param, iparam),
SPA_POD_CONTENTS_SIZE(struct spa_param, iparam),
SPA_POD_CONTENTS(struct spa_param, oparam),
SPA_POD_CONTENTS_SIZE(struct spa_param,
oparam))) < 0) {
SPA_POD_CONTENTS_SIZE(struct spa_param, oparam))) < 0) {
result->offset = offset;
result->stack = NULL;
continue;
@ -426,14 +420,11 @@ static int do_allocation(struct pw_link *this, uint32_t in_state, uint32_t out_s
pw_log_debug("link %p: doing alloc buffers %p %p", this, this->output->node,
this->input->node);
/* find out what's possible */
if ((res = spa_node_port_get_info(this->output->node->node,
SPA_DIRECTION_OUTPUT,
this->output->port_id, &oinfo)) < 0) {
if ((res = pw_port_get_info(this->output, &oinfo)) < 0) {
asprintf(&error, "error get output port info: %d", res);
goto error;
}
if ((res = spa_node_port_get_info(this->input->node->node,
SPA_DIRECTION_INPUT, this->input->port_id, &iinfo)) < 0) {
if ((res = pw_port_get_info(this->input, &iinfo)) < 0) {
asprintf(&error, "error get input port info: %d", res);
goto error;
}
@ -493,11 +484,7 @@ static int do_allocation(struct pw_link *this, uint32_t in_state, uint32_t out_s
uint32_t max_buffers;
size_t minsize = 1024, stride = 0;
n_params = spa_node_param_filter(this,
this->input->node->node,
this->input->port_id,
this->output->node->node,
this->output->port_id, &b);
n_params = param_filter(this, this->input, this->output, &b);
params = alloca(n_params * sizeof(struct spa_param *));
for (i = 0, offset = 0; i < n_params; i++) {
@ -775,8 +762,7 @@ do_remove_input(struct spa_loop *loop,
bool async, uint32_t seq, size_t size, void *data, void *user_data)
{
struct pw_link *this = user_data;
struct pw_port *port = ((struct pw_port **) data)[0];
spa_graph_port_remove(port->rt.graph, &this->rt.in_port);
spa_graph_port_remove(&this->rt.in_port);
return SPA_RESULT_OK;
}
@ -789,7 +775,7 @@ static void input_remove(struct pw_link *this, struct pw_port *port)
pw_signal_remove(&impl->input_async_complete);
pw_loop_invoke(port->node->data_loop->loop,
do_remove_input, 1, sizeof(struct pw_port*), &port, true, this);
do_remove_input, 1, 0, NULL, true, this);
clear_port_buffers(this, this->input);
}
@ -799,8 +785,7 @@ do_remove_output(struct spa_loop *loop,
bool async, uint32_t seq, size_t size, void *data, void *user_data)
{
struct pw_link *this = user_data;
struct pw_port *port = ((struct pw_port **) data)[0];
spa_graph_port_remove(port->rt.graph, &this->rt.out_port);
spa_graph_port_remove(&this->rt.out_port);
return SPA_RESULT_OK;
}
@ -813,7 +798,7 @@ static void output_remove(struct pw_link *this, struct pw_port *port)
pw_signal_remove(&impl->output_async_complete);
pw_loop_invoke(port->node->data_loop->loop,
do_remove_output, 1, sizeof(struct pw_port*), &port, true, this);
do_remove_output, 1, 0, NULL, true, this);
clear_port_buffers(this, this->output);
}
@ -866,7 +851,7 @@ do_activate_link(struct spa_loop *loop,
bool async, uint32_t seq, size_t size, void *data, void *user_data)
{
struct pw_link *this = user_data;
spa_graph_port_link(this->output->node->rt.sched->graph, &this->rt.out_port, &this->rt.in_port);
spa_graph_port_link(&this->rt.out_port, &this->rt.in_port);
return SPA_RESULT_OK;
}
@ -897,7 +882,7 @@ do_deactivate_link(struct spa_loop *loop,
bool async, uint32_t seq, size_t size, void *data, void *user_data)
{
struct pw_link *this = user_data;
spa_graph_port_unlink(this->output->node->rt.sched->graph, &this->rt.out_port);
spa_graph_port_unlink(&this->rt.out_port);
return SPA_RESULT_OK;
}
@ -990,21 +975,9 @@ do_add_link(struct spa_loop *loop,
struct pw_port *port = ((struct pw_port **) data)[0];
if (port->direction == PW_DIRECTION_OUTPUT) {
spa_graph_port_add(port->rt.graph,
&port->rt.mix_node,
&this->rt.out_port,
PW_DIRECTION_OUTPUT,
this->rt.out_port.port_id,
0,
&this->io);
spa_graph_port_add(&port->rt.mix_node, &this->rt.out_port);
} else {
spa_graph_port_add(port->rt.graph,
&port->rt.mix_node,
&this->rt.in_port,
PW_DIRECTION_INPUT,
this->rt.in_port.port_id,
0,
&this->io);
spa_graph_port_add(&port->rt.mix_node, &this->rt.in_port);
}
return SPA_RESULT_OK;
@ -1089,6 +1062,17 @@ struct pw_link *pw_link_new(struct pw_core *core,
this->info.input_port_id = input ? input->port_id : -1;
this->info.format = NULL;
spa_graph_port_init(&this->rt.out_port,
PW_DIRECTION_OUTPUT,
this->rt.out_port.port_id,
0,
&this->io);
spa_graph_port_init(&this->rt.in_port,
PW_DIRECTION_INPUT,
this->rt.in_port.port_id,
0,
&this->io);
pw_loop_invoke(output_node->data_loop->loop,
do_add_link,
SPA_ID_INVALID, sizeof(struct pw_port *), &output, false, this);

View file

@ -10,6 +10,7 @@ pipewirecore_headers = [
'node-factory.h',
'port.h',
'resource.h',
# 'spa-node.h',
'work-queue.h',
]
@ -25,6 +26,7 @@ pipewirecore_sources = [
'node-factory.c',
'port.c',
'resource.c',
# 'spa-node.c',
'work-queue.c',
]

View file

@ -21,8 +21,6 @@
#include <stdlib.h>
#include <errno.h>
#include <spa/graph-scheduler3.h>
#include "pipewire/client/pipewire.h"
#include "pipewire/client/interfaces.h"
@ -36,156 +34,37 @@ struct impl {
struct pw_node this;
struct pw_work_queue *work;
struct pw_listener on_async_complete;
struct pw_listener on_event;
bool async_init;
bool exported;
};
/** \endcond */
static void init_complete(struct pw_node *this);
static void update_port_ids(struct pw_node *node)
{
struct impl *impl = SPA_CONTAINER_OF(node, struct impl, this);
uint32_t *input_port_ids, *output_port_ids;
uint32_t n_input_ports, n_output_ports, max_input_ports, max_output_ports;
uint32_t i;
struct spa_list *ports;
int res;
if (node->node == NULL)
return;
spa_node_get_n_ports(node->node,
&n_input_ports, &max_input_ports, &n_output_ports, &max_output_ports);
node->info.n_input_ports = n_input_ports;
node->info.max_input_ports = max_input_ports;
node->info.n_output_ports = n_output_ports;
node->info.max_output_ports = max_output_ports;
node->input_port_map = calloc(max_input_ports, sizeof(struct pw_port *));
node->output_port_map = calloc(max_output_ports, sizeof(struct pw_port *));
input_port_ids = alloca(sizeof(uint32_t) * n_input_ports);
output_port_ids = alloca(sizeof(uint32_t) * n_output_ports);
spa_node_get_port_ids(node->node,
max_input_ports, input_port_ids, max_output_ports, output_port_ids);
pw_log_debug("node %p: update_port ids %u/%u, %u/%u", node,
n_input_ports, max_input_ports, n_output_ports, max_output_ports);
i = 0;
ports = &node->input_ports;
while (true) {
struct pw_port *p = (ports == &node->input_ports) ? NULL :
SPA_CONTAINER_OF(ports, struct pw_port, link);
if (p && i < n_input_ports && p->port_id == input_port_ids[i]) {
node->input_port_map[p->port_id] = p;
pw_log_debug("node %p: exiting input port %d", node, input_port_ids[i]);
i++;
ports = ports->next;
} else if ((p && i < n_input_ports && input_port_ids[i] < p->port_id)
|| i < n_input_ports) {
struct pw_port *np;
pw_log_debug("node %p: input port added %d", node, input_port_ids[i]);
np = pw_port_new(node, PW_DIRECTION_INPUT, input_port_ids[i]);
if ((res = spa_node_port_set_io(node->node, SPA_DIRECTION_INPUT,
np->port_id, &np->io)) < 0)
pw_log_warn("node %p: can't set input IO %d", node, res);
spa_list_insert(ports, &np->link);
ports = np->link.next;
node->input_port_map[np->port_id] = np;
if (!impl->async_init)
pw_signal_emit(&node->port_added, node, np);
i++;
} else if (p) {
node->input_port_map[p->port_id] = NULL;
ports = ports->next;
if (!impl->async_init)
pw_signal_emit(&node->port_removed, node, p);
pw_log_debug("node %p: input port removed %d", node, p->port_id);
pw_port_destroy(p);
} else {
pw_log_debug("node %p: no more input ports", node);
break;
}
}
i = 0;
ports = &node->output_ports;
while (true) {
struct pw_port *p = (ports == &node->output_ports) ? NULL :
SPA_CONTAINER_OF(ports, struct pw_port, link);
if (p && i < n_output_ports && p->port_id == output_port_ids[i]) {
pw_log_debug("node %p: exiting output port %d", node, output_port_ids[i]);
i++;
ports = ports->next;
node->output_port_map[p->port_id] = p;
} else if ((p && i < n_output_ports && output_port_ids[i] < p->port_id)
|| i < n_output_ports) {
struct pw_port *np;
pw_log_debug("node %p: output port added %d", node, output_port_ids[i]);
np = pw_port_new(node, PW_DIRECTION_OUTPUT, output_port_ids[i]);
if ((res = spa_node_port_set_io(node->node, SPA_DIRECTION_OUTPUT,
np->port_id, &np->io)) < 0)
pw_log_warn("node %p: can't set output IO %d", node, res);
spa_list_insert(ports, &np->link);
ports = np->link.next;
node->output_port_map[np->port_id] = np;
if (!impl->async_init)
pw_signal_emit(&node->port_added, node, np);
i++;
} else if (p) {
node->output_port_map[p->port_id] = NULL;
ports = ports->next;
if (!impl->async_init)
pw_signal_emit(&node->port_removed, node, p);
pw_log_debug("node %p: output port removed %d", node, p->port_id);
pw_port_destroy(p);
} else {
pw_log_debug("node %p: no more output ports", node);
break;
}
}
}
static int pause_node(struct pw_node *this)
{
int res;
int res = SPA_RESULT_OK;
if (this->info.state <= PW_NODE_STATE_IDLE)
return SPA_RESULT_OK;
pw_log_debug("node %p: pause node", this);
if ((res = spa_node_send_command(this->node,
&SPA_COMMAND_INIT(this->core->type.command_node.Pause))) <
0)
pw_log_debug("got error %d", res);
if ((res = this->implementation->send_command(this,
&SPA_COMMAND_INIT(this->core->type.command_node.Pause))) < 0)
pw_log_debug("node %p: send command error %d", this, res);
return res;
}
static int start_node(struct pw_node *this)
{
int res;
int res = SPA_RESULT_OK;
pw_log_debug("node %p: start node", this);
if ((res = spa_node_send_command(this->node,
&SPA_COMMAND_INIT(this->core->type.command_node.Start))) <
0)
pw_log_debug("got error %d", res);
if ((res = this->implementation->send_command(this,
&SPA_COMMAND_INIT(this->core->type.command_node.Start))) < 0)
pw_log_debug("node %p: send command error %d", this, res);
return res;
}
@ -211,23 +90,33 @@ static int suspend_node(struct pw_node *this)
return res;
}
static void on_async_complete(struct pw_listener *listener,
struct pw_node *node, uint32_t seq, int res)
{
struct impl *impl = SPA_CONTAINER_OF(listener, struct impl, on_async_complete);
struct pw_node *this = &impl->this;
pw_log_debug("node %p: async complete event %d %d", this, seq, res);
pw_work_queue_complete(impl->work, this, seq, res);
}
static void send_clock_update(struct pw_node *this)
{
int res;
struct spa_command_node_clock_update cu =
SPA_COMMAND_NODE_CLOCK_UPDATE_INIT(this->core->type.command_node.ClockUpdate,
SPA_COMMAND_NODE_CLOCK_UPDATE_TIME |
SPA_COMMAND_NODE_CLOCK_UPDATE_SCALE |
SPA_COMMAND_NODE_CLOCK_UPDATE_STATE |
SPA_COMMAND_NODE_CLOCK_UPDATE_LATENCY, /* change_mask */
1, /* rate */
0, /* ticks */
0, /* monotonic_time */
0, /* offset */
(1 << 16) | 1, /* scale */
SPA_CLOCK_STATE_RUNNING, /* state */
0, /* flags */
0); /* latency */
SPA_COMMAND_NODE_CLOCK_UPDATE_INIT(this->core->type.command_node.ClockUpdate,
SPA_COMMAND_NODE_CLOCK_UPDATE_TIME |
SPA_COMMAND_NODE_CLOCK_UPDATE_SCALE |
SPA_COMMAND_NODE_CLOCK_UPDATE_STATE |
SPA_COMMAND_NODE_CLOCK_UPDATE_LATENCY, /* change_mask */
1, /* rate */
0, /* ticks */
0, /* monotonic_time */
0, /* offset */
(1 << 16) | 1, /* scale */
SPA_CLOCK_STATE_RUNNING, /* state */
0, /* flags */
0); /* latency */
if (this->clock && this->live) {
cu.body.flags.value = SPA_COMMAND_NODE_CLOCK_UPDATE_FLAG_LIVE;
@ -236,73 +125,20 @@ static void send_clock_update(struct pw_node *this)
&cu.body.ticks.value,
&cu.body.monotonic_time.value);
}
if ((res = spa_node_send_command(this->node, (struct spa_command *) &cu)) < 0)
pw_log_debug("got error %d", res);
if ((res = this->implementation->send_command(this, (struct spa_command *) &cu)) < 0)
pw_log_debug("node %p: send clock update error %d", this, res);
}
static void on_node_done(struct spa_node *node, int seq, int res, void *user_data)
static void on_event(struct pw_listener *listener,
struct pw_node *node, struct spa_event *event)
{
struct impl *impl = user_data;
struct impl *impl = SPA_CONTAINER_OF(listener, struct impl, on_event);
struct pw_node *this = &impl->this;
pw_log_debug("node %p: async complete event %d %d", this, seq, res);
pw_work_queue_complete(impl->work, this, seq, res);
pw_signal_emit(&this->async_complete, this, seq, res);
}
static void on_node_event(struct spa_node *node, struct spa_event *event, void *user_data)
{
struct impl *impl = user_data;
struct pw_node *this = &impl->this;
if (SPA_EVENT_TYPE(event) == this->core->type.event_node.RequestClockUpdate) {
send_clock_update(this);
}
}
static void on_node_need_input(struct spa_node *node, void *user_data)
{
struct impl *impl = user_data;
struct pw_node *this = &impl->this;
spa_graph_scheduler_pull(this->rt.sched, &this->rt.node);
while (spa_graph_scheduler_iterate(this->rt.sched));
}
static void on_node_have_output(struct spa_node *node, void *user_data)
{
struct impl *impl = user_data;
struct pw_node *this = &impl->this;
spa_graph_scheduler_push(this->rt.sched, &this->rt.node);
while (spa_graph_scheduler_iterate(this->rt.sched));
}
static void
on_node_reuse_buffer(struct spa_node *node, uint32_t port_id, uint32_t buffer_id, void *user_data)
{
#if 0
struct impl *impl = user_data;
struct pw_node *this = &impl->this;
struct pw_port *inport;
pw_log_trace("node %p: reuse buffer %u", this, buffer_id);
spa_list_for_each(inport, &this->input_ports, link) {
struct pw_link *link;
struct pw_port *outport;
spa_list_for_each(link, &inport->rt.links, rt.input_link) {
if (link->rt.input == NULL || link->rt.output == NULL)
continue;
outport = link->rt.output;
outport->io.buffer_id = buffer_id;
}
}
#endif
pw_log_trace("node %p: event %d", this, SPA_EVENT_TYPE(event));
if (SPA_EVENT_TYPE(event) == this->core->type.event_node.RequestClockUpdate) {
send_clock_update(this);
}
}
static void node_unbind_func(void *data)
@ -316,32 +152,38 @@ update_info(struct pw_node *this)
{
this->info.id = this->global->id;
this->info.input_formats = NULL;
for (this->info.n_input_formats = 0;; this->info.n_input_formats++) {
struct spa_format *fmt;
if (spa_node_port_enum_formats(this->node,
SPA_DIRECTION_INPUT,
0, &fmt, NULL, this->info.n_input_formats) < 0)
break;
if (!spa_list_is_empty(&this->input_ports)) {
struct pw_port *port = spa_list_first(&this->input_ports, struct pw_port, link);
this->info.input_formats =
realloc(this->info.input_formats,
sizeof(struct spa_format *) * (this->info.n_input_formats + 1));
this->info.input_formats[this->info.n_input_formats] = spa_format_copy(fmt);
for (this->info.n_input_formats = 0;; this->info.n_input_formats++) {
struct spa_format *fmt;
if (pw_port_enum_formats(port, &fmt, NULL, this->info.n_input_formats) < 0)
break;
this->info.input_formats =
realloc(this->info.input_formats,
sizeof(struct spa_format *) * (this->info.n_input_formats + 1));
this->info.input_formats[this->info.n_input_formats] = spa_format_copy(fmt);
}
}
this->info.output_formats = NULL;
for (this->info.n_output_formats = 0;; this->info.n_output_formats++) {
struct spa_format *fmt;
if (!spa_list_is_empty(&this->output_ports)) {
struct pw_port *port = spa_list_first(&this->output_ports, struct pw_port, link);
if (spa_node_port_enum_formats(this->node,
SPA_DIRECTION_OUTPUT,
0, &fmt, NULL, this->info.n_output_formats) < 0)
break;
for (this->info.n_output_formats = 0;; this->info.n_output_formats++) {
struct spa_format *fmt;
this->info.output_formats =
realloc(this->info.output_formats,
sizeof(struct spa_format *) * (this->info.n_output_formats + 1));
this->info.output_formats[this->info.n_output_formats] = spa_format_copy(fmt);
if (pw_port_enum_formats(port, &fmt, NULL, this->info.n_output_formats) < 0)
break;
this->info.output_formats =
realloc(this->info.output_formats,
sizeof(struct spa_format *) * (this->info.n_output_formats + 1));
this->info.output_formats[this->info.n_output_formats] = spa_format_copy(fmt);
}
}
this->info.props = this->properties ? &this->properties->dict : NULL;
}
@ -396,18 +238,22 @@ node_bind_func(struct pw_global *global, struct pw_client *client, uint32_t vers
return SPA_RESULT_NO_MEMORY;
}
static void init_complete(struct pw_node *this)
static int
do_node_add(struct spa_loop *loop,
bool async, uint32_t seq, size_t size, void *data, void *user_data)
{
struct pw_node *this = user_data;
spa_graph_node_add(this->rt.sched->graph, &this->rt.node);
return SPA_RESULT_OK;
}
void pw_node_export(struct pw_node *this)
{
struct impl *impl = SPA_CONTAINER_OF(this, struct impl, this);
spa_graph_node_add(this->rt.sched->graph,
&this->rt.node,
spa_graph_scheduler_default,
this->node);
update_port_ids(this);
pw_log_debug("node %p: init completed", this);
impl->async_init = false;
pw_log_debug("node %p: export", this);
spa_list_insert(this->core->node_list.prev, &this->link);
@ -415,33 +261,27 @@ static void init_complete(struct pw_node *this)
this->owner,
this->core->type.node, 0, this, node_bind_func, &this->global);
pw_loop_invoke(this->data_loop->loop, do_node_add, 1, 0, NULL, false, this);
update_info(this);
impl->exported = true;
pw_signal_emit(&this->initialized, this);
pw_node_update_state(this, PW_NODE_STATE_SUSPENDED, NULL);
}
static const struct spa_node_callbacks node_callbacks = {
SPA_VERSION_NODE_CALLBACKS,
&on_node_done,
&on_node_event,
&on_node_need_input,
&on_node_have_output,
&on_node_reuse_buffer,
};
struct pw_node *pw_node_new(struct pw_core *core,
struct pw_resource *owner,
const char *name,
bool async,
struct spa_node *node,
struct spa_clock *clock, struct pw_properties *properties)
struct pw_properties *properties,
size_t user_data_size)
{
struct impl *impl;
struct pw_node *this;
impl = calloc(1, sizeof(struct impl));
impl = calloc(1, sizeof(struct impl) + user_data_size);
if (impl == NULL)
return NULL;
@ -450,66 +290,45 @@ struct pw_node *pw_node_new(struct pw_core *core,
this->owner = owner;
pw_log_debug("node %p: new, owner %p", this, owner);
if (user_data_size > 0)
this->user_data = SPA_MEMBER(impl, sizeof(struct impl), void);
impl->work = pw_work_queue_new(this->core->main_loop->loop);
this->info.name = strdup(name);
this->properties = properties;
this->node = node;
this->clock = clock;
this->data_loop = core->data_loop;
this->rt.sched = &core->rt.sched;
spa_list_init(&this->resource_list);
if (spa_node_set_callbacks(this->node, &node_callbacks, impl) < 0)
pw_log_warn("node %p: error setting callback", this);
pw_signal_init(&this->destroy_signal);
pw_signal_init(&this->port_added);
pw_signal_init(&this->port_removed);
pw_signal_init(&this->state_request);
pw_signal_init(&this->state_changed);
pw_signal_init(&this->initialized);
pw_signal_init(&this->port_added);
pw_signal_init(&this->port_removed);
pw_signal_init(&this->destroy_signal);
pw_signal_init(&this->free_signal);
pw_signal_init(&this->async_complete);
pw_signal_init(&this->initialized);
pw_signal_init(&this->event);
pw_signal_add(&this->async_complete, &impl->on_async_complete, on_async_complete);
pw_signal_add(&this->event, &impl->on_event, on_event);
this->info.state = PW_NODE_STATE_CREATING;
spa_list_init(&this->input_ports);
pw_map_init(&this->input_port_map, 64, 64);
spa_list_init(&this->output_ports);
pw_map_init(&this->output_port_map, 64, 64);
if (this->node->info) {
uint32_t i;
if (this->properties == NULL)
this->properties = pw_properties_new(NULL, NULL);
if (this->properties)
goto no_mem;
for (i = 0; i < this->node->info->n_items; i++)
pw_properties_set(this->properties,
this->node->info->items[i].key,
this->node->info->items[i].value);
}
impl->async_init = async;
if (async) {
pw_work_queue_add(impl->work,
this,
SPA_RESULT_RETURN_ASYNC(0), (pw_work_func_t) init_complete, NULL);
} else {
init_complete(this);
}
spa_graph_node_init(&this->rt.node,
&this->rt.methods,
this);
return this;
no_mem:
free((char *)this->info.name);
free(impl);
return NULL;
}
static int
@ -520,7 +339,7 @@ do_node_remove(struct spa_loop *loop,
pause_node(this);
spa_graph_node_remove(this->rt.sched->graph, &this->rt.node);
spa_graph_node_remove(&this->rt.node);
return SPA_RESULT_OK;
}
@ -542,9 +361,9 @@ void pw_node_destroy(struct pw_node *node)
pw_log_debug("node %p: destroy", impl);
pw_signal_emit(&node->destroy_signal, node);
if (!impl->async_init) {
pw_loop_invoke(node->data_loop->loop, do_node_remove, 1, 0, NULL, true, node);
pw_loop_invoke(node->data_loop->loop, do_node_remove, 1, 0, NULL, true, node);
if (impl->exported) {
spa_list_remove(&node->link);
pw_global_destroy(node->global);
node->global = NULL;
@ -564,22 +383,23 @@ void pw_node_destroy(struct pw_node *node)
pw_port_destroy(port);
}
pw_log_debug("node %p: free", node);
pw_signal_emit(&node->free_signal, node);
if (node->destroy)
node->destroy(node);
pw_work_queue_destroy(impl->work);
if (node->input_port_map)
free(node->input_port_map);
if (node->output_port_map)
free(node->output_port_map);
pw_map_clear(&node->input_port_map);
pw_map_clear(&node->output_port_map);
if (node->properties)
pw_properties_free(node->properties);
clear_info(node);
free(impl);
clear_info(node);
free(impl);
}
/**
@ -594,68 +414,46 @@ void pw_node_destroy(struct pw_node *node)
*/
struct pw_port *pw_node_get_free_port(struct pw_node *node, enum pw_direction direction)
{
uint32_t *n_ports, max_ports;
uint64_t change_mask;
uint32_t n_ports, max_ports;
struct spa_list *ports;
struct pw_port *port = NULL, *p, **portmap;
int res;
int i;
struct pw_port *port = NULL, *p, *mixport = NULL;
struct pw_map *portmap;
if (direction == PW_DIRECTION_INPUT) {
max_ports = node->info.max_input_ports;
n_ports = &node->info.n_input_ports;
n_ports = node->info.n_input_ports;
ports = &node->input_ports;
portmap = node->input_port_map;
change_mask = 1 << 1;
portmap = &node->input_port_map;
} else {
max_ports = node->info.max_output_ports;
n_ports = &node->info.n_output_ports;
n_ports = node->info.n_output_ports;
ports = &node->output_ports;
portmap = node->output_port_map;
change_mask = 1 << 3;
portmap = &node->output_port_map;
}
pw_log_debug("node %p: direction %d max %u, n %u", node, direction, max_ports, *n_ports);
pw_log_debug("node %p: direction %d max %u, n %u", node, direction, max_ports, n_ports);
/* first try to find an unlinked port */
spa_list_for_each(p, ports, link) {
if (spa_list_is_empty(&p->links))
return p;
/* for output we can reuse an existing port, for input only
* when there is a multiplex */
if (direction == PW_DIRECTION_OUTPUT || p->mix != NULL)
mixport = p;
}
/* no port, can we create one ? */
if (*n_ports < max_ports) {
for (i = 0; i < max_ports; i++) {
if (portmap[i] == NULL) {
pw_log_debug("node %p: creating port direction %d %u", node, direction, i);
if (max_ports == 0 || n_ports < max_ports) {
uint32_t port_id = pw_map_insert_new(portmap, NULL);
port = pw_port_new(node, direction, i);
if (port == NULL)
goto no_mem;
pw_log_debug("node %p: creating port direction %d %u", node, direction, port_id);
spa_list_insert(ports, &port->link);
if ((res = spa_node_add_port(node->node, direction, i)) < 0) {
pw_log_error("node %p: could not add port %d", node, i);
pw_port_destroy(port);
continue;
} else {
spa_node_port_set_io(node->node, direction, i, &port->io);
}
(*n_ports)++;
node->info.change_mask |= change_mask;
portmap[i] = port;
break;
}
}
port = node->implementation->add_port(node, direction, port_id);
if (port == NULL)
goto no_mem;
} else {
if (!spa_list_is_empty(ports)) {
port = spa_list_first(ports, struct pw_port, link);
/* for output we can reuse an existing port, for input only
* when there is a multiplex */
if (direction == PW_DIRECTION_INPUT && port->mix == NULL)
port = NULL;
}
port = mixport;
}
return port;

View file

@ -39,6 +39,26 @@ extern "C" {
#include <pipewire/server/client.h>
#include <pipewire/server/data-loop.h>
struct pw_node;
#define PW_VERSION_NODE_IMPLEMENTATION 0
struct pw_node_implementation {
uint32_t version;
int (*get_props) (struct pw_node *node, struct spa_props **props);
int (*set_props) (struct pw_node *node, const struct spa_props *props);
int (*send_command) (struct pw_node *node,
struct spa_command *command);
struct pw_port* (*add_port) (struct pw_node *node,
enum pw_direction direction,
uint32_t port_id);
};
/** \page page_node Node
*
* \section page_node_overview Overview
@ -71,23 +91,24 @@ struct pw_node {
struct pw_node *object,
enum pw_node_state old, enum pw_node_state state));
struct spa_handle *handle; /**< handle to SPA factory */
struct spa_node *node; /**< handle to SPA node */
bool live; /**< if the node is live */
struct spa_clock *clock; /**< handle to SPA clock if any */
struct spa_list resource_list; /**< list of resources for this node */
/** Implementation of core node functions */
const struct pw_node_implementation *implementation;
/** Emited when the node is initialized */
PW_SIGNAL(initialized, (struct pw_listener *listener, struct pw_node *object));
struct spa_list input_ports; /**< list of input ports */
struct pw_port **input_port_map; /**< map from port_id to port */
struct pw_map input_port_map; /**< map from port_id to port */
uint32_t n_used_input_links; /**< number of active input links */
uint32_t idle_used_input_links; /**< number of active input to be idle */
struct spa_list output_ports; /**< list of output ports */
struct pw_port **output_port_map; /**< map from port_id to port */
struct pw_map output_port_map; /**< map from port_id to port */
uint32_t n_used_output_links; /**< number of active output links */
uint32_t idle_used_output_links; /**< number of active output to be idle */
@ -107,13 +128,20 @@ struct pw_node {
PW_SIGNAL(async_complete, (struct pw_listener *listener,
struct pw_node *node, uint32_t seq, int res));
/** an event is emited */
PW_SIGNAL(event, (struct pw_listener *listener,
struct pw_node *node, struct spa_event *event));
struct pw_data_loop *data_loop; /**< the data loop for this node */
struct {
struct spa_graph_node_methods methods;
struct spa_graph_scheduler *sched;
struct spa_graph_node node;
} rt;
void *user_data; /**< extra user data */
pw_destroy_t destroy; /**< function to clean up the object */
};
/** Create a new node \memberof pw_node */
@ -121,10 +149,11 @@ struct pw_node *
pw_node_new(struct pw_core *core, /**< the core */
struct pw_resource *owner, /**< optional owner */
const char *name, /**< node name */
bool async, /**< if the node will initialize async */
struct spa_node *node, /**< the node */
struct spa_clock *clock, /**< optional clock */
struct pw_properties *properties /**< extra properties */);
struct pw_properties *properties, /**< extra properties */
size_t user_data_size /**< user data size */);
/** Complete initialization of the node */
void pw_node_export(struct pw_node *node);
/** Destroy a node */
void pw_node_destroy(struct pw_node *node);

View file

@ -28,70 +28,148 @@
/** \cond */
struct impl {
struct pw_port this;
uint32_t seq;
};
/** \endcond */
static int schedule_tee(struct spa_graph_node *node)
static void port_update_state(struct pw_port *port, enum pw_port_state state)
{
int res;
struct pw_port *this = node->user_data;
struct spa_graph_port *p;
struct spa_port_io *io = this->rt.mix_port.io;
if (node->action == SPA_GRAPH_ACTION_IN) {
if (spa_list_is_empty(&node->ports[SPA_DIRECTION_OUTPUT])) {
io->status = SPA_RESULT_NEED_BUFFER;
res = SPA_RESULT_NEED_BUFFER;
}
else {
spa_list_for_each(p, &node->ports[SPA_DIRECTION_OUTPUT], link)
*p->io = *io;
io->status = SPA_RESULT_OK;
io->buffer_id = SPA_ID_INVALID;
res = SPA_RESULT_HAVE_BUFFER;
}
if (port->state != state) {
pw_log_debug("port %p: state %d -> %d", port, port->state, state);
port->state = state;
pw_signal_emit(&port->state_changed, port);
}
else if (node->action == SPA_GRAPH_ACTION_OUT) {
spa_list_for_each(p, &node->ports[SPA_DIRECTION_OUTPUT], link)
*io = *p->io;
io->status = SPA_RESULT_NEED_BUFFER;
res = SPA_RESULT_NEED_BUFFER;
}
else
res = SPA_RESULT_ERROR;
return res;
}
static int schedule_mix(struct spa_graph_node *node)
static int schedule_tee_input(struct spa_graph_node *node, void *user_data)
{
int res;
struct pw_port *this = node->user_data;
struct pw_port *this = user_data;
struct spa_graph_port *p;
struct spa_port_io *io = this->rt.mix_port.io;
if (node->action == SPA_GRAPH_ACTION_IN) {
spa_list_for_each(p, &node->ports[SPA_DIRECTION_INPUT], link) {
*io = *p->io;
p->io->status = SPA_RESULT_OK;
p->io->buffer_id = SPA_ID_INVALID;
}
if (spa_list_is_empty(&node->ports[SPA_DIRECTION_OUTPUT])) {
io->status = SPA_RESULT_NEED_BUFFER;
res = SPA_RESULT_NEED_BUFFER;
}
else {
spa_list_for_each(p, &node->ports[SPA_DIRECTION_OUTPUT], link)
*p->io = *io;
io->status = SPA_RESULT_OK;
io->buffer_id = SPA_ID_INVALID;
res = SPA_RESULT_HAVE_BUFFER;
}
else if (node->action == SPA_GRAPH_ACTION_OUT) {
io->status = SPA_RESULT_NEED_BUFFER;
spa_list_for_each(p, &node->ports[SPA_DIRECTION_INPUT], link)
*p->io = *io;
io->buffer_id = SPA_ID_INVALID;
res = SPA_RESULT_NEED_BUFFER;
}
else
res = SPA_RESULT_ERROR;
return res;
}
static int schedule_tee_output(struct spa_graph_node *node, void *user_data)
{
struct pw_port *this = user_data;
struct spa_graph_port *p;
struct spa_port_io *io = this->rt.mix_port.io;
return res;
spa_list_for_each(p, &node->ports[SPA_DIRECTION_OUTPUT], link)
*io = *p->io;
io->status = SPA_RESULT_NEED_BUFFER;
return SPA_RESULT_NEED_BUFFER;
}
static int schedule_tee_reuse_buffer(struct spa_graph_port *port, uint32_t buffer_id, void *user_data)
{
return SPA_RESULT_OK;
}
static const struct spa_graph_node_methods schedule_tee = {
SPA_VERSION_GRAPH_NODE_METHODS,
schedule_tee_input,
schedule_tee_output,
schedule_tee_reuse_buffer,
};
static int schedule_mix_input(struct spa_graph_node *node, void *user_data)
{
struct pw_port *this = user_data;
struct spa_graph_port *p;
struct spa_port_io *io = this->rt.mix_port.io;
spa_list_for_each(p, &node->ports[SPA_DIRECTION_INPUT], link) {
*io = *p->io;
p->io->status = SPA_RESULT_OK;
p->io->buffer_id = SPA_ID_INVALID;
}
return SPA_RESULT_HAVE_BUFFER;
}
static int schedule_mix_output(struct spa_graph_node *node, void *user_data)
{
struct pw_port *this = user_data;
struct spa_graph_port *p;
struct spa_port_io *io = this->rt.mix_port.io;
io->status = SPA_RESULT_NEED_BUFFER;
spa_list_for_each(p, &node->ports[SPA_DIRECTION_INPUT], link)
*p->io = *io;
io->buffer_id = SPA_ID_INVALID;
return SPA_RESULT_NEED_BUFFER;
}
static int schedule_mix_reuse_buffer(struct spa_graph_port *port, uint32_t buffer_id, void *user_data)
{
return SPA_RESULT_OK;
}
static const struct spa_graph_node_methods schedule_mix = {
SPA_VERSION_GRAPH_NODE_METHODS,
schedule_mix_input,
schedule_mix_output,
schedule_mix_reuse_buffer,
};
struct pw_port *pw_port_new(enum pw_direction direction,
uint32_t port_id,
size_t user_data_size)
{
struct impl *impl;
struct pw_port *this;
impl = calloc(1, sizeof(struct impl) + user_data_size);
if (impl == NULL)
return NULL;
this = &impl->this;
pw_log_debug("port %p: new", this);
this->direction = direction;
this->port_id = port_id;
this->state = PW_PORT_STATE_INIT;
this->io.status = SPA_RESULT_OK;
this->io.buffer_id = SPA_ID_INVALID;
if (user_data_size > 0)
this->user_data = SPA_MEMBER(impl, sizeof(struct impl), void);
spa_list_init(&this->links);
pw_signal_init(&this->state_changed);
pw_signal_init(&this->destroy_signal);
spa_graph_port_init(&this->rt.port,
this->direction,
this->port_id,
0,
&this->io);
spa_graph_node_init(&this->rt.mix_node,
this->direction == PW_DIRECTION_INPUT ?
&schedule_mix :
&schedule_tee,
this);
spa_graph_port_init(&this->rt.mix_port,
pw_direction_reverse(this->direction),
0,
0,
&this->io);
return this;
}
static int do_add_port(struct spa_loop *loop,
@ -99,57 +177,38 @@ static int do_add_port(struct spa_loop *loop,
{
struct pw_port *this = user_data;
spa_graph_port_add(this->rt.graph,
&this->node->rt.node,
&this->rt.port,
this->direction,
this->port_id,
0,
&this->io);
spa_graph_node_add(this->rt.graph,
&this->rt.mix_node,
this->direction == PW_DIRECTION_INPUT ? schedule_mix : schedule_tee,
this);
spa_graph_port_add(this->rt.graph,
&this->rt.mix_node,
&this->rt.mix_port,
pw_direction_reverse(this->direction),
0,
0,
&this->io);
spa_graph_port_link(this->rt.graph,
&this->rt.port,
&this->rt.mix_port);
spa_graph_port_add(&this->node->rt.node, &this->rt.port);
spa_graph_node_add(this->rt.graph, &this->rt.mix_node);
spa_graph_port_add(&this->rt.mix_node, &this->rt.mix_port);
spa_graph_port_link(&this->rt.port, &this->rt.mix_port);
return SPA_RESULT_OK;
}
struct pw_port *pw_port_new(struct pw_node *node, enum pw_direction direction, uint32_t port_id)
void pw_port_add(struct pw_port *port, struct pw_node *node)
{
struct impl *impl;
struct pw_port *this;
port->node = node;
impl = calloc(1, sizeof(struct impl));
if (impl == NULL)
return NULL;
pw_log_debug("port %p: add to node %p", port, node);
if (port->direction == PW_DIRECTION_INPUT) {
spa_list_insert(&node->input_ports, &port->link);
pw_map_insert_at(&node->input_port_map, port->port_id, port);
node->info.n_input_ports++;
node->info.change_mask |= 1 << 1;
}
else {
spa_list_insert(&node->output_ports, &port->link);
pw_map_insert_at(&node->output_port_map, port->port_id, port);
node->info.n_output_ports++;
node->info.change_mask |= 1 << 3;
}
this = &impl->this;
this->node = node;
this->direction = direction;
this->port_id = port_id;
this->state = PW_PORT_STATE_CONFIGURE;
this->io.status = SPA_RESULT_OK;
this->io.buffer_id = SPA_ID_INVALID;
port->rt.graph = node->rt.sched->graph;
pw_loop_invoke(node->data_loop->loop, do_add_port, SPA_ID_INVALID, 0, NULL, false, port);
spa_list_init(&this->links);
port_update_state(port, PW_PORT_STATE_CONFIGURE);
pw_signal_init(&this->destroy_signal);
this->rt.graph = node->rt.sched->graph;
pw_loop_invoke(node->data_loop->loop, do_add_port, SPA_ID_INVALID, 0, NULL, false, this);
return this;
pw_signal_emit(&node->port_added, node, port);
}
static int do_remove_port(struct spa_loop *loop,
@ -158,61 +217,70 @@ static int do_remove_port(struct spa_loop *loop,
struct pw_port *this = user_data;
struct spa_graph_port *p;
spa_graph_port_unlink(this->rt.graph,
&this->rt.port);
spa_graph_port_remove(this->rt.graph,
&this->rt.port);
spa_graph_port_unlink(&this->rt.port);
spa_graph_port_remove(&this->rt.port);
spa_list_for_each(p, &this->rt.mix_node.ports[this->direction], link)
spa_graph_port_remove(this->rt.graph, p);
spa_graph_port_remove(p);
spa_graph_port_remove(this->rt.graph,
&this->rt.mix_port);
spa_graph_node_remove(this->rt.graph,
&this->rt.mix_node);
spa_graph_port_remove(&this->rt.mix_port);
spa_graph_node_remove(&this->rt.mix_node);
return SPA_RESULT_OK;
}
void pw_port_destroy(struct pw_port *port)
{
struct pw_node *node = port->node;
pw_log_debug("port %p: destroy", port);
pw_signal_emit(&port->destroy_signal, port);
pw_loop_invoke(port->node->data_loop->loop, do_remove_port, SPA_ID_INVALID, 0, NULL, true, port);
if (node) {
pw_loop_invoke(port->node->data_loop->loop, do_remove_port, SPA_ID_INVALID, 0, NULL, true, port);
spa_list_remove(&port->link);
if (port->direction == PW_DIRECTION_INPUT) {
pw_map_remove(&node->input_port_map, port->port_id);
node->info.n_input_ports--;
}
else {
pw_map_remove(&node->output_port_map, port->port_id);
node->info.n_output_ports--;
}
spa_list_remove(&port->link);
pw_signal_emit(&node->port_removed, node, port);
}
if (port->destroy)
port->destroy(port);
free(port);
}
static void port_update_state(struct pw_port *port, enum pw_port_state state)
{
if (port->state != state) {
pw_log_debug("port %p: state %d -> %d", port, port->state, state);
port->state = state;
}
}
static int
do_port_pause(struct spa_loop *loop,
bool async, uint32_t seq, size_t size, void *data, void *user_data)
bool async, uint32_t seq, size_t size, void *data, void *user_data)
{
struct pw_port *port = user_data;
struct pw_port *port = user_data;
return spa_node_port_send_command(port->node->node,
port->direction,
port->port_id,
&SPA_COMMAND_INIT(port->node->core->type.command_node.
Pause));
return port->implementation->send_command(port,
&SPA_COMMAND_INIT(port->node->core->type.command_node.Pause));
}
int pw_port_enum_formats(struct pw_port *port,
struct spa_format **format,
const struct spa_format *filter,
int32_t index)
{
return port->implementation->enum_formats(port, format, filter, index);
}
int pw_port_set_format(struct pw_port *port, uint32_t flags, struct spa_format *format)
{
int res;
res = spa_node_port_set_format(port->node->node, port->direction, port->port_id, flags, format);
res = port->implementation->set_format(port, flags, format);
pw_log_debug("port %p: set format %d", port, res);
if (!SPA_RESULT_IS_ASYNC(res)) {
@ -231,9 +299,28 @@ int pw_port_set_format(struct pw_port *port, uint32_t flags, struct spa_format *
return res;
}
int pw_port_get_format(struct pw_port *port, const struct spa_format **format)
{
return port->implementation->get_format(port, format);
}
int pw_port_get_info(struct pw_port *port, const struct spa_port_info **info)
{
return port->implementation->get_info(port, info);
}
int pw_port_enum_params(struct pw_port *port, uint32_t index, struct spa_param **param)
{
return port->implementation->enum_params(port, index, param);
}
int pw_port_set_param(struct pw_port *port, struct spa_param *param)
{
return port->implementation->set_param(port, param);
}
int pw_port_use_buffers(struct pw_port *port, struct spa_buffer **buffers, uint32_t n_buffers)
{
struct impl *impl = SPA_CONTAINER_OF(port, struct impl, this);
int res;
if (n_buffers == 0 && port->state <= PW_PORT_STATE_READY)
@ -243,13 +330,14 @@ int pw_port_use_buffers(struct pw_port *port, struct spa_buffer **buffers, uint3
return SPA_RESULT_NO_FORMAT;
if (port->state > PW_PORT_STATE_PAUSED) {
res = pw_loop_invoke(port->node->data_loop->loop,
do_port_pause, impl->seq++, 0, NULL, true, port);
pw_loop_invoke(port->node->data_loop->loop,
do_port_pause, 0, 0, NULL, true, port);
port_update_state (port, PW_PORT_STATE_PAUSED);
}
pw_log_debug("port %p: use %d buffers", port, n_buffers);
res = spa_node_port_use_buffers(port->node->node, port->direction, port->port_id, buffers, n_buffers);
res = port->implementation->use_buffers(port, buffers, n_buffers);
port->buffers = buffers;
port->n_buffers = n_buffers;
if (port->allocated)
@ -268,21 +356,20 @@ int pw_port_alloc_buffers(struct pw_port *port,
struct spa_param **params, uint32_t n_params,
struct spa_buffer **buffers, uint32_t *n_buffers)
{
struct impl *impl = SPA_CONTAINER_OF(port, struct impl, this);
int res;
if (port->state < PW_PORT_STATE_READY)
return SPA_RESULT_NO_FORMAT;
if (port->state > PW_PORT_STATE_PAUSED) {
res = pw_loop_invoke(port->node->data_loop->loop,
do_port_pause, impl->seq++, 0, NULL, true, port);
pw_loop_invoke(port->node->data_loop->loop,
do_port_pause, 0, 0, NULL, true, port);
port_update_state (port, PW_PORT_STATE_PAUSED);
}
pw_log_debug("port %p: alloc %d buffers", port, *n_buffers);
res = spa_node_port_alloc_buffers(port->node->node, port->direction, port->port_id,
params, n_params, buffers, n_buffers);
res = port->implementation->alloc_buffers(port, params, n_params, buffers, n_buffers);
port->buffers = buffers;
port->n_buffers = *n_buffers;
port->allocated = true;

View file

@ -44,6 +44,38 @@ enum pw_port_state {
PW_PORT_STATE_STREAMING = 4,
};
struct pw_port;
#define PW_VERSION_PORT_IMPLEMENTATION 0
struct pw_port_implementation {
uint32_t version;
int (*enum_formats) (struct pw_port *port,
struct spa_format **format,
const struct spa_format *filter,
int32_t index);
int (*set_format) (struct pw_port *port, uint32_t flags, struct spa_format *format);
int (*get_format) (struct pw_port *port, const struct spa_format **format);
int (*get_info) (struct pw_port *port, const struct spa_port_info **info);
int (*enum_params) (struct pw_port *port, uint32_t index, struct spa_param **param);
int (*set_param) (struct pw_port *port, struct spa_param *param);
int (*use_buffers) (struct pw_port *port, struct spa_buffer **buffers, uint32_t n_buffers);
int (*alloc_buffers) (struct pw_port *port,
struct spa_param **params, uint32_t n_params,
struct spa_buffer **buffers, uint32_t *n_buffers);
int (*reuse_buffer) (struct pw_port *port, uint32_t buffer_id);
int (*send_command) (struct pw_port *port, struct spa_command *command);
};
/** \page page_port Port
*
* \section page_node_overview Overview
@ -60,9 +92,16 @@ struct pw_port {
PW_SIGNAL(destroy_signal, (struct pw_listener *listener, struct pw_port *));
struct pw_node *node; /**< owner node */
enum pw_direction direction; /**< port direction */
uint32_t port_id; /**< port id */
enum pw_port_state state; /**< state of the port */
/** Emited when the port state changes */
PW_SIGNAL(state_changed, (struct pw_listener *listener, struct pw_port *port));
const struct pw_port_implementation *implementation;
struct spa_port_io io; /**< io area of the port */
bool allocated; /**< if buffers are allocated */
@ -80,19 +119,45 @@ struct pw_port {
struct spa_graph_port mix_port;
struct spa_graph_node mix_node;
} rt; /**< data only accessed from the data thread */
void *user_data; /**< extra user data */
pw_destroy_t destroy; /**< function to clean up the object */
};
/** Create a new port \memberof pw_port
* \return a newly allocated port */
struct pw_port *
pw_port_new(struct pw_node *node, enum pw_direction direction, uint32_t port_id);
pw_port_new(enum pw_direction direction,
uint32_t port_id,
size_t user_data_size);
/** Add a port to a node \memberof pw_port */
void pw_port_add(struct pw_port *port, struct pw_node *node);
/** Destroy a port \memberof pw_port */
void pw_port_destroy(struct pw_port *port);
/** Get the current format on a port \memberof pw_port */
int pw_port_enum_formats(struct pw_port *port,
struct spa_format **format,
const struct spa_format *filter,
int32_t index);
/** Set a format on a port \memberof pw_port */
int pw_port_set_format(struct pw_port *port, uint32_t flags, struct spa_format *format);
/** Get the current format on a port \memberof pw_port */
int pw_port_get_format(struct pw_port *port, const struct spa_format **format);
/** Get the info on a port \memberof pw_port */
int pw_port_get_info(struct pw_port *port, const struct spa_port_info **info);
/** Enumerate the port parameters \memberof pw_port */
int pw_port_enum_params(struct pw_port *port, uint32_t index, struct spa_param **param);
/** Set a port parameter \memberof pw_port */
int pw_port_set_param(struct pw_port *port, struct spa_param *param);
/** Use buffers on a port \memberof pw_port */
int pw_port_use_buffers(struct pw_port *port, struct spa_buffer **buffers, uint32_t n_buffers);

View file

@ -42,21 +42,24 @@ static inline void spa_graph_scheduler_init(struct spa_graph_scheduler *sched,
sched->node = NULL;
}
static inline int spa_graph_scheduler_default(struct spa_graph_node *node)
static inline int spa_graph_scheduler_input(struct spa_graph_node *node, void *user_data)
{
int res;
struct spa_node *n = node->user_data;
if (node->action == SPA_GRAPH_ACTION_IN)
res = spa_node_process_input(n);
else if (node->action == SPA_GRAPH_ACTION_OUT)
res = spa_node_process_output(n);
else
res = SPA_RESULT_ERROR;
return res;
return spa_node_process_input(n);
}
static inline int spa_graph_scheduler_output(struct spa_graph_node *node, void *user_data)
{
struct spa_node *n = node->user_data;
return spa_node_process_output(n);
}
static const struct spa_graph_node_methods spa_graph_scheduler_default = {
SPA_VERSION_GRAPH_NODE_METHODS,
spa_graph_scheduler_input,
spa_graph_scheduler_output,
};
static inline void spa_scheduler_port_check(struct spa_graph_scheduler *sched, struct spa_graph_port *port)
{
struct spa_graph_node *node = port->node;
@ -93,15 +96,21 @@ static inline bool spa_graph_scheduler_iterate(struct spa_graph_scheduler *sched
switch (n->action) {
case SPA_GRAPH_ACTION_IN:
case SPA_GRAPH_ACTION_OUT:
n->state = n->schedule(n);
debug("node %p scheduled action %d state %d\n", n, n->action, n->state);
if (n->action == SPA_GRAPH_ACTION_IN && n == sched->node)
n->state = n->methods->schedule_input(n, n->user_data);
debug("node %p scheduled input state %d\n", n, n->state);
if (n == sched->node)
break;
n->action = SPA_GRAPH_ACTION_CHECK;
spa_list_insert(sched->ready.prev, &n->ready_link);
break;
case SPA_GRAPH_ACTION_OUT:
n->state = n->methods->schedule_output(n, n->user_data);
debug("node %p scheduled output state %d\n", n, n->state);
n->action = SPA_GRAPH_ACTION_CHECK;
spa_list_insert(sched->ready.prev, &n->ready_link);
break;
case SPA_GRAPH_ACTION_CHECK:
if (n->state == SPA_RESULT_NEED_BUFFER) {
n->ready_in = 0;

View file

@ -38,21 +38,34 @@ static inline void spa_graph_scheduler_init(struct spa_graph_scheduler *sched,
sched->node = NULL;
}
static inline int spa_graph_scheduler_default(struct spa_graph_node *node)
static inline int spa_graph_scheduler_input(struct spa_graph_node *node, void *user_data)
{
int res;
struct spa_node *n = node->user_data;
if (node->action == SPA_GRAPH_ACTION_IN)
res = spa_node_process_input(n);
else if (node->action == SPA_GRAPH_ACTION_OUT)
res = spa_node_process_output(n);
else
res = SPA_RESULT_ERROR;
return res;
return spa_node_process_input(n);
}
static inline int spa_graph_scheduler_output(struct spa_graph_node *node, void *user_data)
{
struct spa_node *n = node->user_data;
return spa_node_process_output(n);
}
static inline int spa_graph_scheduler_reuse_buffer(struct spa_graph_port *port,
uint32_t buffer_id, void *user_data)
{
printf("port %p reuse buffer %d\n", port, buffer_id);
struct spa_graph_node *node = port->node;
struct spa_node *n = node->user_data;
return spa_node_port_reuse_buffer(n, port->port_id, buffer_id);
}
static const struct spa_graph_node_methods spa_graph_scheduler_default = {
SPA_VERSION_GRAPH_NODE_METHODS,
spa_graph_scheduler_input,
spa_graph_scheduler_output,
spa_graph_scheduler_reuse_buffer,
};
static inline void spa_graph_scheduler_pull(struct spa_graph_scheduler *sched, struct spa_graph_node *node)
{
struct spa_graph_port *p;
@ -79,9 +92,8 @@ static inline void spa_graph_scheduler_pull(struct spa_graph_scheduler *sched, s
}
spa_list_for_each_safe(n, t, &ready, ready_link) {
n->action = SPA_GRAPH_ACTION_OUT;
n->state = n->schedule(n);
debug("peer %p scheduled %d %d\n", n, n->action, n->state);
n->state = n->methods->schedule_output(n, n->user_data);
debug("peer %p scheduled out %d\n", n, n->state);
if (n->state == SPA_RESULT_NEED_BUFFER)
spa_graph_scheduler_pull(sched, n);
else {
@ -97,9 +109,8 @@ static inline void spa_graph_scheduler_pull(struct spa_graph_scheduler *sched, s
debug("node %p %d %d\n", node, node->ready_in, node->required_in);
if (node->required_in > 0 && node->ready_in == node->required_in) {
node->action = SPA_GRAPH_ACTION_IN;
node->state = node->schedule(node);
debug("node %p scheduled %d %d\n", node, node->action, node->state);
node->state = node->methods->schedule_input(node, node->user_data);
debug("node %p scheduled in %d\n", node, node->state);
if (node->state == SPA_RESULT_HAVE_BUFFER) {
spa_list_for_each(p, &node->ports[SPA_DIRECTION_OUTPUT], link) {
if (p->io->status == SPA_RESULT_HAVE_BUFFER)
@ -143,9 +154,8 @@ static inline void spa_graph_scheduler_push(struct spa_graph_scheduler *sched, s
}
spa_list_for_each_safe(n, t, &ready, ready_link) {
n->action = SPA_GRAPH_ACTION_IN;
n->state = n->schedule(n);
debug("peer %p scheduled %d %d\n", n, n->action, n->state);
n->state = n->methods->schedule_input(n, n->user_data);
debug("peer %p scheduled in %d\n", n, n->state);
if (n->state == SPA_RESULT_HAVE_BUFFER)
spa_graph_scheduler_push(sched, n);
else {
@ -159,9 +169,8 @@ static inline void spa_graph_scheduler_push(struct spa_graph_scheduler *sched, s
n->ready_link.next = NULL;
}
node->action = SPA_GRAPH_ACTION_OUT;
node->state = node->schedule(node);
debug("node %p scheduled %d %d\n", node, node->action, node->state);
node->state = node->methods->schedule_output(node, node->user_data);
debug("node %p scheduled out %d\n", node, node->state);
if (node->state == SPA_RESULT_NEED_BUFFER) {
node->ready_in = 0;
spa_list_for_each(p, &node->ports[SPA_DIRECTION_INPUT], link) {

View file

@ -42,7 +42,14 @@ struct spa_graph {
struct spa_list nodes;
};
typedef int (*spa_graph_node_func_t) (struct spa_graph_node * node);
#define SPA_VERSION_GRAPH_NODE_METHODS 0
struct spa_graph_node_methods {
uint32_t version;
int (*schedule_input) (struct spa_graph_node *node, void *user_data);
int (*schedule_output) (struct spa_graph_node *node, void *user_data);
int (*reuse_buffer) (struct spa_graph_port *port, uint32_t buffer_id, void *user_data);
};
struct spa_graph_node {
struct spa_list link;
@ -55,7 +62,7 @@ struct spa_graph_node {
#define SPA_GRAPH_ACTION_IN 1
#define SPA_GRAPH_ACTION_OUT 2
uint32_t action;
spa_graph_node_func_t schedule;
const struct spa_graph_node_methods *methods;
void *user_data;
uint32_t max_in;
uint32_t required_in;
@ -78,44 +85,57 @@ static inline void spa_graph_init(struct spa_graph *graph)
}
static inline void
spa_graph_node_add(struct spa_graph *graph, struct spa_graph_node *node,
spa_graph_node_func_t schedule, void *user_data)
spa_graph_node_init(struct spa_graph_node *node,
const struct spa_graph_node_methods *methods,
void *user_data)
{
spa_list_init(&node->ports[SPA_DIRECTION_INPUT]);
spa_list_init(&node->ports[SPA_DIRECTION_OUTPUT]);
node->flags = 0;
node->methods = methods;
node->user_data = user_data;
node->max_in = node->required_in = node->ready_in = 0;
debug("node %p init\n", node);
}
static inline void
spa_graph_node_add(struct spa_graph *graph,
struct spa_graph_node *node)
{
node->state = SPA_RESULT_NEED_BUFFER;
node->action = SPA_GRAPH_ACTION_OUT;
node->schedule = schedule;
node->user_data = user_data;
node->ready_link.next = NULL;
spa_list_insert(graph->nodes.prev, &node->link);
node->max_in = node->required_in = node->ready_in = 0;
debug("node %p add\n", node);
}
static inline void
spa_graph_port_add(struct spa_graph *graph,
struct spa_graph_node *node,
struct spa_graph_port *port,
enum spa_direction direction,
uint32_t port_id,
uint32_t flags,
struct spa_port_io *io)
spa_graph_port_init(struct spa_graph_port *port,
enum spa_direction direction,
uint32_t port_id,
uint32_t flags,
struct spa_port_io *io)
{
debug("port %p add type %d id %d to node %p \n", port, direction, port_id, node);
port->node = node;
debug("port %p init type %d id %d\n", port, direction, port_id);
port->direction = direction;
port->port_id = port_id;
port->flags = flags;
port->io = io;
spa_list_insert(node->ports[direction].prev, &port->link);
}
static inline void
spa_graph_port_add(struct spa_graph_node *node,
struct spa_graph_port *port)
{
debug("port %p add to node %p\n", port, node);
port->node = node;
spa_list_insert(node->ports[port->direction].prev, &port->link);
node->max_in++;
if (!(port->flags & SPA_PORT_INFO_FLAG_OPTIONAL) && direction == SPA_DIRECTION_INPUT)
if (!(port->flags & SPA_PORT_INFO_FLAG_OPTIONAL) && port->direction == SPA_DIRECTION_INPUT)
node->required_in++;
}
static inline void spa_graph_node_remove(struct spa_graph *graph, struct spa_graph_node *node)
static inline void spa_graph_node_remove(struct spa_graph_node *node)
{
debug("node %p remove\n", node);
spa_list_remove(&node->link);
@ -123,7 +143,7 @@ static inline void spa_graph_node_remove(struct spa_graph *graph, struct spa_gra
spa_list_remove(&node->ready_link);
}
static inline void spa_graph_port_remove(struct spa_graph *graph, struct spa_graph_port *port)
static inline void spa_graph_port_remove(struct spa_graph_port *port)
{
debug("port %p remove\n", port);
spa_list_remove(&port->link);
@ -132,7 +152,7 @@ static inline void spa_graph_port_remove(struct spa_graph *graph, struct spa_gra
}
static inline void
spa_graph_port_link(struct spa_graph *graph, struct spa_graph_port *out, struct spa_graph_port *in)
spa_graph_port_link(struct spa_graph_port *out, struct spa_graph_port *in)
{
debug("port %p link to %p \n", out, in);
out->peer = in;
@ -140,7 +160,7 @@ spa_graph_port_link(struct spa_graph *graph, struct spa_graph_port *out, struct
}
static inline void
spa_graph_port_unlink(struct spa_graph *graph, struct spa_graph_port *port)
spa_graph_port_unlink(struct spa_graph_port *port)
{
debug("port %p unlink from %p \n", port, port->peer);
if (port->peer) {

View file

@ -340,27 +340,27 @@ static int make_nodes(struct data *data, const char *device)
spa_node_port_set_io(data->volume, SPA_DIRECTION_OUTPUT, 0, &data->volume_sink_io[0]);
spa_node_port_set_io(data->sink, SPA_DIRECTION_INPUT, 0, &data->volume_sink_io[0]);
spa_graph_node_add(&data->graph, &data->source_node, spa_graph_scheduler_default,
data->source);
spa_graph_port_add(&data->graph, &data->source_node, &data->source_out,
SPA_DIRECTION_OUTPUT, 0, 0, &data->source_volume_io[0]);
spa_graph_node_init(&data->source_node, &spa_graph_scheduler_default, data->source);
spa_graph_node_add(&data->graph, &data->source_node);
spa_graph_port_init(&data->source_out, SPA_DIRECTION_OUTPUT, 0, 0, &data->source_volume_io[0]);
spa_graph_port_add(&data->source_node, &data->source_out);
spa_graph_node_add(&data->graph, &data->volume_node, spa_graph_scheduler_default,
data->volume);
spa_graph_port_add(&data->graph, &data->volume_node, &data->volume_in, SPA_DIRECTION_INPUT,
0, 0, &data->source_volume_io[0]);
spa_graph_node_init(&data->volume_node, &spa_graph_scheduler_default, data->volume);
spa_graph_node_add(&data->graph, &data->volume_node);
spa_graph_port_init(&data->volume_in, SPA_DIRECTION_INPUT, 0, 0, &data->source_volume_io[0]);
spa_graph_port_add(&data->volume_node, &data->volume_in);
spa_graph_port_link(&data->graph, &data->source_out, &data->volume_in);
spa_graph_port_link(&data->source_out, &data->volume_in);
spa_graph_port_add(&data->graph, &data->volume_node,
&data->volume_out, SPA_DIRECTION_OUTPUT, 0, 0, &data->volume_sink_io[0]);
spa_graph_port_init(&data->volume_out, SPA_DIRECTION_OUTPUT, 0, 0, &data->volume_sink_io[0]);
spa_graph_port_add(&data->volume_node, &data->volume_out);
spa_graph_node_add(&data->graph, &data->sink_node, spa_graph_scheduler_default,
data->sink);
spa_graph_port_add(&data->graph, &data->sink_node, &data->sink_in, SPA_DIRECTION_INPUT, 0,
0, &data->volume_sink_io[0]);
spa_graph_node_init(&data->sink_node, &spa_graph_scheduler_default, data->sink);
spa_graph_node_add(&data->graph, &data->sink_node);
spa_graph_port_init(&data->sink_in, SPA_DIRECTION_INPUT, 0, 0, &data->volume_sink_io[0]);
spa_graph_port_add(&data->sink_node, &data->sink_in);
spa_graph_port_link(&data->graph, &data->volume_out, &data->sink_in);
spa_graph_port_link(&data->volume_out, &data->sink_in);
return res;
}

View file

@ -418,35 +418,37 @@ static int make_nodes(struct data *data, const char *device)
spa_node_port_set_io(data->sink, SPA_DIRECTION_INPUT, 0, &data->mix_sink_io[0]);
#ifdef USE_GRAPH
spa_graph_node_add(&data->graph, &data->source1_node, spa_graph_scheduler_default,
data->source1);
spa_graph_port_add(&data->graph, &data->source1_node, &data->source1_out,
SPA_DIRECTION_OUTPUT, 0, 0, &data->source1_mix_io[0]);
spa_graph_node_init(&data->source1_node, &spa_graph_scheduler_default, data->source1);
spa_graph_port_init(&data->source1_out, SPA_DIRECTION_OUTPUT, 0, 0, &data->source1_mix_io[0]);
spa_graph_port_add(&data->source1_node, &data->source1_out);
spa_graph_node_add(&data->graph, &data->source1_node);
spa_graph_node_add(&data->graph, &data->source2_node, spa_graph_scheduler_default,
data->source2);
spa_graph_port_add(&data->graph, &data->source2_node, &data->source2_out,
SPA_DIRECTION_OUTPUT, 0, 0, &data->source2_mix_io[0]);
spa_graph_node_init(&data->source2_node, &spa_graph_scheduler_default, data->source2);
spa_graph_port_init(&data->source2_out, SPA_DIRECTION_OUTPUT, 0, 0, &data->source2_mix_io[0]);
spa_graph_port_add(&data->source2_node, &data->source2_out);
spa_graph_node_add(&data->graph, &data->source2_node);
spa_graph_node_add(&data->graph, &data->mix_node, spa_graph_scheduler_default,
data->mix);
spa_graph_port_add(&data->graph, &data->mix_node, &data->mix_in[0], SPA_DIRECTION_INPUT,
data->mix_ports[0], 0, &data->source1_mix_io[0]);
spa_graph_port_add(&data->graph, &data->mix_node, &data->mix_in[1], SPA_DIRECTION_INPUT,
spa_graph_node_init(&data->mix_node, &spa_graph_scheduler_default, data->mix);
spa_graph_port_init(&data->mix_in[0], SPA_DIRECTION_INPUT,
data->mix_ports[0], 0, &data->source1_mix_io[0]);
spa_graph_port_add(&data->mix_node, &data->mix_in[0]);
spa_graph_port_init(&data->mix_in[1], SPA_DIRECTION_INPUT,
data->mix_ports[1], 0, &data->source2_mix_io[0]);
spa_graph_port_add(&data->mix_node, &data->mix_in[1]);
spa_graph_node_add(&data->graph, &data->mix_node);
spa_graph_port_link(&data->graph, &data->source1_out, &data->mix_in[0]);
spa_graph_port_link(&data->graph, &data->source2_out, &data->mix_in[1]);
spa_graph_port_link(&data->source1_out, &data->mix_in[0]);
spa_graph_port_link(&data->source2_out, &data->mix_in[1]);
spa_graph_port_add(&data->graph, &data->mix_node,
&data->mix_out, SPA_DIRECTION_OUTPUT, 0, 0, &data->mix_sink_io[0]);
spa_graph_port_init(&data->mix_out, SPA_DIRECTION_OUTPUT, 0, 0, &data->mix_sink_io[0]);
spa_graph_port_add(&data->mix_node, &data->mix_out);
spa_graph_node_add(&data->graph, &data->sink_node, spa_graph_scheduler_default,
data->sink);
spa_graph_port_add(&data->graph, &data->sink_node, &data->sink_in, SPA_DIRECTION_INPUT, 0,
0, &data->mix_sink_io[0]);
spa_graph_node_init(&data->sink_node, &spa_graph_scheduler_default, data->sink);
spa_graph_port_init(&data->sink_in, SPA_DIRECTION_INPUT, 0, 0, &data->mix_sink_io[0]);
spa_graph_port_add(&data->sink_node, &data->sink_in);
spa_graph_node_add(&data->graph, &data->sink_node);
spa_graph_port_link(&data->graph, &data->mix_out, &data->sink_in);
spa_graph_port_link(&data->mix_out, &data->sink_in);
#endif
return res;

View file

@ -366,19 +366,21 @@ static int make_nodes(struct data *data)
spa_node_port_set_io(data->source, SPA_DIRECTION_OUTPUT, 0, &data->source_sink_io[0]);
spa_node_port_set_io(data->sink, SPA_DIRECTION_INPUT, 0, &data->source_sink_io[0]);
spa_graph_node_add(&data->graph, &data->source_node,
spa_graph_scheduler_default, data->source);
spa_graph_node_init(&data->source_node, &spa_graph_scheduler_default, data->source);
spa_graph_node_add(&data->graph, &data->source_node);
data->source_node.flags = (data->mode & MODE_ASYNC_PUSH) ? SPA_GRAPH_NODE_FLAG_ASYNC : 0;
spa_graph_port_add(&data->graph, &data->source_node,
&data->source_out, SPA_DIRECTION_OUTPUT, 0, 0, &data->source_sink_io[0]);
spa_graph_port_init( &data->source_out, SPA_DIRECTION_OUTPUT, 0, 0, &data->source_sink_io[0]);
spa_graph_port_add(&data->source_node, &data->source_out);
spa_graph_node_init(&data->sink_node, &spa_graph_scheduler_default, data->sink);
spa_graph_node_add(&data->graph, &data->sink_node);
spa_graph_node_add(&data->graph, &data->sink_node, spa_graph_scheduler_default,
data->sink);
data->sink_node.flags = (data->mode & MODE_ASYNC_PULL) ? SPA_GRAPH_NODE_FLAG_ASYNC : 0;
spa_graph_port_add(&data->graph, &data->sink_node,
&data->sink_in, SPA_DIRECTION_INPUT, 0, 0, &data->source_sink_io[0]);
spa_graph_port_init(&data->sink_in, SPA_DIRECTION_INPUT, 0, 0, &data->source_sink_io[0]);
spa_graph_port_add(&data->sink_node, &data->sink_in);
spa_graph_port_link(&data->graph, &data->source_out, &data->sink_in);
spa_graph_port_link(&data->source_out, &data->sink_in);
return res;
}