diff --git a/src/pipewire-jack.c b/src/pipewire-jack.c index 111929542..8ead28a8e 100644 --- a/src/pipewire-jack.c +++ b/src/pipewire-jack.c @@ -22,6 +22,8 @@ #endif #include +#include +#include #include @@ -29,6 +31,7 @@ #include #include +#include #include "extensions/client-node.h" @@ -36,9 +39,20 @@ #define JACK_PORT_NAME_SIZE 256 #define JACK_PORT_MAX 4096 #define JACK_PORT_TYPE_SIZE 32 +#define PORT_NUM_FOR_CLIENT 1024 + +#define MAX_PORTS (PORT_NUM_FOR_CLIENT/2) +#define MAX_BUFFERS 64 +#define MAX_BUFFER_DATAS 4 +#define MAX_BUFFER_MEMS 4 + #define REAL_JACK_PORT_NAME_SIZE JACK_CLIENT_NAME_SIZE + JACK_PORT_NAME_SIZE +#define NAME "jack-client" + +struct client; + struct type { uint32_t client_node; struct spa_type_media_type media_type; @@ -68,15 +82,49 @@ struct node { char name[JACK_CLIENT_NAME_SIZE]; }; +struct mem { + uint32_t id; + int fd; + uint32_t flags; + uint32_t ref; + struct pw_map_range map; + void *ptr; +}; + +struct buffer { + struct spa_list link; +#define BUFFER_FLAG_OUT (1<<0) +#define BUFFER_FLAG_MAPPED (1<<1) + uint32_t flags; + uint32_t id; + void *ptr; + struct pw_map_range map; + + struct spa_data datas[MAX_BUFFER_DATAS]; + uint32_t n_datas; + + struct mem *mem[MAX_BUFFER_DATAS+1]; + uint32_t n_mem; +}; + struct port { bool valid; + + struct client *client; + + enum spa_direction direction; uint32_t id; uint32_t parent_id; unsigned long flags; char name[REAL_JACK_PORT_NAME_SIZE]; - enum spa_direction direction; + const char *type; uint32_t type_id; + + struct buffer buffers[MAX_BUFFERS]; + uint32_t n_buffers; + + struct spa_list queue; }; struct link { @@ -87,7 +135,8 @@ struct link { }; struct context { - struct pw_main_loop *loop; + struct pw_main_loop *main; + struct pw_thread_loop *loop; struct pw_core *core; struct pw_type *t; @@ -97,6 +146,10 @@ struct context { struct pw_array links; }; +#define GET_IN_PORT(c,p) (&c->in_ports[p]) +#define GET_OUT_PORT(c,p) (&c->out_ports[p]) +#define GET_PORT(c,d,p) (d == SPA_DIRECTION_INPUT ? GET_IN_PORT(c,p) : GET_OUT_PORT(c,p)) + struct client { struct type type; @@ -118,40 +171,114 @@ struct client { struct spa_hook node_listener; struct spa_hook proxy_listener; - struct port ports[1024]; - uint32_t n_ports; + uint32_t node_id; + struct pw_client_node_transport *trans; + int writefd; + struct spa_source *socket_source; + + bool active; + + JackThreadCallback thread_callback; + void *thread_arg; + JackProcessCallback process_callback; + void *process_arg; + + uint32_t sample_rate; + uint32_t buffer_size; + + struct port in_ports[MAX_PORTS]; + uint32_t n_in_ports; + uint32_t last_in_port; + struct port out_ports[MAX_PORTS]; + uint32_t n_out_ports; + uint32_t last_out_port; + + struct pw_array mems; }; -static uint32_t alloc_port(struct client *client) +static struct port * alloc_port(struct client *c, enum spa_direction direction) { int i; + struct port *p; - for (i = 0; i < client->n_ports; i++) { - if (!client->ports[i].valid) - break; + if (direction == SPA_DIRECTION_INPUT) { + for (i = 0; i < c->n_in_ports; i++) { + if (!c->in_ports[i].valid) + break; + } + if (i >= 1024) + return NULL; + + p = GET_IN_PORT(c, i); + c->n_in_ports++; + c->last_in_port = SPA_MAX(c->last_in_port, i + 1); + } else { + for (i = 0; i < c->n_out_ports; i++) { + if (!c->out_ports[i].valid) + break; + } + if (i >= 1024) + return NULL; + + p = GET_OUT_PORT(c, i); + c->n_out_ports++; + c->last_out_port = SPA_MAX(c->last_out_port, i + 1); } - if (i >= 1024) - return SPA_ID_INVALID; + p->valid = true; + p->client = c; + p->direction = direction; + p->id = i; - client->ports[i].valid = true; - client->n_ports = SPA_MAX(client->n_ports, i + 1); - return i; + pw_log_debug("port %p: dir %d, id %d", p, p->direction, p->id); + + return p; } -static struct port *find_port(struct client *client, const char *name) +static void free_port(struct client *c, struct port *p) +{ + if (!p->valid) + return; + + if (p->direction == SPA_DIRECTION_INPUT) { + c->n_in_ports--; + while (c->last_in_port > 0 && !c->in_ports[c->last_in_port - 1].valid) + c->last_in_port--; + } else { + c->n_out_ports--; + while (c->last_out_port > 0 && !c->out_ports[c->last_out_port - 1].valid) + c->last_out_port--; + } + p->valid = false; +} + +static struct port *find_port(struct client *c, const char *name) { struct port *p; - pw_array_for_each(p, &client->context.ports) { + pw_array_for_each(p, &c->context.ports) { if (!p->valid) continue; - pw_log_debug("%s", p->name); + pw_log_debug("\"%s\" <-> \"%s\"", p->name, name); if (!strcmp(p->name, name)) return p; } return NULL; } +static struct buffer *dequeue_buffer(struct port *p) +{ + struct buffer *b; + + if (spa_list_is_empty(&p->queue)) + return NULL; + + b = spa_list_first(&p->queue, struct buffer, link); + spa_list_remove(&b->link); + SPA_FLAG_SET(b->flags, BUFFER_FLAG_OUT); + + return b; +} + void jack_get_version(int *major_ptr, int *minor_ptr, int *micro_ptr, int *proto_ptr) { *major_ptr = 0; @@ -170,7 +297,7 @@ static void on_sync_reply(void *data, uint32_t seq) { struct client *client = data; client->last_sync = seq; - pw_main_loop_quit(client->context.loop); + pw_thread_loop_signal(client->context.loop, false); } static void on_state_changed(void *data, enum pw_remote_state old, @@ -182,7 +309,7 @@ static void on_state_changed(void *data, enum pw_remote_state old, case PW_REMOTE_STATE_ERROR: client->error = true; case PW_REMOTE_STATE_CONNECTED: - pw_main_loop_quit(client->context.loop); + pw_thread_loop_signal(client->context.loop, false); break; default: break; @@ -202,7 +329,7 @@ static int do_sync(struct client *client) pw_core_proxy_sync(client->core_proxy, seq); while (true) { - pw_main_loop_run(client->context.loop); + pw_thread_loop_wait(client->context.loop); if (client->error) return -1; @@ -227,12 +354,247 @@ static const struct pw_proxy_events proxy_events = { .destroy = on_node_proxy_destroy, }; +static struct mem *find_mem(struct pw_array *mems, uint32_t id) +{ + struct mem *m; + + pw_array_for_each(m, mems) { + if (m->id == id) + return m; + } + return NULL; +} + +static void *mem_map(struct client *c, struct mem *m, uint32_t offset, uint32_t size) +{ + if (m->ptr == NULL) { + pw_map_range_init(&m->map, offset, size, c->context.core->sc_pagesize); + + m->ptr = mmap(NULL, m->map.size, PROT_READ|PROT_WRITE, + MAP_SHARED, m->fd, m->map.offset); + + if (m->ptr == MAP_FAILED) { + pw_log_error(NAME" %p: Failed to mmap memory %d %p: %m", c, size, m); + m->ptr = NULL; + return NULL; + } + } + return SPA_MEMBER(m->ptr, m->map.start, void); +} + +static void mem_unmap(struct client *c, struct mem *m) +{ + if (m->ptr != NULL) { + if (munmap(m->ptr, m->map.size) < 0) + pw_log_warn(NAME" %p: failed to unmap: %m", c); + m->ptr = NULL; + } +} + +static void clear_mem(struct client *c, struct mem *m) +{ + if (m->fd != -1) { + bool has_ref = false; + int fd; + struct mem *m2; + + fd = m->fd; + m->fd = -1; + + pw_array_for_each(m2, &c->mems) { + if (m2->fd == fd) { + has_ref = true; + break; + } + } + if (!has_ref) { + mem_unmap(c, m); + close(fd); + } + } +} + static void client_node_add_mem(void *object, uint32_t mem_id, uint32_t type, int memfd, uint32_t flags) { + struct client *c = object; + struct mem *m; + + m = find_mem(&c->mems, mem_id); + if (m) { + pw_log_debug(NAME" %p: update mem %u, fd %d, flags %d", c, + mem_id, memfd, flags); + clear_mem(c, m); + } else { + m = pw_array_add(&c->mems, sizeof(struct mem)); + pw_log_debug(NAME" %p: add mem %u, fd %d, flags %d", c, + mem_id, memfd, flags); + } + m->id = mem_id; + m->fd = memfd; + m->flags = flags; + m->ref = 0; + m->map = PW_MAP_RANGE_INIT; + m->ptr = NULL; +} + +static int +do_remove_sources(struct spa_loop *loop, + bool async, uint32_t seq, const void *data, size_t size, void *user_data) +{ + struct client *c = user_data; + + if (c->socket_source) { + pw_loop_destroy_source(c->context.core->data_loop, c->socket_source); + c->socket_source = NULL; + } + if (c->writefd != -1) { + close(c->writefd); + c->writefd = -1; + } + return 0; +} + +static void unhandle_socket(struct client *c) +{ + pw_loop_invoke(c->context.core->data_loop, + do_remove_sources, 1, NULL, 0, true, c); +} + +static void reuse_buffer(struct client *c, struct port *p, uint32_t id) +{ + struct buffer *b = &p->buffers[id]; + + if (SPA_FLAG_CHECK(b->flags, BUFFER_FLAG_OUT)) { + pw_log_trace(NAME" %p: recycle buffer %d", c, id); + spa_list_append(&p->queue, &b->link); + SPA_FLAG_UNSET(b->flags, BUFFER_FLAG_OUT); + } +} + +static inline void send_need_input(struct client *c) +{ + uint64_t cmd = 1; + pw_client_node_transport_add_message(c->trans, + &PW_CLIENT_NODE_MESSAGE_INIT(PW_CLIENT_NODE_MESSAGE_NEED_INPUT)); + write(c->writefd, &cmd, 8); +} + +static inline void send_have_output(struct client *c) +{ + uint64_t cmd = 1; + pw_client_node_transport_add_message(c->trans, + &PW_CLIENT_NODE_MESSAGE_INIT(PW_CLIENT_NODE_MESSAGE_HAVE_OUTPUT)); + write(c->writefd, &cmd, 8); +} + +static void handle_rtnode_message(struct client *c, struct pw_client_node_message *message) +{ + pw_log_trace("node message %d", PW_CLIENT_NODE_MESSAGE_TYPE(message)); + + switch (PW_CLIENT_NODE_MESSAGE_TYPE(message)) { + case PW_CLIENT_NODE_MESSAGE_PROCESS_INPUT: + { + if (c->process_callback) + c->process_callback(c->buffer_size, c->process_arg); + + send_have_output(c); + break; + } + case PW_CLIENT_NODE_MESSAGE_PROCESS_OUTPUT: + { + int i; + + for (i = 0; i < c->last_out_port; i++) { + struct port *p = &c->out_ports[i]; + struct spa_io_buffers *output; + + if (!p->valid) + continue; + + pw_log_trace("%d %d", i, p->id); + output = &c->trans->outputs[p->id]; + if (output->buffer_id == SPA_ID_INVALID) + continue; + + reuse_buffer(c, p, output->buffer_id); + output->buffer_id = SPA_ID_INVALID; + } + if (c->n_in_ports > 0) { + send_need_input(c); + } else { + if (c->process_callback) + c->process_callback(c->buffer_size, c->process_arg); + send_have_output(c); + } + break; + } + case PW_CLIENT_NODE_MESSAGE_PORT_REUSE_BUFFER: + { + struct pw_client_node_message_port_reuse_buffer *rb = + (struct pw_client_node_message_port_reuse_buffer *) message; + struct port *p; + uint32_t port_id = rb->body.port_id.value; + + p = &c->out_ports[port_id]; + + if (!p->valid) + return; + + reuse_buffer(c, p, rb->body.buffer_id.value); + break; + } + default: + pw_log_warn("unexpected node message %d", PW_CLIENT_NODE_MESSAGE_TYPE(message)); + break; + } +} + +static void +on_rtsocket_condition(void *data, int fd, enum spa_io mask) +{ + struct client *c = data; + + if (mask & (SPA_IO_ERR | SPA_IO_HUP)) { + pw_log_warn("got error"); + unhandle_socket(c); + return; + } + + if (mask & SPA_IO_IN) { + struct pw_client_node_message message; + uint64_t cmd; + + if (read(fd, &cmd, sizeof(uint64_t)) != sizeof(uint64_t)) + pw_log_warn("jack %p: read failed %m", c); + + while (pw_client_node_transport_next_message(c->trans, &message) == 1) { + struct pw_client_node_message *msg = alloca(SPA_POD_SIZE(&message)); + pw_client_node_transport_parse_message(c->trans, msg); + handle_rtnode_message(c, msg); + } + } +} + +static void clean_transport(struct client *c) +{ + struct mem *m; + + if (c->trans == NULL) + return; + + unhandle_socket(c); + + pw_array_for_each(m, &c->mems) + clear_mem(c, m); + pw_array_clear(&c->mems); + + pw_client_node_transport_destroy(c->trans); + c->trans = NULL; + close(c->writefd); } static void client_node_transport(void *object, @@ -241,13 +603,30 @@ static void client_node_transport(void *object, int writefd, struct pw_client_node_transport *transport) { -} + struct client *c = (struct client *) object; + struct pw_core *core = c->context.core; + clean_transport(c); + + c->node_id = node_id; + c->trans = transport; + + pw_log_info("client %p: create client transport %p with fds %d %d for node %u", + c, c->trans, readfd, writefd, node_id); + + c->writefd = writefd; + c->socket_source = pw_loop_add_io(core->data_loop, + readfd, + SPA_IO_ERR | SPA_IO_HUP, + true, on_rtsocket_condition, c); +} static void client_node_set_param(void *object, uint32_t seq, uint32_t id, uint32_t flags, const struct spa_pod *param) { + struct client *c = (struct client *) object; + pw_client_node_proxy_done(c->node_proxy, seq, -ENOTSUP); } static void client_node_event(void *object, const struct spa_event *event) @@ -256,6 +635,25 @@ static void client_node_event(void *object, const struct spa_event *event) static void client_node_command(void *object, uint32_t seq, const struct spa_command *command) { + struct client *c = (struct client *) object; + struct pw_type *t = c->context.t; + + if (SPA_COMMAND_TYPE(command) == t->command_node.Pause) { + pw_client_node_proxy_done(c->node_proxy, seq, 0); + + pw_loop_update_io(c->context.core->data_loop, + c->socket_source, SPA_IO_ERR | SPA_IO_HUP); + + } else if (SPA_COMMAND_TYPE(command) == t->command_node.Start) { + pw_client_node_proxy_done(c->node_proxy, seq, 0); + + pw_loop_update_io(c->context.core->data_loop, + c->socket_source, + SPA_IO_IN | SPA_IO_ERR | SPA_IO_HUP); + } else { + pw_log_warn("unhandled node command %d", SPA_COMMAND_TYPE(command)); + pw_client_node_proxy_done(c->node_proxy, seq, -ENOTSUP); + } } static void client_node_add_port(void *object, @@ -263,6 +661,8 @@ static void client_node_add_port(void *object, enum spa_direction direction, uint32_t port_id) { + struct client *c = (struct client *) object; + pw_client_node_proxy_done(c->node_proxy, seq, -ENOTSUP); } static void client_node_remove_port(void *object, @@ -270,6 +670,8 @@ static void client_node_remove_port(void *object, enum spa_direction direction, uint32_t port_id) { + struct client *c = (struct client *) object; + pw_client_node_proxy_done(c->node_proxy, seq, -ENOTSUP); } static void client_node_port_set_param(void *object, @@ -279,7 +681,83 @@ static void client_node_port_set_param(void *object, uint32_t id, uint32_t flags, const struct spa_pod *param) { + struct client *c = (struct client *) object; + struct pw_type *t = c->context.t; + struct spa_pod *params[4]; + uint8_t buffer[1024]; + struct spa_pod_builder b = SPA_POD_BUILDER_INIT(buffer, sizeof(buffer)); + + c->sample_rate = 44100; + c->buffer_size = 1024 / sizeof(float); + + params[0] = spa_pod_builder_object(&b, + t->param.idEnumFormat, t->spa_format, + "I", c->type.media_type.audio, + "I", c->type.media_subtype.raw, + ":", c->type.format_audio.format, "I", c->type.audio_format.F32, + ":", c->type.format_audio.channels, "i", 1, + ":", c->type.format_audio.rate, "iru", c->sample_rate, 2, 1, INT32_MAX); + + params[1] = spa_pod_builder_object(&b, + t->param.idFormat, t->spa_format, + "I", c->type.media_type.audio, + "I", c->type.media_subtype.raw, + ":", c->type.format_audio.format, "I", c->type.audio_format.F32, + ":", c->type.format_audio.channels, "i", 1, + ":", c->type.format_audio.rate, "i", c->sample_rate); + + params[2] = spa_pod_builder_object(&b, + t->param.idBuffers, t->param_buffers.Buffers, + ":", t->param_buffers.size, "isu", 1024, 3, 4, INT32_MAX, 4, + ":", t->param_buffers.stride, "i", 4, + ":", t->param_buffers.buffers, "iru", 1, 2, 1, 2, + ":", t->param_buffers.align, "i", 16); + + pw_thread_loop_lock(c->context.loop); + pw_client_node_proxy_port_update(c->node_proxy, + direction, + port_id, + PW_CLIENT_NODE_PORT_UPDATE_PARAMS, + 3, + (const struct spa_pod **) params, + NULL); + + pw_client_node_proxy_done(c->node_proxy, seq, 0); } + +static void clear_buffers(struct client *c, struct port *p) +{ + struct buffer *b; + int i; + + pw_log_debug(NAME" %p: port %p clear buffers", c, p); + + for (i = 0; i < p->n_buffers; i++) { + b = &p->buffers[i]; + + if (b->ptr != NULL) { + if (munmap(b->ptr, b->map.size) < 0) + pw_log_warn("failed to unmap: %m"); + } + for (i = 0; i < b->n_datas; i++) { + struct spa_data *d = &b->datas[i]; + if (d->fd != -1 && d->data) { + if (munmap(SPA_MEMBER(d->data, -d->mapoffset, void), + d->maxsize + d->mapoffset) < 0) + pw_log_warn("failed to unmap: %m"); + } + } + for (i = 0; i < b->n_mem; i++) { + if (--b->mem[i]->ref == 0) + clear_mem(c, b->mem[i]); + } + b->n_mem = 0; + b->ptr = NULL; + } + p->n_buffers = 0; + spa_list_init(&p->queue); +} + static void client_node_port_use_buffers(void *object, uint32_t seq, enum spa_direction direction, @@ -287,7 +765,115 @@ static void client_node_port_use_buffers(void *object, uint32_t n_buffers, struct pw_client_node_buffer *buffers) { + struct client *c = (struct client *) object; + struct port *p = GET_PORT(c, direction, port_id); + struct pw_type *t = c->context.t; + struct buffer *b; + uint32_t i, j, prot, res; + struct pw_core *core = c->context.core; + + if (!p->valid) { + res = -EINVAL; + goto done; + } + + pw_log_debug(NAME" %p: port %p use_buffers %d", c, p, n_buffers); + + prot = PROT_READ | (direction == SPA_DIRECTION_OUTPUT ? PROT_WRITE : 0); + + /* clear previous buffers */ + clear_buffers(c, p); + + for (i = 0; i < n_buffers; i++) { + off_t offset; + struct spa_buffer *buf; + + struct mem *m = find_mem(&c->mems, buffers[i].mem_id); + if (m == NULL) { + pw_log_warn(NAME" %p: unknown memory id %u", c, buffers[i].mem_id); + continue; + } + + buf = buffers[i].buffer; + + b = &p->buffers[buf->id]; + b->flags = 0; + b->id = buf->id; + b->n_mem = 0; + + pw_map_range_init(&b->map, buffers[i].offset, buffers[i].size, core->sc_pagesize); + + b->ptr = mmap(NULL, b->map.size, prot, MAP_SHARED, m->fd, b->map.offset); + if (b->ptr == MAP_FAILED) { + b->ptr = NULL; + pw_log_warn(NAME" %p: Failed to mmap memory %u %u: %m", c, + b->map.offset, b->map.size); + continue; + } + + + m->ref++; + b->mem[b->n_mem++] = m; + + pw_log_debug("add buffer %d %d %u %u", m->id, b->id, b->map.offset, b->map.size); + + offset = b->map.start; + for (j = 0; j < buf->n_metas; j++) { + struct spa_meta *m = &buf->metas[j]; + offset += m->size; + } + + b->n_datas = SPA_MIN(buf->n_datas, MAX_BUFFER_DATAS); + + for (j = 0; j < b->n_datas; j++) { + struct spa_data *d = &b->datas[j]; + + c->buffer_size = SPA_MAX(c->buffer_size, d->maxsize / sizeof(float)); + + memcpy(d, &buf->datas[j], sizeof(struct spa_data)); + d->chunk = + SPA_MEMBER(b->ptr, offset + sizeof(struct spa_chunk) * j, + struct spa_chunk); + + if (d->type == t->data.MemFd || d->type == t->data.DmaBuf) { + struct mem *bm = find_mem(&c->mems, SPA_PTR_TO_UINT32(d->data)); + + d->data = mmap(NULL, d->maxsize + d->mapoffset, prot, + MAP_SHARED, bm->fd, 0); + if (d->data == MAP_FAILED) { + pw_log_error(NAME" %p: failed to map buffer mem %m", c); + d->data = NULL; + res = -errno; + goto done; + } + d->data = SPA_MEMBER(d->data, d->mapoffset, void); + d->fd = bm->fd; + bm->ref++; + b->mem[b->n_mem++] = bm; + pw_log_debug(NAME" %p: data %d %u -> fd %d", c, j, bm->id, bm->fd); + } else if (d->type == t->data.MemPtr) { + d->data = SPA_MEMBER(b->ptr, + b->map.start + SPA_PTR_TO_INT(d->data), void); + d->fd = -1; + pw_log_debug(NAME" %p: data %d %u -> mem %p", c, j, b->id, d->data); + } else { + pw_log_warn("unknown buffer data type %d", d->type); + } + if (mlock(d->data, d->maxsize) < 0) + pw_log_warn(NAME" %p: Failed to mlock memory %p %u: %m", c, + d->data, d->maxsize); + } + SPA_FLAG_SET(b->flags, BUFFER_FLAG_OUT); + if (direction == SPA_DIRECTION_OUTPUT) + reuse_buffer(c, p, b->id); + } + p->n_buffers = n_buffers; + res = 0; + + done: + pw_client_node_proxy_done(c->node_proxy, seq, res); } + static void client_node_port_command(void *object, enum spa_direction direction, uint32_t port_id, @@ -304,10 +890,10 @@ static void client_node_port_set_io(void *object, uint32_t offset, uint32_t size) { + struct client *c = (struct client *) object; + pw_client_node_proxy_done(c->node_proxy, seq, -ENOTSUP); } - - static const struct pw_client_node_proxy_events client_node_events = { PW_VERSION_CLIENT_NODE_PROXY_EVENTS, .add_mem = client_node_add_mem, @@ -358,6 +944,10 @@ static void registry_event_global(void *data, uint32_t id, uint32_t parent_id, else if (type == t->port) { const struct spa_dict_item *item; + if ((str = spa_dict_lookup(props, "port.dsp")) == NULL || + !pw_properties_parse_bool(str)) + return; + if ((str = spa_dict_lookup(props, "port.name")) == NULL) return; @@ -433,16 +1023,23 @@ jack_client_t * jack_client_open (const char *client_name, goto init_failed; strncpy(client->name, client_name, JACK_CLIENT_NAME_SIZE); - client->context.loop = pw_main_loop_new(NULL); - client->context.core = pw_core_new(pw_main_loop_get_loop(client->context.loop), NULL); + client->context.main = pw_main_loop_new(NULL); + client->context.loop = pw_thread_loop_new(pw_main_loop_get_loop(client->context.main), client_name); + client->context.core = pw_core_new(pw_thread_loop_get_loop(client->context.loop), NULL); client->context.t = pw_core_get_type(client->context.core); init_type(&client->type, client->context.t->map); + pw_array_init(&client->mems, 64); + pw_array_ensure_size(&client->mems, sizeof(struct mem) * 64); + pw_map_init(&client->context.globals, 64, 64); pw_array_init(&client->context.nodes, 64); pw_array_init(&client->context.ports, 64); pw_array_init(&client->context.links, 64); + pw_thread_loop_start(client->context.loop); + + pw_thread_loop_lock(client->context.loop); client->remote = pw_remote_new(client->context.core, pw_properties_new( "client.name", client_name, @@ -450,12 +1047,13 @@ jack_client_t * jack_client_open (const char *client_name, 0); pw_remote_add_listener(client->remote, &client->remote_listener, &remote_events, client); + pw_remote_connect(client->remote); while (busy) { const char *error = NULL; - pw_main_loop_run(client->context.loop); + pw_thread_loop_wait(client->context.loop); switch (pw_remote_get_state(client->remote, &error)) { case PW_REMOTE_STATE_ERROR: @@ -503,15 +1101,20 @@ jack_client_t * jack_client_open (const char *client_name, if (do_sync(client) < 0) goto init_failed; + pw_thread_loop_unlock(client->context.loop); + *status = 0; return (jack_client_t *)client; init_failed: *status = JackFailure | JackInitFailure; - return NULL; + goto exit; server_failed: *status = JackFailure | JackServerFailed; + goto exit; + exit: + pw_thread_loop_unlock(client->context.loop); return NULL; } @@ -532,8 +1135,11 @@ int jack_client_close (jack_client_t *client) pw_log_debug("client %p: close", client); + pw_thread_loop_stop(c->context.loop); + pw_core_destroy(c->context.core); - pw_main_loop_destroy(c->context.loop); + pw_thread_loop_destroy(c->context.loop); + pw_main_loop_destroy(c->context.main); free(c); return 0; @@ -576,19 +1182,39 @@ void jack_internal_client_close (const char *client_name) int jack_activate (jack_client_t *client) { struct client *c = (struct client *) client; + int res = 0; + pw_thread_loop_lock(c->context.loop); pw_client_node_proxy_done(c->node_proxy, 0, 0); pw_client_node_proxy_set_active(c->node_proxy, true); - if (do_sync(c) < 0) - return -1; + res = do_sync(c); - return 0; + pw_thread_loop_unlock(c->context.loop); + + if (res > 0) + c->active = true; + + return res; } int jack_deactivate (jack_client_t *client) { - return 0; + struct client *c = (struct client *) client; + int res = 0; + + pw_thread_loop_lock(c->context.loop); + pw_client_node_proxy_done(c->node_proxy, 0, 0); + pw_client_node_proxy_set_active(c->node_proxy, false); + + res = do_sync(c); + + pw_thread_loop_unlock(c->context.loop); + + if (res > 0) + c->active = false; + + return res; } int jack_get_client_pid (const char *name) @@ -622,6 +1248,18 @@ void jack_cycle_signal (jack_client_t* client, int status) int jack_set_process_thread(jack_client_t* client, JackThreadCallback thread_callback, void *arg) { + struct client *c = (struct client *) client; + + if (c->active) { + pw_log_error("jack %p: can't set callback on active client", c); + return -EIO; + } else if (c->process_callback) { + pw_log_error("jack %p: process callback was already set", c); + return -EIO; + } + + c->thread_callback = thread_callback; + c->thread_arg = arg; return 0; } @@ -646,6 +1284,19 @@ int jack_set_process_callback (jack_client_t *client, JackProcessCallback process_callback, void *arg) { + struct client *c = (struct client *) client; + + if (c->active) { + pw_log_error("jack %p: can't set callback on active client", c); + return -EIO; + } else if (c->thread_callback) { + pw_log_error("jack %p: thread callback was already set", c); + return -EIO; + } + + pw_log_debug("jack %p: %p %p", c, process_callback, arg); + c->process_callback = process_callback; + c->process_arg = arg; return 0; } @@ -730,12 +1381,14 @@ int jack_set_buffer_size (jack_client_t *client, jack_nframes_t nframes) jack_nframes_t jack_get_sample_rate (jack_client_t *client) { - return 0; + struct client *c = (struct client *) client; + return c->sample_rate; } jack_nframes_t jack_get_buffer_size (jack_client_t *client) { - return 0; + struct client *c = (struct client *) client; + return c->buffer_size; } int jack_engine_takeover_timebase (jack_client_t *client) @@ -765,6 +1418,7 @@ jack_port_t * jack_port_register (jack_client_t *client, struct spa_pod *params[4]; uint32_t port_id; struct port *p; + int res; pw_log_debug("client %p: port register \"%s\" \"%s\" %ld %ld", c, port_name, port_type, flags, buffer_size); @@ -776,13 +1430,11 @@ jack_port_t * jack_port_register (jack_client_t *client, else return NULL; - if ((port_id = alloc_port(c)) == SPA_ID_INVALID) + if ((p = alloc_port(c, direction)) == NULL) return NULL; - p = &c->ports[port_id]; - p->id = port_id; + port_id = p->id; snprintf(p->name, sizeof(p->name), "%s:%s", c->name, port_name); - p->direction = direction; p->type = port_type; if (strcmp(port_type, JACK_DEFAULT_AUDIO_TYPE) == 0) @@ -792,11 +1444,15 @@ jack_port_t * jack_port_register (jack_client_t *client, else return NULL; + spa_list_init(&p->queue); + p->n_buffers = 0; + port_info.flags = SPA_PORT_INFO_FLAG_CAN_USE_BUFFERS | SPA_PORT_INFO_FLAG_NO_REF; port_info.props = &dict; dict = SPA_DICT_INIT(items, 0); + items[dict.n_items++] = SPA_DICT_ITEM_INIT("port.dsp", "1"); items[dict.n_items++] = SPA_DICT_ITEM_INIT("port.name", port_name); items[dict.n_items++] = SPA_DICT_ITEM_INIT("port.type", port_type); @@ -810,11 +1466,12 @@ jack_port_t * jack_port_register (jack_client_t *client, params[1] = spa_pod_builder_object(&b, t->param.idBuffers, t->param_buffers.Buffers, - ":", t->param_buffers.size, "isu", 128, 3, 4, INT32_MAX, 4, + ":", t->param_buffers.size, "isu", 1024, 3, 4, INT32_MAX, 4, ":", t->param_buffers.stride, "i", 4, ":", t->param_buffers.buffers, "iru", 1, 2, 1, 2, ":", t->param_buffers.align, "i", 16); + pw_thread_loop_lock(c->context.loop); pw_client_node_proxy_port_update(c->node_proxy, direction, port_id, @@ -824,35 +1481,66 @@ jack_port_t * jack_port_register (jack_client_t *client, (const struct spa_pod **) params, &port_info); - if (do_sync(c) < 0) + res = do_sync(c); + + pw_thread_loop_unlock(c->context.loop); + + if (res < 0) return NULL; - return (jack_port_t *) &c->ports[port_id]; + return (jack_port_t *) p; } int jack_port_unregister (jack_client_t *client, jack_port_t *port) { struct client *c = (struct client *) client; struct port *p = (struct port *) port; + int res; pw_log_debug("client %p: port unregister %p", client, port); - p->valid = false; + pw_thread_loop_lock(c->context.loop); + free_port(c, p); pw_client_node_proxy_port_update(c->node_proxy, p->direction, p->id, 0, 0, NULL, NULL); - if (do_sync(c) < 0) - return -1; + res = do_sync(c); - return 0; + pw_thread_loop_unlock(c->context.loop); + + return res; } void * jack_port_get_buffer (jack_port_t *port, jack_nframes_t frames) { - return 0; + struct port *p = (struct port *) port; + struct client *c = p->client; + struct buffer *b; + struct spa_io_buffers *io; + + pw_log_trace("port %p: get buffer", p); + + if (p->direction == SPA_DIRECTION_INPUT) { + io = &c->trans->inputs[p->id]; + if (io->status != SPA_STATUS_HAVE_BUFFER) + return NULL; + + b = &p->buffers[io->buffer_id]; + io->status = SPA_STATUS_NEED_BUFFER; + } else { + b = dequeue_buffer(p); + if (b == NULL) { + pw_log_warn("port %p: out of buffers", p); + return NULL; + } + io = &c->trans->outputs[p->id]; + io->status = SPA_STATUS_HAVE_BUFFER; + io->buffer_id = b->id; + } + return b->datas[0].data; } jack_uuid_t jack_port_uuid (const jack_port_t *port) @@ -975,34 +1663,46 @@ int jack_connect (jack_client_t *client, struct client *c = (struct client *) client; struct port *src, *dst; struct pw_properties *props; + int res; pw_log_debug("client %p: connect %s %s", client, source_port, destination_port); + pw_thread_loop_lock(c->context.loop); + src = find_port(c, source_port); dst = find_port(c, destination_port); - if (src == NULL || dst == NULL) - return -1; + if (src == NULL || dst == NULL) { + res = -ENOENT; + goto exit; + } props = pw_properties_new(NULL, NULL); + if (props == NULL) { + res = -ENOMEM; + goto exit; + } + pw_properties_setf(props, PW_LINK_OUTPUT_NODE_ID, "%d", src->parent_id); pw_properties_setf(props, PW_LINK_OUTPUT_PORT_ID, "%d", src->id); pw_properties_setf(props, PW_LINK_INPUT_NODE_ID, "%d", dst->parent_id); pw_properties_setf(props, PW_LINK_INPUT_PORT_ID, "%d", dst->id); pw_core_proxy_create_object(c->core_proxy, - "link-factory", - c->context.t->link, - PW_VERSION_LINK, - &props->dict, - 0); + "link-factory", + c->context.t->link, + PW_VERSION_LINK, + &props->dict, + 0); pw_properties_free(props); - if (do_sync(c) < 0) - return -1; + res = do_sync(c); - return 0; + exit: + pw_thread_loop_unlock(c->context.loop); + + return res; } int jack_disconnect (jack_client_t *client, @@ -1077,6 +1777,8 @@ const char ** jack_get_ports (jack_client_t *client, int count = 0; struct port *p; + pw_thread_loop_lock(c->context.loop); + pw_array_for_each(p, &c->context.ports) { if (!p->valid) continue; @@ -1086,6 +1788,7 @@ const char ** jack_get_ports (jack_client_t *client, res[count++] = p->name; } res[count] = NULL; + pw_thread_loop_unlock(c->context.loop); return res; }