From a7954ea908b701f1e4e7a1da8b1f8e43f3c8eac6 Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Mon, 26 Feb 2018 17:31:22 +0100 Subject: [PATCH] pipewire-jack: improve processing --- src/pipewire-jack.c | 221 +++++++++++++++++++++++++++++++++----------- 1 file changed, 168 insertions(+), 53 deletions(-) diff --git a/src/pipewire-jack.c b/src/pipewire-jack.c index 87fcd3c15..fe07267e4 100644 --- a/src/pipewire-jack.c +++ b/src/pipewire-jack.c @@ -47,7 +47,7 @@ #define MAX_OBJECTS 8192 #define MAX_PORTS (PORT_NUM_FOR_CLIENT/2) #define MAX_BUFFERS 2 -#define MAX_BUFFER_DATAS 4 +#define MAX_BUFFER_DATAS 4u #define MAX_BUFFER_MEMS 4 @@ -143,6 +143,7 @@ struct port { struct buffer buffers[MAX_BUFFERS]; uint32_t n_buffers; + struct spa_io_buffers *io; struct spa_list queue; }; @@ -233,6 +234,9 @@ struct client { struct pw_array mems; float empty[BUFFER_SIZE_MAX + 8]; + + bool started; + int status; }; static struct object * alloc_object(struct client *c) @@ -263,7 +267,7 @@ static void free_object(struct client *c, struct object *o) static struct port * alloc_port(struct client *c, enum spa_direction direction) { - int i; + uint32_t i; struct port *p; struct object *o; @@ -393,8 +397,10 @@ static void on_state_changed(void *data, enum pw_remote_state old, case PW_REMOTE_STATE_UNCONNECTED: if (client->shutdown_callback) client->shutdown_callback(client->shutdown_arg); + /* fallthrough*/ case PW_REMOTE_STATE_ERROR: client->error = true; + /* fallthrough*/ case PW_REMOTE_STATE_CONNECTED: pw_thread_loop_signal(client->context.loop, false); break; @@ -452,7 +458,6 @@ static struct mem *find_mem(struct pw_array *mems, uint32_t id) return NULL; } -#if 0 static void *mem_map(struct client *c, struct mem *m, uint32_t offset, uint32_t size) { if (m->ptr == NULL) { @@ -469,7 +474,6 @@ static void *mem_map(struct client *c, struct mem *m, uint32_t offset, uint32_t } return SPA_MEMBER(m->ptr, m->map.start, void); } -#endif static void mem_unmap(struct client *c, struct mem *m) { @@ -558,7 +562,7 @@ static void reuse_buffer(struct client *c, struct port *p, uint32_t id) struct buffer *b = &p->buffers[id]; if (SPA_FLAG_CHECK(b->flags, BUFFER_FLAG_OUT)) { - pw_log_trace(NAME" %p: recycle buffer %d", c, id); + pw_log_trace(NAME" %p: port %p: recycle buffer %d", c, p, id); spa_list_append(&p->queue, &b->link); SPA_FLAG_UNSET(b->flags, BUFFER_FLAG_OUT); } @@ -567,6 +571,7 @@ static void reuse_buffer(struct client *c, struct port *p, uint32_t id) static inline void send_need_input(struct client *c) { uint64_t cmd = 1; + pw_log_trace("send need input"); pw_client_node_transport_add_message(c->trans, &PW_CLIENT_NODE_MESSAGE_INIT(PW_CLIENT_NODE_MESSAGE_NEED_INPUT)); write(c->writefd, &cmd, 8); @@ -575,11 +580,63 @@ static inline void send_need_input(struct client *c) static inline void send_have_output(struct client *c) { uint64_t cmd = 1; + pw_log_trace("send have output"); pw_client_node_transport_add_message(c->trans, &PW_CLIENT_NODE_MESSAGE_INIT(PW_CLIENT_NODE_MESSAGE_HAVE_OUTPUT)); write(c->writefd, &cmd, 8); } +static int do_process_input(struct client *c) +{ + if (c->status == SPA_STATUS_HAVE_BUFFER) + return SPA_STATUS_HAVE_BUFFER; + + if (c->process_callback) + c->process_callback(c->buffer_size, c->process_arg); + + return c->status = SPA_STATUS_HAVE_BUFFER; +} + +static int do_process_output(struct client *c) +{ + uint32_t i; + + for (i = 0; i < c->last_out_port; i++) { + struct port *p = GET_OUT_PORT(c, i); + struct spa_io_buffers *io = p->io; + + pw_log_trace("port %p: %d %d %d", p, p->valid, io->status, io->buffer_id); + + if (!p->valid || io == NULL) + continue; + + if (io->status != SPA_STATUS_NEED_BUFFER) + continue; + + if (io->buffer_id < p->n_buffers) + reuse_buffer(c, p, io->buffer_id); + + io->status = SPA_STATUS_NEED_BUFFER; + io->buffer_id = SPA_ID_INVALID; + } + + c->status = SPA_STATUS_HAVE_BUFFER; + for (i = 0; i < c->last_in_port; i++) { + struct port *p = GET_IN_PORT(c, i); + struct spa_io_buffers *io = p->io; + + if (!p->valid || io == NULL) + continue; + + c->status = SPA_STATUS_NEED_BUFFER; + } + if (c->status == SPA_STATUS_HAVE_BUFFER) { + if (c->process_callback) + c->process_callback(c->buffer_size, c->process_arg); + } + return c->status; +} + static void handle_rtnode_message(struct client *c, struct pw_client_node_message *message) { pw_log_trace("node message %d", PW_CLIENT_NODE_MESSAGE_TYPE(message)); @@ -587,36 +644,29 @@ static void handle_rtnode_message(struct client *c, struct pw_client_node_messag switch (PW_CLIENT_NODE_MESSAGE_TYPE(message)) { case PW_CLIENT_NODE_MESSAGE_PROCESS_INPUT: { - if (c->process_callback) - c->process_callback(c->buffer_size, c->process_arg); - - send_have_output(c); + switch (do_process_input(c)) { + case SPA_STATUS_HAVE_BUFFER: + send_have_output(c); + break; + case SPA_STATUS_NEED_BUFFER: + send_need_input(c); + break; + default: + break; + } break; } case PW_CLIENT_NODE_MESSAGE_PROCESS_OUTPUT: { - int i; - - for (i = 0; i < c->last_out_port; i++) { - struct port *p = &c->out_ports[i]; - struct spa_io_buffers *output; - - if (!p->valid) - continue; - - output = &c->trans->outputs[p->id]; - if (output->buffer_id == SPA_ID_INVALID) - continue; - - reuse_buffer(c, p, output->buffer_id); - output->buffer_id = SPA_ID_INVALID; - } - if (c->n_in_ports > 0) { - send_need_input(c); - } else { - if (c->process_callback) - c->process_callback(c->buffer_size, c->process_arg); + switch (do_process_output(c)) { + case SPA_STATUS_HAVE_BUFFER: send_have_output(c); + break; + case SPA_STATUS_NEED_BUFFER: + send_need_input(c); + break; + default: + break; } break; } @@ -626,13 +676,12 @@ static void handle_rtnode_message(struct client *c, struct pw_client_node_messag (struct pw_client_node_message_port_reuse_buffer *) message; struct port *p; uint32_t port_id = rb->body.port_id.value; + uint32_t buffer_id = rb->body.buffer_id.value; - p = &c->out_ports[port_id]; + p = GET_OUT_PORT(c, port_id); + if (buffer_id < p->n_buffers) + reuse_buffer(c, p, buffer_id); - if (!p->valid) - return; - - reuse_buffer(c, p, rb->body.buffer_id.value); break; } default: @@ -693,6 +742,7 @@ static void client_node_transport(void *object, { struct client *c = (struct client *) object; struct pw_core *core = c->context.core; + uint32_t i; clean_transport(c); @@ -702,6 +752,15 @@ static void client_node_transport(void *object, pw_log_debug("client %p: create client transport %p with fds %d %d for node %u", c, c->trans, readfd, writefd, node_id); + for (i = 0; i < c->trans->area->max_input_ports; i++) { + struct port *p = GET_IN_PORT(c, i); + p->io = &c->trans->inputs[i]; + } + for (i = 0; i < c->trans->area->max_output_ports; i++) { + struct port *p = GET_OUT_PORT(c, i); + p->io = &c->trans->outputs[i]; + } + c->writefd = writefd; c->socket_source = pw_loop_add_io(core->data_loop, readfd, @@ -727,17 +786,33 @@ static void client_node_command(void *object, uint32_t seq, const struct spa_com struct pw_type *t = c->context.t; if (SPA_COMMAND_TYPE(command) == t->command_node.Pause) { - pw_client_node_proxy_done(c->node_proxy, seq, 0); + if (c->started) { + pw_loop_update_io(c->context.core->data_loop, + c->socket_source, SPA_IO_ERR | SPA_IO_HUP); - pw_loop_update_io(c->context.core->data_loop, - c->socket_source, SPA_IO_ERR | SPA_IO_HUP); + c->started = false; + } + pw_client_node_proxy_done(c->node_proxy, seq, 0); } else if (SPA_COMMAND_TYPE(command) == t->command_node.Start) { - pw_client_node_proxy_done(c->node_proxy, seq, 0); + if (!c->started) { + pw_loop_update_io(c->context.core->data_loop, + c->socket_source, + SPA_IO_IN | SPA_IO_ERR | SPA_IO_HUP); - pw_loop_update_io(c->context.core->data_loop, - c->socket_source, - SPA_IO_IN | SPA_IO_ERR | SPA_IO_HUP); + switch (do_process_output(c)) { + case SPA_STATUS_HAVE_BUFFER: + send_have_output(c); + break; + case SPA_STATUS_NEED_BUFFER: + send_need_input(c); + break; + default: + break; + } + c->started = true; + } + pw_client_node_proxy_done(c->node_proxy, seq, 0); } else { pw_log_warn("unhandled node command %d", SPA_COMMAND_TYPE(command)); pw_client_node_proxy_done(c->node_proxy, seq, -ENOTSUP); @@ -765,7 +840,7 @@ static void client_node_remove_port(void *object, static void clear_buffers(struct client *c, struct port *p) { struct buffer *b; - int i, j; + uint32_t i, j; pw_log_debug(NAME" %p: port %p clear buffers", c, p); @@ -1035,7 +1110,39 @@ static void client_node_port_set_io(void *object, uint32_t size) { struct client *c = (struct client *) object; - pw_client_node_proxy_done(c->node_proxy, seq, -ENOTSUP); + struct port *p = GET_PORT(c, direction, port_id); + struct pw_type *t = c->context.t; + struct mem *m; + void *ptr; + int res; + + if (mem_id == SPA_ID_INVALID) { + ptr = NULL; + size = 0; + } + else { + m = find_mem(&c->mems, mem_id); + if (m == NULL) { + pw_log_warn("unknown memory id %u", mem_id); + res = -EINVAL; + goto exit; + } + if ((ptr = mem_map(c, m, offset, size)) == NULL) { + res = -errno; + goto exit; + } + } + + + if (id == t->io.Buffers) { + p->io = ptr; + } + pw_log_debug("port %p: set io id %u %p %d %d", p, id, ptr, p->io->status, p->io->buffer_id); + + res = 0; + + exit: + pw_client_node_proxy_done(c->node_proxy, seq, res); } static const struct pw_client_node_proxy_events client_node_events = { @@ -1783,34 +1890,42 @@ void * jack_port_get_buffer (jack_port_t *port, jack_nframes_t frames) struct port *p; struct buffer *b; struct spa_io_buffers *io; - - pw_log_trace("port %p: get buffer", port); + int status; if (o->type != c->context.t->port || o->port.port_id == SPA_ID_INVALID) { pw_log_error("client %p: invalid port %p", c, port); return NULL; } p = GET_PORT(c, GET_DIRECTION(o->port.flags), o->port.port_id); + if (p->n_buffers == 0) return c->empty; + io = p->io; + status = io->status; + pw_log_trace("port %p: get buffer %d %d", p, status, io->buffer_id); + if (p->direction == SPA_DIRECTION_INPUT) { - io = &c->trans->inputs[p->id]; - if (io->status != SPA_STATUS_HAVE_BUFFER) + io->status = SPA_STATUS_NEED_BUFFER; + if (io->buffer_id >= p->n_buffers) return c->empty; b = &p->buffers[io->buffer_id]; - io->status = SPA_STATUS_NEED_BUFFER; } else { + io->status = SPA_STATUS_HAVE_BUFFER; if ((b = dequeue_buffer(p)) == NULL) { pw_log_warn("port %p: out of buffers", p); - return c->empty; + goto empty_out; } - io = &c->trans->outputs[p->id]; - io->status = SPA_STATUS_HAVE_BUFFER; - io->buffer_id = b->id; + reuse_buffer(c, p, b->id); } + io->buffer_id = b->id; + pw_log_trace("port %p: get buffer %d %d", p, io->status, io->buffer_id); return b->datas[0].data; + + empty_out: + io->buffer_id = SPA_ID_INVALID; + return c->empty; } jack_uuid_t jack_port_uuid (const jack_port_t *port)