From 4731a0b4b8c4773a8673bb807d1a429925319ade Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Tue, 15 Aug 2017 11:02:42 +0200 Subject: [PATCH] More work on jack support --- spa/include/spa/graph-scheduler3.h | 2 - src/modules/module-jack.c | 249 +++++++++++++++++++++------- src/modules/module-jack/jack-node.c | 216 +++++++++++++++++++----- src/modules/module-jack/jack-node.h | 23 ++- src/modules/module-jack/server.h | 3 + src/modules/module-jack/shared.h | 4 +- src/pipewire/node.c | 2 - src/pipewire/port.c | 3 +- 8 files changed, 390 insertions(+), 112 deletions(-) diff --git a/spa/include/spa/graph-scheduler3.h b/spa/include/spa/graph-scheduler3.h index 98abb32fc..39700b101 100644 --- a/spa/include/spa/graph-scheduler3.h +++ b/spa/include/spa/graph-scheduler3.h @@ -28,14 +28,12 @@ extern "C" { struct spa_graph_scheduler { struct spa_graph *graph; - struct spa_graph_node *node; }; static inline void spa_graph_scheduler_init(struct spa_graph_scheduler *sched, struct spa_graph *graph) { sched->graph = graph; - sched->node = NULL; } static inline int spa_graph_node_scheduler_input(void *data) diff --git a/src/modules/module-jack.c b/src/modules/module-jack.c index 3c5b3e46c..25d93eb56 100644 --- a/src/modules/module-jack.c +++ b/src/modules/module-jack.c @@ -39,6 +39,7 @@ #include "pipewire/module.h" #include "pipewire/client.h" #include "pipewire/resource.h" +#include "pipewire/private.h" #include "pipewire/link.h" #include "pipewire/node-factory.h" #include "pipewire/data-loop.h" @@ -86,6 +87,10 @@ struct impl { struct jack_server server; struct pw_link *sink_link; + + struct { + struct spa_list nodes; + } rt; }; struct client { @@ -122,6 +127,80 @@ static bool init_socket_name(struct sockaddr_un *addr, const char *name, bool pr return true; } +static int +notify_client(struct jack_client *client, int ref_num, const char *name, int notify, + int sync, const char* message, int value1, int value2) +{ + int size, result = 0; + char _name[JACK_CLIENT_NAME_SIZE+1]; + char _message[JACK_MESSAGE_SIZE+1]; + + if (client->fd == 0) + return 0; + + if (name == NULL) + name = client->control->name; + + snprintf(_name, sizeof(_name), "%s", name); + snprintf(_message, sizeof(_message), "%s", message); + + size = sizeof(int) + sizeof(_name) + 5 * sizeof(int) + sizeof(_message); + CheckWrite(&size, sizeof(int)); + CheckWrite(_name, sizeof(_name)); + CheckWrite(&ref_num, sizeof(int)); + CheckWrite(¬ify, sizeof(int)); + CheckWrite(&value1, sizeof(int)); + CheckWrite(&value2, sizeof(int)); + CheckWrite(&sync, sizeof(int)); + CheckWrite(_message, sizeof(_message)); + + if (sync) + CheckRead(&result, sizeof(int)); + + return result; +} + +static int +notify_add_client(struct impl *impl, struct jack_client *client, const char *name, int ref_num) +{ + struct jack_server *server = &impl->server; + int i; + + for (i = 0; i < CLIENT_NUM; i++) { + struct jack_client *c = server->client_table[i]; + const char *n; + + if (c == NULL || c == client) + continue; + + n = c->control->name; + if (notify_client(c, ref_num, name, jack_notify_AddClient, false, "", 0, 0) < 0) { + pw_log_warn("module-jack %p: can't notify client", impl); + } + if (notify_client(client, i, n, jack_notify_AddClient, true, "", 0, 0) < 0) { + pw_log_error("module-jack %p: can't notify client", impl); + return -1; + } + } + return 0; +} + +void +notify_clients(struct impl *impl, int notify, + int sync, const char* message, int value1, int value2) +{ + struct jack_server *server = &impl->server; + int i; + for (i = 0; i < CLIENT_NUM; i++) { + struct jack_client *c = server->client_table[i]; + + if (c == NULL) + continue; + + notify_client(c, i, NULL, notify, sync, message, value1, value2); + } +} + static int process_messages(struct client *client); static void client_destroy(void *data) @@ -196,6 +275,9 @@ handle_register_port(struct client *client) reply_stop: jack_graph_manager_next_stop(mgr); + if (jc->control->active) + notify_clients(impl, jack_notify_PortRegistrationOnCallback, false, "", port_index, 0); + reply: CheckWrite(&result, sizeof(int)); CheckWrite(&port_index, sizeof(jack_port_id_t)); @@ -227,6 +309,8 @@ handle_activate_client(struct client *client) jack_graph_manager_next_stop(mgr); + notify_clients(impl, jack_notify_ActivateClient, true, "", 0, 0); + CheckWrite(&result, sizeof(int)); return 0; } @@ -304,61 +388,6 @@ handle_client_check(struct client *client) return 0; } -static int -notify_client(struct jack_client *client, int ref_num, const char *name, int notify, - int sync, const char* message, int value1, int value2) -{ - int size, result = 0; - char _name[JACK_CLIENT_NAME_SIZE+1]; - char _message[JACK_MESSAGE_SIZE+1]; - - if (client->fd == 0) - return 0; - - snprintf(_name, sizeof(_name), "%s", name); - snprintf(_message, sizeof(_message), "%s", message); - - size = sizeof(int) + sizeof(_name) + 5 * sizeof(int) + sizeof(_message); - CheckWrite(&size, sizeof(int)); - CheckWrite(_name, sizeof(_name)); - CheckWrite(&ref_num, sizeof(int)); - CheckWrite(¬ify, sizeof(int)); - CheckWrite(&value1, sizeof(int)); - CheckWrite(&value2, sizeof(int)); - CheckWrite(&sync, sizeof(int)); - CheckWrite(_message, sizeof(_message)); - - if (sync) - CheckRead(&result, sizeof(int)); - - return result; -} - -static int -notify_add_client(struct impl *impl, struct jack_client *client, const char *name, int ref_num) -{ - struct jack_server *server = &impl->server; - int i; - - for (i = 0; i < CLIENT_NUM; i++) { - struct jack_client *c = server->client_table[i]; - const char *n; - - if (c == NULL || c == client) - continue; - - n = c->control->name; - if (notify_client(c, ref_num, name, jack_notify_AddClient, false, "", 0, 0) < 0) { - pw_log_warn("module-jack %p: can't notify client", impl); - } - if (notify_client(client, i, n, jack_notify_AddClient, true, "", 0, 0) < 0) { - pw_log_error("module-jack %p: can't notify client", impl); - return -1; - } - } - return 0; -} - static int handle_client_open(struct client *client) { @@ -434,6 +463,8 @@ handle_client_open(struct client *client) goto reply; } + spa_list_append(&impl->rt.nodes, &jc->node->graph_link); + shared_engine = impl->server.engine_control->info.index; shared_client = jc->control->info.index; shared_graph = impl->server.graph_manager->info.index; @@ -576,6 +607,8 @@ handle_connect_name_ports(struct client *client) 0); pw_link_activate(link); + notify_clients(impl, jack_notify_PortConnectCallback, false, "", src_id, dst_id); + result = 0; reply_stop: jack_graph_manager_next_stop(mgr); @@ -618,9 +651,13 @@ process_messages(struct client *client) res = handle_register_port(client); break; case jack_request_UnRegisterPort: + break; case jack_request_ConnectPorts: + break; case jack_request_DisconnectPorts: + break; case jack_request_SetTimeBaseClient: + break; case jack_request_ActivateClient: res = handle_activate_client(client); break; @@ -628,6 +665,7 @@ process_messages(struct client *client) res = handle_deactivate_client(client); break; case jack_request_DisconnectPort: + break; case jack_request_SetClientCapabilities: case jack_request_GetPortConnections: case jack_request_GetPortNConnections: @@ -649,6 +687,7 @@ process_messages(struct client *client) res = handle_connect_name_ports(client); break; case jack_request_DisconnectNamePorts: + break; case jack_request_GetInternalClientName: case jack_request_InternalClientHandle: case jack_request_InternalClientLoad: @@ -763,6 +802,94 @@ static struct client *client_new(struct impl *impl, int fd) return NULL; } +static void jack_node_pull(void *data) +{ + struct jack_client *jc = data; + struct impl *impl = jc->data; + struct jack_server *server = &impl->server; + struct jack_graph_manager *mgr = server->graph_manager; + struct spa_graph_node *n = &jc->node->node->rt.node, *pn; + struct spa_graph_port *p, *pp; + + jack_graph_manager_try_switch(mgr); + + spa_list_for_each(p, &n->ports[SPA_DIRECTION_INPUT], link) { + if ((pp = p->peer) == NULL || ((pn = pp->node) == NULL)) + continue; + pn->state = pn->callbacks->process_input(pn->callbacks_data); + } +} + +static void jack_node_push(void *data) +{ + struct jack_client *jc = data; + struct impl *impl = jc->data; + struct jack_server *server = &impl->server; + struct jack_graph_manager *mgr = server->graph_manager; + struct jack_connection_manager *conn; + int activation; + struct pw_jack_node *node; + struct spa_graph_node *n = &jc->node->node->rt.node, *pn; + struct spa_graph_port *p, *pp; + + conn = jack_graph_manager_get_current(mgr); + + jack_connection_manager_reset(conn, mgr->client_timing); + + activation = jack_connection_manager_get_activation(conn, server->freewheel_ref_num); + if (activation == 0) + return; + + pw_log_trace("resume %d", activation); + + spa_list_for_each(p, &n->ports[SPA_DIRECTION_INPUT], link) { + if ((pp = p->peer) == NULL || ((pn = pp->node) == NULL)) + continue; + pn->state = pn->callbacks->process_output(pn->callbacks_data); + } + + spa_list_for_each(node, &impl->rt.nodes, graph_link) { + n = &node->node->rt.node; + n->state = n->callbacks->process_output(n->callbacks_data); + + spa_list_for_each(p, &n->ports[SPA_DIRECTION_INPUT], link) { + if ((pp = p->peer) == NULL || ((pn = pp->node) == NULL)) + continue; + pn->state = pn->callbacks->process_input(pn->callbacks_data); + } + + n->state = n->callbacks->process_input(n->callbacks_data); + + spa_list_for_each(p, &n->ports[SPA_DIRECTION_OUTPUT], link) { + if ((pp = p->peer) == NULL || ((pn = pp->node) == NULL)) + continue; + pn->state = pn->callbacks->process_input(pn->callbacks_data); + } + } + + +#if 0 + jack_connection_manager_resume_ref_num(conn, + client->control, + server->synchro_table, + mgr->client_timing); + + if (server->engine_control->sync_mode) { + pw_log_trace("suspend"); + jack_connection_manager_suspend_ref_num(conn, + client->control, + server->synchro_table, + mgr->client_timing); + } +#endif +} + +static const struct pw_jack_node_events jack_node_events = { + PW_VERSION_JACK_NODE_EVENTS, + .pull = jack_node_pull, + .push = jack_node_push, +}; + static int make_audio_client(struct impl *impl) { @@ -787,6 +914,7 @@ make_audio_client(struct impl *impl) } jc = calloc(1,sizeof(struct jack_client)); + jc->data = impl; jc->ref_num = ref_num; jc->control = jack_client_control_alloc("system", -1, ref_num, -1); jc->control->active = true; @@ -817,12 +945,13 @@ make_audio_client(struct impl *impl) jack_graph_manager_next_stop(mgr); server->audio_ref_num = ref_num; - server->audio_node = pw_jack_node_new(impl->core, pw_module_get_global(impl->module), server, ref_num, NULL); server->audio_node_node = pw_jack_node_get_node(server->audio_node); jc->node = server->audio_node; + pw_jack_node_add_listener(server->audio_node, &jc->node_listener, &jack_node_events, jc); + pw_log_debug("module-jack %p: Added audio driver %d", impl, ref_num); return 0; @@ -851,6 +980,7 @@ make_freewheel_client(struct impl *impl) } jc = calloc(1,sizeof(struct jack_client)); + jc->data = impl; jc->ref_num = ref_num; jc->control = jack_client_control_alloc("freewheel", -1, ref_num, -1); jc->control->active = true; @@ -876,7 +1006,7 @@ static bool on_global(void *data, struct pw_global *global) { struct impl *impl = data; - struct pw_node *node, *n; + struct pw_node *node; const struct pw_properties *properties; const char *str; @@ -892,10 +1022,8 @@ static bool on_global(void *data, struct pw_global *global) if (strcmp(str, "Audio/Sink") != 0) return true; - n = pw_jack_node_get_node(impl->server.audio_node); - impl->sink_link = pw_link_new(impl->core, pw_module_get_global(impl->module), - pw_node_get_free_port(n, PW_DIRECTION_OUTPUT), + pw_node_get_free_port(impl->server.audio_node_node, PW_DIRECTION_OUTPUT), pw_node_get_free_port(node, PW_DIRECTION_INPUT), NULL, NULL, @@ -1106,6 +1234,7 @@ static struct impl *module_init(struct pw_module *module, struct pw_properties * spa_list_init(&impl->socket_list); spa_list_init(&impl->client_list); + spa_list_init(&impl->rt.nodes); str = NULL; if (impl->properties) diff --git a/src/modules/module-jack/jack-node.c b/src/modules/module-jack/jack-node.c index a5c4e0166..9e52714e1 100644 --- a/src/modules/module-jack/jack-node.c +++ b/src/modules/module-jack/jack-node.c @@ -54,16 +54,12 @@ struct type { struct spa_type_audio_format audio_format; }; -struct pw_jack_node { - struct pw_core *core; - struct pw_node *node; +struct impl { + struct pw_jack_node node; - struct spa_hook_list listener_list; - - struct jack_server *server; struct type type; - int ref_num; - struct pw_port *otherport; + + int status; }; static inline void init_type(struct type *type, struct spa_type_map *map) @@ -80,14 +76,15 @@ struct buffer { struct spa_list link; struct spa_buffer *outbuf; void *ptr; - size_t size; }; struct port_data { - struct pw_jack_node *node; + struct impl *impl; + enum pw_direction direction; int jack_port_id; struct jack_port *port; + float *ptr; struct spa_port_info info; @@ -145,7 +142,7 @@ static void recycle_buffer(struct pw_jack_node *this, struct port_data *pd, uint spa_list_append(&pd->empty, &b->link); } -static int node_process_input(void *data) +static int driver_process_input(void *data) { struct pw_jack_node *this = data; struct spa_graph_node *node = &this->node->rt.node; @@ -176,39 +173,147 @@ static int node_process_input(void *data) return SPA_RESULT_HAVE_BUFFER; } -static int node_process_output(void *data) +static void conv_f32_s16(int16_t *out, float *in, int n_samples, int stride) +{ + int i; + for (i = 0; i < n_samples; i++) { + if (in[i] < -1.0f) + *out = -32767; + else if (in[i] >= 1.0f) + *out = 32767; + else + *out = lrintf(in[i] * 32767.0f); + out += stride; + } +} +static void fill_s16(int16_t *out, int n_samples, int stride) +{ + int i; + for (i = 0; i < n_samples; i++) { + *out = 0; + out += stride; + } +} + +static int driver_process_output(void *data) { struct pw_jack_node *this = data; struct spa_graph_node *node = &this->node->rt.node; struct spa_graph_port *p; + struct port_data *opd = pw_port_get_user_data(this->otherport); + struct spa_port_io *out_io = opd->io; + struct jack_engine_control *ctrl = this->server->engine_control; + struct buffer *out; + int16_t *op; pw_log_trace(NAME "%p: process output", this); - spa_list_for_each(p, &node->ports[SPA_DIRECTION_OUTPUT], link) { - struct pw_port *port = p->callbacks_data; - struct port_data *opd = pw_port_get_user_data(port); - struct spa_port_io *out_io = opd->io; + if (out_io->status == SPA_RESULT_HAVE_BUFFER) + return SPA_RESULT_HAVE_BUFFER; - if (out_io->status == SPA_RESULT_HAVE_BUFFER) - return SPA_RESULT_HAVE_BUFFER; - - if (out_io->buffer_id != SPA_ID_INVALID) { - recycle_buffer(this, opd, out_io->buffer_id); - out_io->buffer_id = SPA_ID_INVALID; - } + if (out_io->buffer_id != SPA_ID_INVALID) { + recycle_buffer(this, opd, out_io->buffer_id); + out_io->buffer_id = SPA_ID_INVALID; } + out = buffer_dequeue(this, opd); + if (out == NULL) + return SPA_RESULT_OUT_OF_BUFFERS; + + out_io->buffer_id = out->outbuf->id; + out_io->status = SPA_RESULT_HAVE_BUFFER; + + op = out->ptr; + + spa_hook_list_call(&this->listener_list, struct pw_jack_node_events, pull); + spa_list_for_each(p, &node->ports[SPA_DIRECTION_INPUT], link) { struct pw_port *port = p->callbacks_data; struct port_data *ipd = pw_port_get_user_data(port); struct spa_port_io *in_io = ipd->io; + struct buffer *in; + int stride = 2; + if (in_io->buffer_id != SPA_ID_INVALID && in_io->status == SPA_RESULT_HAVE_BUFFER) { + in = &ipd->buffers[in_io->buffer_id]; + conv_f32_s16(op, in->ptr, ctrl->buffer_size, stride); + } + else { + fill_s16(op, ctrl->buffer_size, stride); + } + op++; in_io->status = SPA_RESULT_NEED_BUFFER; } + out->outbuf->datas[0].chunk->size = ctrl->buffer_size * sizeof(int16_t) * 2; - spa_hook_list_call(&this->listener_list, struct pw_jack_node_events, process); + spa_hook_list_call(&this->listener_list, struct pw_jack_node_events, push); + node->ready_in = node->required_in; - return SPA_RESULT_NEED_BUFFER; + return SPA_RESULT_HAVE_BUFFER; +} + +static const struct pw_node_implementation driver_impl = { + PW_VERSION_NODE_IMPLEMENTATION, + .get_props = node_get_props, + .set_props = node_set_props, + .send_command = node_send_command, + .add_port = node_add_port, + .process_input = driver_process_input, + .process_output = driver_process_output, +}; + +static int node_process_input(void *data) +{ + struct impl *impl = data; + struct pw_jack_node *this = &impl->node; + struct spa_graph_node *node = &this->node->rt.node; + struct spa_graph_port *p; + struct jack_server *server = this->server; + struct jack_graph_manager *mgr = server->graph_manager; + struct jack_connection_manager *conn; + jack_time_t current_date = 0; + int ref_num = this->ref_num; + + pw_log_trace(NAME " %p: process input", impl); + if (impl->status == SPA_RESULT_HAVE_BUFFER) + return SPA_RESULT_HAVE_BUFFER; + + mgr->client_timing[ref_num].status = Triggered; + mgr->client_timing[ref_num].signaled_at = current_date; + + conn = jack_graph_manager_get_current(mgr); + + jack_activation_count_signal(&conn->input_counter[ref_num], + &server->synchro_table[ref_num]); + + spa_list_for_each(p, &node->ports[SPA_DIRECTION_OUTPUT], link) { + struct pw_port *port = p->callbacks_data; + struct port_data *opd = pw_port_get_user_data(port); + struct spa_port_io *out_io = opd->io; + out_io->buffer_id = 0; + out_io->status = SPA_RESULT_HAVE_BUFFER; + pw_log_trace(NAME " %p: port %p: %d %d", impl, p, out_io->buffer_id, out_io->status); + } + return impl->status = SPA_RESULT_HAVE_BUFFER; +} + +static int node_process_output(void *data) +{ + struct impl *impl = data; + struct pw_jack_node *this = &impl->node; + struct spa_graph_node *node = &this->node->rt.node; + struct spa_graph_port *p; + + pw_log_trace(NAME " %p: process output", impl); + spa_list_for_each(p, &node->ports[SPA_DIRECTION_INPUT], link) { + struct pw_port *port = p->callbacks_data; + struct port_data *ipd = pw_port_get_user_data(port); + struct spa_port_io *in_io = ipd->io; + in_io->buffer_id = 0; + in_io->status = SPA_RESULT_NEED_BUFFER; + pw_log_trace(NAME " %p: port %p: %d %d", impl, p, in_io->buffer_id, in_io->status); + } + return impl->status = SPA_RESULT_NEED_BUFFER; } static const struct pw_node_implementation node_impl = { @@ -237,9 +342,10 @@ static int port_enum_formats(void *data, int32_t index) { struct port_data *pd = data; - struct type *t = &pd->node->type; + struct type *t = &pd->impl->type; struct spa_pod_builder b = { NULL, }; struct spa_pod_frame f[2]; + struct jack_engine_control *ctrl = pd->impl->node.server->engine_control; if (index > 0) return SPA_RESULT_ENUM_END; @@ -251,7 +357,7 @@ static int port_enum_formats(void *data, spa_pod_builder_format(&b, &f[0], t->format, t->media_type.audio, t->media_subtype.raw, PROP(&f[1], t->format_audio.format, SPA_POD_TYPE_ID, t->audio_format.F32), - PROP(&f[1], t->format_audio.rate, SPA_POD_TYPE_INT, 44100), + PROP(&f[1], t->format_audio.rate, SPA_POD_TYPE_INT, ctrl->sample_rate), PROP(&f[1], t->format_audio.channels, SPA_POD_TYPE_INT, 1)); } else if (pd->port->type_id == 1) { @@ -264,7 +370,7 @@ static int port_enum_formats(void *data, spa_pod_builder_format(&b, &f[0], t->format, t->media_type.audio, t->media_subtype.raw, PROP(&f[1], t->format_audio.format, SPA_POD_TYPE_ID, t->audio_format.S16), - PROP(&f[1], t->format_audio.rate, SPA_POD_TYPE_INT, 44100), + PROP(&f[1], t->format_audio.rate, SPA_POD_TYPE_INT, ctrl->sample_rate), PROP(&f[1], t->format_audio.channels, SPA_POD_TYPE_INT, 2)); } *format = SPA_POD_BUILDER_DEREF(&b, f[0].ref, struct spa_format); @@ -279,7 +385,11 @@ static int port_set_format(void *data, uint32_t flags, const struct spa_format * static int port_get_format(void *data, const struct spa_format **format) { - return SPA_RESULT_OK; + int res; + struct spa_format *fmt; + res = port_enum_formats(data, &fmt, NULL, 0); + *format = fmt; + return res; } static int port_get_info(void *data, const struct spa_port_info **info) @@ -287,7 +397,10 @@ static int port_get_info(void *data, const struct spa_port_info **info) struct port_data *pd = data; pd->info.flags = SPA_PORT_INFO_FLAG_CAN_USE_BUFFERS | SPA_PORT_INFO_FLAG_LIVE; - pd->info.rate = 44100; + if (pd->direction == PW_DIRECTION_OUTPUT && pd->impl->node.otherport == NULL) + pd->info.flags |= SPA_PORT_INFO_FLAG_CAN_ALLOC_BUFFERS; + + pd->info.rate = pd->impl->node.server->engine_control->sample_rate; *info = &pd->info; return SPA_RESULT_OK; @@ -306,7 +419,7 @@ static int port_set_param(void *data, struct spa_param *param) static int port_use_buffers(void *data, struct spa_buffer **buffers, uint32_t n_buffers) { struct port_data *pd = data; - struct type *t = &pd->node->type; + struct type *t = &pd->impl->type; int i; for (i = 0; i < n_buffers; i++) { @@ -319,7 +432,6 @@ static int port_use_buffers(void *data, struct spa_buffer **buffers, uint32_t n_ d[0].type == t->data.MemFd || d[0].type == t->data.DmaBuf) && d[0].data != NULL) { b->ptr = d[0].data; - b->size = d[0].maxsize; } else { pw_log_error(NAME " %p: invalid memory on buffer %p", pd, buffers[i]); return SPA_RESULT_ERROR; @@ -335,7 +447,23 @@ static int port_alloc_buffers(void *data, struct spa_param **params, uint32_t n_params, struct spa_buffer **buffers, uint32_t *n_buffers) { - return SPA_RESULT_NOT_IMPLEMENTED; + struct port_data *pd = data; + struct type *t = &pd->impl->type; + int i; + + for (i = 0; i < *n_buffers; i++) { + struct buffer *b; + struct spa_data *d = buffers[i]->datas; + + b = &pd->buffers[i]; + b->outbuf = buffers[i]; + d[0].type = t->data.MemPtr; + b->ptr = d[0].data = pd->ptr; + spa_list_append(&pd->empty, &b->link); + } + pd->n_buffers = *n_buffers; + + return SPA_RESULT_OK; } static int port_reuse_buffer(void *data, uint32_t buffer_id) @@ -366,6 +494,7 @@ static const struct pw_port_implementation port_impl = { static struct pw_port *make_port(struct pw_jack_node *node, enum pw_direction direction, int port_id, int jack_port_id, struct jack_port *jp, bool autoconnect) { + struct impl *impl = SPA_CONTAINER_OF(node, struct impl, node); struct pw_port *port; struct port_data *pd; struct pw_properties *properties = NULL; @@ -375,9 +504,11 @@ static struct pw_port *make_port(struct pw_jack_node *node, enum pw_direction di port = pw_port_new(direction, port_id, properties, sizeof(struct port_data)); pd = pw_port_get_user_data(port); - pd->node = node; + pd->direction = direction; + pd->impl = impl; pd->jack_port_id = jack_port_id; pd->port = jp; + pd->ptr = (float *)((uintptr_t)jp->buffer & ~31L) + 8; spa_list_init(&pd->empty); pw_port_set_implementation(port, &port_impl, pd); pw_port_add(port, node->node); @@ -391,6 +522,7 @@ struct pw_jack_node *pw_jack_node_new(struct pw_core *core, int ref_num, struct pw_properties *properties) { + struct impl *impl; struct pw_jack_node *this; struct pw_node *node; struct jack_client *client = server->client_table[ref_num]; @@ -401,21 +533,21 @@ struct pw_jack_node *pw_jack_node_new(struct pw_core *core, bool make_input = false, make_output = false; node = pw_node_new(core, NULL, parent, client->control->name, - properties, sizeof(struct pw_jack_node)); + properties, sizeof(struct impl)); if (node == NULL) return NULL; - this = pw_node_get_user_data(node); + impl = pw_node_get_user_data(node); + this = &impl->node; pw_log_debug("jack-node %p: new", this); this->node = node; this->core = core; spa_hook_list_init(&this->listener_list); this->server = server; + this->client = client; this->ref_num = ref_num; - init_type(&this->type, pw_core_get_type(core)->map); - - pw_node_set_implementation(node, &node_impl, this); + init_type(&impl->type, pw_core_get_type(core)->map); conn = jack_graph_manager_next_start(mgr); @@ -445,6 +577,12 @@ struct pw_jack_node *pw_jack_node_new(struct pw_core *core, if (make_input) this->otherport = make_port(this, PW_DIRECTION_INPUT, 0, -1, NULL, true); + if (make_output || make_input) + pw_node_set_implementation(node, &driver_impl, this); + else + pw_node_set_implementation(node, &node_impl, this); + + pw_node_register(node); return this; diff --git a/src/modules/module-jack/jack-node.h b/src/modules/module-jack/jack-node.h index 185e2ec47..fa2b57419 100644 --- a/src/modules/module-jack/jack-node.h +++ b/src/modules/module-jack/jack-node.h @@ -26,11 +26,20 @@ extern "C" { #endif -/** \class pw_jack_node - * - * PipeWire jack node interface - */ -struct pw_jack_node; +struct pw_jack_node { + struct pw_core *core; + struct pw_node *node; + + struct spa_hook_list listener_list; + + struct jack_server *server; + struct jack_client *client; + int ref_num; + + struct pw_port *otherport; + + struct spa_list graph_link; +}; struct pw_jack_node_events { #define PW_VERSION_JACK_NODE_EVENTS 0 @@ -38,7 +47,9 @@ struct pw_jack_node_events { void (*destroy) (void *data); - void (*process) (void *data); + void (*pull) (void *data); + + void (*push) (void *data); }; diff --git a/src/modules/module-jack/server.h b/src/modules/module-jack/server.h index 27313cde3..9be2d74d4 100644 --- a/src/modules/module-jack/server.h +++ b/src/modules/module-jack/server.h @@ -19,10 +19,13 @@ struct jack_client { int ref_num; + void *data; struct client *owner; struct jack_client_control *control; struct pw_jack_node *node; int fd; /* notify fd */ + struct spa_hook node_listener; + struct spa_list graph_link; }; struct jack_server { diff --git a/src/modules/module-jack/shared.h b/src/modules/module-jack/shared.h index a510e7de8..a94d8dafa 100644 --- a/src/modules/module-jack/shared.h +++ b/src/modules/module-jack/shared.h @@ -918,9 +918,9 @@ jack_engine_control_alloc(const char* name) ctrl = (struct jack_engine_control *)jack_shm_addr(&info); ctrl->info = info; - ctrl->buffer_size = 512; + ctrl->buffer_size = 128; ctrl->sample_rate = 48000; - ctrl->sync_mode = true; + ctrl->sync_mode = false; ctrl->temporary = false; ctrl->period_usecs = 1000000.f / ctrl->sample_rate * ctrl->buffer_size; ctrl->timeout_usecs = 0; diff --git a/src/pipewire/node.c b/src/pipewire/node.c index 5c4f359ff..6725fe4ea 100644 --- a/src/pipewire/node.c +++ b/src/pipewire/node.c @@ -170,7 +170,6 @@ static void node_need_input(void *data) struct pw_node *this = &impl->this; spa_graph_scheduler_pull(this->rt.sched, &this->rt.node); - while (spa_graph_scheduler_iterate(this->rt.sched)); } static void node_have_output(void *data) @@ -179,7 +178,6 @@ static void node_have_output(void *data) struct pw_node *this = &impl->this; spa_graph_scheduler_push(this->rt.sched, &this->rt.node); - while (spa_graph_scheduler_iterate(this->rt.sched)); } static void node_unbind_func(void *data) diff --git a/src/pipewire/port.c b/src/pipewire/port.c index 4fa3b6388..e27363bed 100644 --- a/src/pipewire/port.c +++ b/src/pipewire/port.c @@ -101,7 +101,8 @@ static int schedule_mix_input(void *data) struct spa_port_io *io = this->rt.mix_port.io; spa_list_for_each(p, &node->ports[SPA_DIRECTION_INPUT], link) { - pw_log_trace("mix input %p %p->%p %d %d", p, p->io, io, p->io->status, p->io->buffer_id); + pw_log_trace("mix %p: input %p %p->%p %d %d", node, + p, p->io, io, p->io->status, p->io->buffer_id); *io = *p->io; p->io->status = SPA_RESULT_OK; p->io->buffer_id = SPA_ID_INVALID;