pipewire-jack: improve processing

This commit is contained in:
Wim Taymans 2018-02-26 17:31:22 +01:00
parent 895ec3865d
commit a7954ea908

View file

@ -47,7 +47,7 @@
#define MAX_OBJECTS 8192
#define MAX_PORTS (PORT_NUM_FOR_CLIENT/2)
#define MAX_BUFFERS 2
#define MAX_BUFFER_DATAS 4
#define MAX_BUFFER_DATAS 4u
#define MAX_BUFFER_MEMS 4
@ -143,6 +143,7 @@ struct port {
struct buffer buffers[MAX_BUFFERS];
uint32_t n_buffers;
struct spa_io_buffers *io;
struct spa_list queue;
};
@ -233,6 +234,9 @@ struct client {
struct pw_array mems;
float empty[BUFFER_SIZE_MAX + 8];
bool started;
int status;
};
static struct object * alloc_object(struct client *c)
@ -263,7 +267,7 @@ static void free_object(struct client *c, struct object *o)
static struct port * alloc_port(struct client *c, enum spa_direction direction)
{
int i;
uint32_t i;
struct port *p;
struct object *o;
@ -393,8 +397,10 @@ static void on_state_changed(void *data, enum pw_remote_state old,
case PW_REMOTE_STATE_UNCONNECTED:
if (client->shutdown_callback)
client->shutdown_callback(client->shutdown_arg);
/* fallthrough*/
case PW_REMOTE_STATE_ERROR:
client->error = true;
/* fallthrough*/
case PW_REMOTE_STATE_CONNECTED:
pw_thread_loop_signal(client->context.loop, false);
break;
@ -452,7 +458,6 @@ static struct mem *find_mem(struct pw_array *mems, uint32_t id)
return NULL;
}
#if 0
static void *mem_map(struct client *c, struct mem *m, uint32_t offset, uint32_t size)
{
if (m->ptr == NULL) {
@ -469,7 +474,6 @@ static void *mem_map(struct client *c, struct mem *m, uint32_t offset, uint32_t
}
return SPA_MEMBER(m->ptr, m->map.start, void);
}
#endif
static void mem_unmap(struct client *c, struct mem *m)
{
@ -558,7 +562,7 @@ static void reuse_buffer(struct client *c, struct port *p, uint32_t id)
struct buffer *b = &p->buffers[id];
if (SPA_FLAG_CHECK(b->flags, BUFFER_FLAG_OUT)) {
pw_log_trace(NAME" %p: recycle buffer %d", c, id);
pw_log_trace(NAME" %p: port %p: recycle buffer %d", c, p, id);
spa_list_append(&p->queue, &b->link);
SPA_FLAG_UNSET(b->flags, BUFFER_FLAG_OUT);
}
@ -567,6 +571,7 @@ static void reuse_buffer(struct client *c, struct port *p, uint32_t id)
static inline void send_need_input(struct client *c)
{
uint64_t cmd = 1;
pw_log_trace("send need input");
pw_client_node_transport_add_message(c->trans,
&PW_CLIENT_NODE_MESSAGE_INIT(PW_CLIENT_NODE_MESSAGE_NEED_INPUT));
write(c->writefd, &cmd, 8);
@ -575,11 +580,63 @@ static inline void send_need_input(struct client *c)
static inline void send_have_output(struct client *c)
{
uint64_t cmd = 1;
pw_log_trace("send have output");
pw_client_node_transport_add_message(c->trans,
&PW_CLIENT_NODE_MESSAGE_INIT(PW_CLIENT_NODE_MESSAGE_HAVE_OUTPUT));
write(c->writefd, &cmd, 8);
}
static int do_process_input(struct client *c)
{
if (c->status == SPA_STATUS_HAVE_BUFFER)
return SPA_STATUS_HAVE_BUFFER;
if (c->process_callback)
c->process_callback(c->buffer_size, c->process_arg);
return c->status = SPA_STATUS_HAVE_BUFFER;
}
static int do_process_output(struct client *c)
{
uint32_t i;
for (i = 0; i < c->last_out_port; i++) {
struct port *p = GET_OUT_PORT(c, i);
struct spa_io_buffers *io = p->io;
pw_log_trace("port %p: %d %d %d", p, p->valid, io->status, io->buffer_id);
if (!p->valid || io == NULL)
continue;
if (io->status != SPA_STATUS_NEED_BUFFER)
continue;
if (io->buffer_id < p->n_buffers)
reuse_buffer(c, p, io->buffer_id);
io->status = SPA_STATUS_NEED_BUFFER;
io->buffer_id = SPA_ID_INVALID;
}
c->status = SPA_STATUS_HAVE_BUFFER;
for (i = 0; i < c->last_in_port; i++) {
struct port *p = GET_IN_PORT(c, i);
struct spa_io_buffers *io = p->io;
if (!p->valid || io == NULL)
continue;
c->status = SPA_STATUS_NEED_BUFFER;
}
if (c->status == SPA_STATUS_HAVE_BUFFER) {
if (c->process_callback)
c->process_callback(c->buffer_size, c->process_arg);
}
return c->status;
}
static void handle_rtnode_message(struct client *c, struct pw_client_node_message *message)
{
pw_log_trace("node message %d", PW_CLIENT_NODE_MESSAGE_TYPE(message));
@ -587,36 +644,29 @@ static void handle_rtnode_message(struct client *c, struct pw_client_node_messag
switch (PW_CLIENT_NODE_MESSAGE_TYPE(message)) {
case PW_CLIENT_NODE_MESSAGE_PROCESS_INPUT:
{
if (c->process_callback)
c->process_callback(c->buffer_size, c->process_arg);
switch (do_process_input(c)) {
case SPA_STATUS_HAVE_BUFFER:
send_have_output(c);
break;
case SPA_STATUS_NEED_BUFFER:
send_need_input(c);
break;
default:
break;
}
break;
}
case PW_CLIENT_NODE_MESSAGE_PROCESS_OUTPUT:
{
int i;
for (i = 0; i < c->last_out_port; i++) {
struct port *p = &c->out_ports[i];
struct spa_io_buffers *output;
if (!p->valid)
continue;
output = &c->trans->outputs[p->id];
if (output->buffer_id == SPA_ID_INVALID)
continue;
reuse_buffer(c, p, output->buffer_id);
output->buffer_id = SPA_ID_INVALID;
}
if (c->n_in_ports > 0) {
send_need_input(c);
} else {
if (c->process_callback)
c->process_callback(c->buffer_size, c->process_arg);
switch (do_process_output(c)) {
case SPA_STATUS_HAVE_BUFFER:
send_have_output(c);
break;
case SPA_STATUS_NEED_BUFFER:
send_need_input(c);
break;
default:
break;
}
break;
}
@ -626,13 +676,12 @@ static void handle_rtnode_message(struct client *c, struct pw_client_node_messag
(struct pw_client_node_message_port_reuse_buffer *) message;
struct port *p;
uint32_t port_id = rb->body.port_id.value;
uint32_t buffer_id = rb->body.buffer_id.value;
p = &c->out_ports[port_id];
p = GET_OUT_PORT(c, port_id);
if (buffer_id < p->n_buffers)
reuse_buffer(c, p, buffer_id);
if (!p->valid)
return;
reuse_buffer(c, p, rb->body.buffer_id.value);
break;
}
default:
@ -693,6 +742,7 @@ static void client_node_transport(void *object,
{
struct client *c = (struct client *) object;
struct pw_core *core = c->context.core;
uint32_t i;
clean_transport(c);
@ -702,6 +752,15 @@ static void client_node_transport(void *object,
pw_log_debug("client %p: create client transport %p with fds %d %d for node %u",
c, c->trans, readfd, writefd, node_id);
for (i = 0; i < c->trans->area->max_input_ports; i++) {
struct port *p = GET_IN_PORT(c, i);
p->io = &c->trans->inputs[i];
}
for (i = 0; i < c->trans->area->max_output_ports; i++) {
struct port *p = GET_OUT_PORT(c, i);
p->io = &c->trans->outputs[i];
}
c->writefd = writefd;
c->socket_source = pw_loop_add_io(core->data_loop,
readfd,
@ -727,17 +786,33 @@ static void client_node_command(void *object, uint32_t seq, const struct spa_com
struct pw_type *t = c->context.t;
if (SPA_COMMAND_TYPE(command) == t->command_node.Pause) {
pw_client_node_proxy_done(c->node_proxy, seq, 0);
if (c->started) {
pw_loop_update_io(c->context.core->data_loop,
c->socket_source, SPA_IO_ERR | SPA_IO_HUP);
} else if (SPA_COMMAND_TYPE(command) == t->command_node.Start) {
c->started = false;
}
pw_client_node_proxy_done(c->node_proxy, seq, 0);
} else if (SPA_COMMAND_TYPE(command) == t->command_node.Start) {
if (!c->started) {
pw_loop_update_io(c->context.core->data_loop,
c->socket_source,
SPA_IO_IN | SPA_IO_ERR | SPA_IO_HUP);
switch (do_process_output(c)) {
case SPA_STATUS_HAVE_BUFFER:
send_have_output(c);
break;
case SPA_STATUS_NEED_BUFFER:
send_need_input(c);
break;
default:
break;
}
c->started = true;
}
pw_client_node_proxy_done(c->node_proxy, seq, 0);
} else {
pw_log_warn("unhandled node command %d", SPA_COMMAND_TYPE(command));
pw_client_node_proxy_done(c->node_proxy, seq, -ENOTSUP);
@ -765,7 +840,7 @@ static void client_node_remove_port(void *object,
static void clear_buffers(struct client *c, struct port *p)
{
struct buffer *b;
int i, j;
uint32_t i, j;
pw_log_debug(NAME" %p: port %p clear buffers", c, p);
@ -1035,7 +1110,39 @@ static void client_node_port_set_io(void *object,
uint32_t size)
{
struct client *c = (struct client *) object;
pw_client_node_proxy_done(c->node_proxy, seq, -ENOTSUP);
struct port *p = GET_PORT(c, direction, port_id);
struct pw_type *t = c->context.t;
struct mem *m;
void *ptr;
int res;
if (mem_id == SPA_ID_INVALID) {
ptr = NULL;
size = 0;
}
else {
m = find_mem(&c->mems, mem_id);
if (m == NULL) {
pw_log_warn("unknown memory id %u", mem_id);
res = -EINVAL;
goto exit;
}
if ((ptr = mem_map(c, m, offset, size)) == NULL) {
res = -errno;
goto exit;
}
}
if (id == t->io.Buffers) {
p->io = ptr;
}
pw_log_debug("port %p: set io id %u %p %d %d", p, id, ptr, p->io->status, p->io->buffer_id);
res = 0;
exit:
pw_client_node_proxy_done(c->node_proxy, seq, res);
}
static const struct pw_client_node_proxy_events client_node_events = {
@ -1783,34 +1890,42 @@ void * jack_port_get_buffer (jack_port_t *port, jack_nframes_t frames)
struct port *p;
struct buffer *b;
struct spa_io_buffers *io;
pw_log_trace("port %p: get buffer", port);
int status;
if (o->type != c->context.t->port || o->port.port_id == SPA_ID_INVALID) {
pw_log_error("client %p: invalid port %p", c, port);
return NULL;
}
p = GET_PORT(c, GET_DIRECTION(o->port.flags), o->port.port_id);
if (p->n_buffers == 0)
return c->empty;
io = p->io;
status = io->status;
pw_log_trace("port %p: get buffer %d %d", p, status, io->buffer_id);
if (p->direction == SPA_DIRECTION_INPUT) {
io = &c->trans->inputs[p->id];
if (io->status != SPA_STATUS_HAVE_BUFFER)
io->status = SPA_STATUS_NEED_BUFFER;
if (io->buffer_id >= p->n_buffers)
return c->empty;
b = &p->buffers[io->buffer_id];
io->status = SPA_STATUS_NEED_BUFFER;
} else {
io->status = SPA_STATUS_HAVE_BUFFER;
if ((b = dequeue_buffer(p)) == NULL) {
pw_log_warn("port %p: out of buffers", p);
return c->empty;
goto empty_out;
}
reuse_buffer(c, p, b->id);
}
io = &c->trans->outputs[p->id];
io->status = SPA_STATUS_HAVE_BUFFER;
io->buffer_id = b->id;
}
pw_log_trace("port %p: get buffer %d %d", p, io->status, io->buffer_id);
return b->datas[0].data;
empty_out:
io->buffer_id = SPA_ID_INVALID;
return c->empty;
}
jack_uuid_t jack_port_uuid (const jack_port_t *port)