diff --git a/src/modules/module-audio-dsp.c b/src/modules/module-audio-dsp.c index f868339ca..242710e58 100644 --- a/src/modules/module-audio-dsp.c +++ b/src/modules/module-audio-dsp.c @@ -76,6 +76,8 @@ struct impl { }; struct buffer { +#define BUFFER_FLAG_OUT (1<<0) + uint32_t flags; struct spa_list link; struct spa_buffer *outbuf; void *ptr; @@ -91,8 +93,6 @@ struct port { #define PORT_FLAG_MIDI (1<<2) uint32_t flags; - struct spa_node mix_node; - struct spa_port_info info; struct spa_io_buffers *io; @@ -214,8 +214,11 @@ static int node_remove_port(struct spa_node *node, enum spa_direction direction, static void recycle_buffer(struct node *n, struct port *p, uint32_t id) { struct buffer *b = &p->buffers[id]; - pw_log_trace("recycle buffer %d", id); - spa_list_append(&p->queue, &b->link); + if (SPA_FLAG_CHECK(b->flags, BUFFER_FLAG_OUT)) { + pw_log_trace("recycle buffer %d", id); + spa_list_append(&p->queue, &b->link); + SPA_FLAG_UNSET(b->flags, BUFFER_FLAG_OUT); + } } static int clear_buffers(struct node *n, struct port *p) @@ -228,22 +231,10 @@ static int clear_buffers(struct node *n, struct port *p) return 0; } -static struct buffer * dequeue_buffer(struct node *n, struct port *p) -{ - struct buffer *b; - - if (spa_list_is_empty(&p->queue)) - return NULL; - - b = spa_list_first(&p->queue, struct buffer, link); - spa_list_remove(&b->link); - - return b; -} - -static void conv_f32_s16(int16_t *out, float *in, int n_samples, int stride) +static void conv_f32_s16(int16_t *out, int index, float *in, int n_samples, int stride) { int i; + out += index; for (i = 0; i < n_samples; i++) { if (in[i] < -1.0f) *out = -32767; @@ -254,22 +245,39 @@ static void conv_f32_s16(int16_t *out, float *in, int n_samples, int stride) out += stride; } } -static void fill_s16(int16_t *out, int n_samples, int stride) +static void fill_s16(int16_t *out, int index, int n_samples, int stride) { int i; + out += index; for (i = 0; i < n_samples; i++) { *out = 0; out += stride; } } -#if 0 static void add_f32(float *out, float *in, int n_samples) { int i; for (i = 0; i < n_samples; i++) out[i] += in[i]; } -#endif + +static struct buffer * peek_buffer(struct node *n, struct port *p) +{ + if (spa_list_is_empty(&p->queue)) + return NULL; + return spa_list_first(&p->queue, struct buffer, link); +} + +static struct buffer * dequeue_buffer(struct node *n, struct port *p) +{ + struct buffer *b; + + if ((b = peek_buffer(n, p)) != NULL) { + spa_list_remove(&b->link); + SPA_FLAG_SET(b->flags, BUFFER_FLAG_OUT); + } + return b; +} static int node_process_input(struct spa_node *node) { @@ -278,8 +286,6 @@ static int node_process_input(struct spa_node *node) struct port *outp = GET_OUT_PORT(n, 0); struct spa_io_buffers *outio = outp->io; struct buffer *out; - int16_t *op; - int i; pw_log_trace(NAME " %p: process input", this); @@ -295,26 +301,6 @@ static int node_process_input(struct spa_node *node) outio->buffer_id = out->outbuf->id; outio->status = SPA_STATUS_HAVE_BUFFER; - op = out->ptr; - - for (i = 0; i < n->n_in_ports; i++) { - struct port *inp = GET_IN_PORT(n, i); - struct spa_io_buffers *inio = inp->io; - struct buffer *in; - int stride = 2; - - pw_log_trace(NAME" %p: mix %d %p %d %d", this, i, inio, inio->buffer_id, n->buffer_size); - if (inio->buffer_id < inp->n_buffers && inio->status == SPA_STATUS_HAVE_BUFFER) { - in = &inp->buffers[inio->buffer_id]; - conv_f32_s16(op, in->ptr, n->buffer_size, stride); - } - else { - fill_s16(op, n->buffer_size, stride); - } - op++; - inio->status = SPA_STATUS_NEED_BUFFER; - } - out->outbuf->datas[0].chunk->offset = 0; out->outbuf->datas[0].chunk->size = n->buffer_size * sizeof(int16_t) * 2; out->outbuf->datas[0].chunk->stride = 0; @@ -339,6 +325,7 @@ static int node_process_output(struct spa_node *node) recycle_buffer(n, outp, outio->buffer_id); outio->buffer_id = SPA_ID_INVALID; } + outio->status = SPA_STATUS_OK; for (i = 0; i < n->n_in_ports; i++) { struct port *inp = GET_IN_PORT(n, i); @@ -349,10 +336,9 @@ static int node_process_output(struct spa_node *node) inio->status = SPA_STATUS_NEED_BUFFER; } - return outio->status = SPA_STATUS_NEED_BUFFER; + return SPA_STATUS_NEED_BUFFER; } - static int port_set_io(struct spa_node *node, enum spa_direction direction, uint32_t port_id, uint32_t id, void *data, size_t size) @@ -546,6 +532,7 @@ static int port_use_buffers(struct spa_node *node, enum spa_direction direction, struct spa_data *d = buffers[i]->datas; b = &p->buffers[i]; + b->flags = 0; b->outbuf = buffers[i]; if ((d[0].type == t->data.MemPtr || d[0].type == t->data.MemFd || @@ -629,63 +616,98 @@ static const struct spa_node node_impl = { .process_output = node_process_output, }; -#if 0 + static int schedule_mix_input(struct spa_node *_node) { - struct port *p = SPA_CONTAINER_OF(_node, struct port, mix_node); - struct pw_port *port = p->port; + struct pw_port *port = SPA_CONTAINER_OF(_node, struct pw_port, mix_node); + struct port *p = port->owner_data; + struct node *n = p->node; + struct port *outp = GET_OUT_PORT(n, 0); struct spa_graph_node *node = &port->rt.mix_node; struct spa_graph_port *gp; struct spa_io_buffers *io = port->rt.mix_port.io; size_t buffer_size = p->node->buffer_size; + struct buffer *outb; + float *out = NULL; int layer = 0; + int stride = n->n_in_ports; spa_list_for_each(gp, &node->ports[SPA_DIRECTION_INPUT], link) { - struct pw_link *link = gp->scheduler_data; - struct spa_buffer *inbuf; + struct pw_port_mix *mix = SPA_CONTAINER_OF(gp, struct pw_port_mix, port); - pw_log_trace("mix %p: input %d %d", node, gp->io->buffer_id, link->output->n_buffers); - - if (!(gp->io->buffer_id < link->output->n_buffers && gp->io->status == SPA_STATUS_HAVE_BUFFER)) + if (gp->flags & SPA_GRAPH_PORT_FLAG_DISABLED) continue; - inbuf = link->output->buffers[gp->io->buffer_id]; + pw_log_trace("mix %p: input %d %d/%d", node, + gp->io->status, gp->io->buffer_id, mix->n_buffers); - if (layer++ == 0) - memcpy(p->buffers[0].ptr, inbuf->datas[0].data, buffer_size * sizeof(float)); - else - add_f32(p->buffers[0].ptr, inbuf->datas[0].data, buffer_size); + if (!(gp->io->buffer_id < mix->n_buffers && gp->io->status == SPA_STATUS_HAVE_BUFFER)) + continue; + + if (layer++ == 0) { + io->buffer_id = gp->io->buffer_id; + io->status = SPA_STATUS_HAVE_BUFFER; + out = mix->buffers[io->buffer_id]->datas[0].data; + } + else { + add_f32(out, mix->buffers[gp->io->buffer_id]->datas[0].data, buffer_size); + } pw_log_trace("mix %p: input %p %p->%p %d %d", node, gp, gp->io, io, gp->io->status, gp->io->buffer_id); - *io = *gp->io; - io->buffer_id = 0; + gp->io->status = SPA_STATUS_OK; gp->io->buffer_id = SPA_ID_INVALID; } + + outb = peek_buffer(n, outp); + if (out == NULL) + fill_s16(outb->ptr, port->port_id, buffer_size, stride); + else + conv_f32_s16(outb->ptr, port->port_id, out, buffer_size, stride); + + return SPA_STATUS_HAVE_BUFFER; } static int schedule_mix_output(struct spa_node *_node) { - struct port *p = SPA_CONTAINER_OF(_node, struct port, mix_node); - struct pw_port *port = p->port; + struct pw_port *port = SPA_CONTAINER_OF(_node, struct pw_port, mix_node); + struct port *p = port->owner_data; struct spa_graph_node *node = &port->rt.mix_node; struct spa_graph_port *gp; struct spa_io_buffers *io = port->rt.mix_port.io; - spa_list_for_each(gp, &node->ports[SPA_DIRECTION_INPUT], link) + spa_list_for_each(gp, &node->ports[SPA_DIRECTION_INPUT], link) { + if (gp->flags & SPA_GRAPH_PORT_FLAG_DISABLED) + continue; *gp->io = *io; + } return io->status; } +static int schedule_mix_use_buffers(struct spa_node *_node, + enum spa_direction direction, + uint32_t port_id, + struct spa_buffer **buffers, + uint32_t n_buffers) +{ + struct pw_port *port = SPA_CONTAINER_OF(_node, struct pw_port, mix_node); + struct port *p = port->owner_data; + + pw_log_debug("port %p: %d use buffers %d %p", port, port_id, n_buffers, buffers); + + return 0; +} + + static const struct spa_node schedule_mix_node = { SPA_VERSION_NODE, NULL, + .port_use_buffers = schedule_mix_use_buffers, .process_input = schedule_mix_input, .process_output = schedule_mix_output, }; -#endif static void port_free(void *data) { @@ -722,10 +744,13 @@ static struct port *make_port(struct node *n, enum pw_direction direction, p->node = n; p->flags = flags; spa_list_init(&p->queue); + port->owner_data = p; if (direction == PW_DIRECTION_INPUT) { n->in_ports[id] = p; n->n_in_ports++; + port->mix_node = schedule_mix_node; + port->mix = &port->mix_node; } else { n->out_ports[id] = p; n->n_out_ports++; diff --git a/src/pipewire/core.c b/src/pipewire/core.c index da3a1ebf4..76055f859 100644 --- a/src/pipewire/core.c +++ b/src/pipewire/core.c @@ -33,8 +33,6 @@ #include #include -#include - /** \cond */ struct resource_data { struct spa_hook resource_listener; @@ -386,9 +384,6 @@ struct pw_core *pw_core_new(struct pw_loop *main_loop, struct pw_properties *pro pw_type_init(&this->type); pw_map_init(&this->globals, 128, 32); - spa_graph_init(&this->rt.graph); - spa_graph_set_callbacks(&this->rt.graph, &spa_graph_impl_default, NULL); - spa_debug_set_type_map(this->type.map); this->support[0] = SPA_SUPPORT_INIT(SPA_TYPE__TypeMap, this->type.map); diff --git a/src/pipewire/link.c b/src/pipewire/link.c index fca364e0d..7cf119bd2 100644 --- a/src/pipewire/link.c +++ b/src/pipewire/link.c @@ -41,6 +41,7 @@ struct impl { bool active; bool have_io; + bool activated; struct pw_work_queue *work; @@ -51,6 +52,8 @@ struct impl { struct spa_hook input_node_listener; struct spa_hook output_port_listener; struct spa_hook output_node_listener; + + struct spa_io_buffers io; }; struct resource_data { @@ -512,22 +515,24 @@ static int select_io(struct pw_link *this) if (impl->have_io) return 0; - io = this->rt.in_port.port.io; + io = this->rt.mix[SPA_DIRECTION_INPUT].port.io; if (io == NULL) - io = this->rt.out_port.port.io; + io = this->rt.mix[SPA_DIRECTION_OUTPUT].port.io; if (io == NULL) - io = &this->io; + io = &impl->io; if (io == NULL) return -EIO; if ((res = port_set_io(this, this->input, io, - sizeof(struct spa_io_buffers), &this->rt.in_port.port)) < 0) + sizeof(struct spa_io_buffers), &this->rt.mix[SPA_DIRECTION_INPUT].port)) < 0) return res; if ((res = port_set_io(this, this->output, io, - sizeof(struct spa_io_buffers), &this->rt.out_port.port)) < 0) + sizeof(struct spa_io_buffers), &this->rt.mix[SPA_DIRECTION_OUTPUT].port)) < 0) return res; + this->io = io; + impl->have_io = true; return 0; @@ -588,11 +593,11 @@ static int do_allocation(struct pw_link *this, uint32_t in_state, uint32_t out_s goto error; } } else if (in_state == PW_PORT_STATE_READY && out_state > PW_PORT_STATE_READY) { - out_flags &= ~SPA_PORT_INFO_FLAG_CAN_USE_BUFFERS; - in_flags &= ~SPA_PORT_INFO_FLAG_CAN_ALLOC_BUFFERS; + SPA_FLAG_SET(out_flags, SPA_PORT_INFO_FLAG_CAN_USE_BUFFERS); + SPA_FLAG_UNSET(in_flags, SPA_PORT_INFO_FLAG_CAN_ALLOC_BUFFERS); } else if (out_state == PW_PORT_STATE_READY && in_state > PW_PORT_STATE_READY) { - in_flags &= ~SPA_PORT_INFO_FLAG_CAN_USE_BUFFERS; - out_flags &= ~SPA_PORT_INFO_FLAG_CAN_ALLOC_BUFFERS; + SPA_FLAG_SET(in_flags, SPA_PORT_INFO_FLAG_CAN_USE_BUFFERS); + SPA_FLAG_UNSET(out_flags, SPA_PORT_INFO_FLAG_CAN_ALLOC_BUFFERS); } else { pw_log_debug("link %p: delay allocation, state %d %d", this, in_state, out_state); return 0; @@ -612,7 +617,7 @@ static int do_allocation(struct pw_link *this, uint32_t in_state, uint32_t out_s allocation.n_buffers, allocation.buffers); } else if (input->allocation.n_buffers && input->mix == NULL) { out_flags = SPA_PORT_INFO_FLAG_CAN_USE_BUFFERS; - in_flags = 0; + in_flags = SPA_PORT_INFO_FLAG_CAN_USE_BUFFERS; allocation = input->allocation; @@ -691,6 +696,7 @@ static int do_allocation(struct pw_link *this, uint32_t in_state, uint32_t out_s if (out_flags & SPA_PORT_INFO_FLAG_CAN_ALLOC_BUFFERS) { if ((res = pw_port_alloc_buffers(output, + this->rt.mix[SPA_DIRECTION_OUTPUT].port.port_id, params, n_params, allocation.buffers, &allocation.n_buffers)) < 0) { @@ -706,6 +712,7 @@ static int do_allocation(struct pw_link *this, uint32_t in_state, uint32_t out_s allocation.n_buffers, allocation.buffers); } else if (in_flags & SPA_PORT_INFO_FLAG_CAN_ALLOC_BUFFERS) { if ((res = pw_port_alloc_buffers(input, + this->rt.mix[SPA_DIRECTION_INPUT].port.port_id, params, n_params, allocation.buffers, &allocation.n_buffers)) < 0) { @@ -724,6 +731,7 @@ static int do_allocation(struct pw_link *this, uint32_t in_state, uint32_t out_s pw_log_debug("link %p: using %d buffers %p on output port", this, allocation.n_buffers, allocation.buffers); if ((res = pw_port_use_buffers(output, + this->rt.mix[SPA_DIRECTION_OUTPUT].port.port_id, allocation.buffers, allocation.n_buffers)) < 0) { asprintf(&error, "link %p: error use output buffers: %s", this, @@ -735,10 +743,12 @@ static int do_allocation(struct pw_link *this, uint32_t in_state, uint32_t out_s move_allocation(&allocation, &output->allocation); - } else if (in_flags & SPA_PORT_INFO_FLAG_CAN_USE_BUFFERS) { + } + if (in_flags & SPA_PORT_INFO_FLAG_CAN_USE_BUFFERS) { pw_log_debug("link %p: using %d buffers %p on input port", this, allocation.n_buffers, allocation.buffers); if ((res = pw_port_use_buffers(input, + this->rt.mix[SPA_DIRECTION_INPUT].port.port_id, allocation.buffers, allocation.n_buffers)) < 0) { asprintf(&error, "link %p: error use input buffers: %s", this, @@ -757,7 +767,8 @@ static int do_allocation(struct pw_link *this, uint32_t in_state, uint32_t out_s asprintf(&error, "link %p: error can set io: %s", this, spa_strerror(res)); goto error; } - return 0; + + return res; error: free_allocation(&output->allocation); @@ -771,8 +782,18 @@ do_activate_link(struct spa_loop *loop, bool async, uint32_t seq, const void *data, size_t size, void *user_data) { struct pw_link *this = user_data; - SPA_FLAG_UNSET(this->rt.out_port.port.flags, SPA_GRAPH_PORT_FLAG_DISABLED); - SPA_FLAG_UNSET(this->rt.in_port.port.flags, SPA_GRAPH_PORT_FLAG_DISABLED); + + pw_log_trace("link %p: activate", this); + SPA_FLAG_UNSET(this->rt.mix[0].port.flags, SPA_GRAPH_PORT_FLAG_DISABLED); + SPA_FLAG_UNSET(this->rt.mix[1].port.flags, SPA_GRAPH_PORT_FLAG_DISABLED); + + spa_list_append(&this->output->node->rt.links[SPA_DIRECTION_OUTPUT], + &this->rt.out_node_link); + spa_list_append(&this->input->node->rt.links[SPA_DIRECTION_INPUT], + &this->rt.in_node_link); + + __atomic_add_fetch(&this->input->node->rt.activation->required, 1, __ATOMIC_SEQ_CST); + return 0; } @@ -791,8 +812,12 @@ static int do_start(struct pw_link *this, uint32_t in_state, uint32_t out_state) input = this->input; output = this->output; - pw_loop_invoke(output->node->data_loop, + if (!impl->activated) { + pw_loop_invoke(output->node->data_loop, do_activate_link, SPA_ID_INVALID, NULL, 0, false, this); + impl->activated = true; + } + if (in_state == PW_PORT_STATE_PAUSED) { if ((res = pw_node_set_state(input->node, PW_NODE_STATE_RUNNING)) < 0) { @@ -899,8 +924,13 @@ output_node_async_complete(void *data, uint32_t seq, int res) static void clear_port_buffers(struct pw_link *link, struct pw_port *port) { - if (spa_list_is_empty(&port->links) && port->allocation.mem == NULL) - pw_port_use_buffers(port, NULL, 0); + int res; + + pw_log_debug("%d %p", spa_list_is_empty(&port->links), port->allocation.mem); + + if ((res = pw_port_use_buffers(port, + link->rt.mix[SPA_DIRECTION_INPUT].port.port_id, NULL, 0)) < 0) + pw_log_warn("link %p: port %p clear error %s", link, port, spa_strerror(res)); } static int @@ -908,7 +938,8 @@ do_remove_input(struct spa_loop *loop, bool async, uint32_t seq, const void *data, size_t size, void *user_data) { struct pw_link *this = user_data; - spa_graph_port_remove(&this->rt.in_port.port); + spa_graph_port_unlink(&this->rt.mix[SPA_DIRECTION_INPUT].port); + spa_graph_port_remove(&this->rt.mix[SPA_DIRECTION_INPUT].port); return 0; } @@ -923,12 +954,12 @@ static void input_remove(struct pw_link *this, struct pw_port *port) pw_loop_invoke(port->node->data_loop, do_remove_input, 1, NULL, 0, true, this); - pw_port_release_mix(port, &this->rt.in_port); - spa_list_remove(&this->input_link); spa_hook_list_call(&this->input->listener_list, struct pw_port_events, link_removed, this); clear_port_buffers(this, port); + + pw_port_release_mix(port, &this->rt.mix[SPA_DIRECTION_INPUT]); this->input = NULL; } @@ -937,7 +968,8 @@ do_remove_output(struct spa_loop *loop, bool async, uint32_t seq, const void *data, size_t size, void *user_data) { struct pw_link *this = user_data; - spa_graph_port_remove(&this->rt.out_port.port); + spa_graph_port_unlink(&this->rt.mix[SPA_DIRECTION_OUTPUT].port); + spa_graph_port_remove(&this->rt.mix[SPA_DIRECTION_OUTPUT].port); return 0; } @@ -952,12 +984,12 @@ static void output_remove(struct pw_link *this, struct pw_port *port) pw_loop_invoke(port->node->data_loop, do_remove_output, 1, NULL, 0, true, this); - pw_port_release_mix(port, &this->rt.out_port); - spa_list_remove(&this->output_link); spa_hook_list_call(&this->output->listener_list, struct pw_port_events, link_removed, this); clear_port_buffers(this, port); + + pw_port_release_mix(port, &this->rt.mix[SPA_DIRECTION_OUTPUT]); this->output = NULL; } @@ -985,12 +1017,13 @@ int pw_link_activate(struct pw_link *this) { struct impl *impl = SPA_CONTAINER_OF(this, struct impl, this); + pw_log_debug("link %p: activate %d", this, impl->active); + if (impl->active) return 0; impl->active = true; - pw_log_debug("link %p: activate", this); this->output->node->n_used_output_links++; this->input->node->n_used_input_links++; @@ -1005,9 +1038,16 @@ do_deactivate_link(struct spa_loop *loop, bool async, uint32_t seq, const void *data, size_t size, void *user_data) { struct pw_link *this = user_data; - pw_log_trace("link %p: disable %p and %p", this, &this->rt.out_port, &this->rt.in_port); - SPA_FLAG_SET(this->rt.out_port.port.flags, SPA_GRAPH_PORT_FLAG_DISABLED); - SPA_FLAG_SET(this->rt.in_port.port.flags, SPA_GRAPH_PORT_FLAG_DISABLED); + pw_log_debug("link %p: disable %p and %p", this, &this->rt.mix[0], &this->rt.mix[1]); + + __atomic_sub_fetch(&this->input->node->rt.activation->required, 1, __ATOMIC_SEQ_CST); + + SPA_FLAG_SET(this->rt.mix[0].port.flags, SPA_GRAPH_PORT_FLAG_DISABLED); + SPA_FLAG_SET(this->rt.mix[1].port.flags, SPA_GRAPH_PORT_FLAG_DISABLED); + + spa_list_remove(&this->rt.out_node_link); + spa_list_remove(&this->rt.in_node_link); + return 0; } @@ -1016,13 +1056,17 @@ int pw_link_deactivate(struct pw_link *this) struct impl *impl = SPA_CONTAINER_OF(this, struct impl, this); struct pw_node *input_node, *output_node; + pw_log_debug("link %p: deactivate %d", this, impl->active); + if (!impl->active) return 0; impl->active = false; - pw_log_debug("link %p: deactivate", this); - pw_loop_invoke(this->output->node->data_loop, - do_deactivate_link, SPA_ID_INVALID, NULL, 0, true, this); + if (impl->activated) { + pw_loop_invoke(this->output->node->data_loop, + do_deactivate_link, SPA_ID_INVALID, NULL, 0, true, this); + impl->activated = false; + } input_node = this->input->node; output_node = this->output->node; @@ -1108,11 +1152,7 @@ do_add_link(struct spa_loop *loop, struct pw_link *this = user_data; struct pw_port *port = ((struct pw_port **) data)[0]; - if (port->direction == PW_DIRECTION_OUTPUT) { - spa_graph_port_add(&port->rt.mix_node, &this->rt.out_port.port); - } else { - spa_graph_port_add(&port->rt.mix_node, &this->rt.in_port.port); - } + spa_graph_port_add(&port->rt.mix_node, &this->rt.mix[port->direction].port); return 0; } @@ -1211,16 +1251,17 @@ struct pw_link *pw_link_new(struct pw_core *core, this->info.format = NULL; this->info.props = this->properties ? &this->properties->dict : NULL; - this->io = SPA_IO_BUFFERS_INIT; + impl->io = SPA_IO_BUFFERS_INIT; - pw_port_init_mix(output, &this->rt.out_port); - pw_port_init_mix(input, &this->rt.in_port); + pw_port_init_mix(output, &this->rt.mix[SPA_DIRECTION_OUTPUT]); + pw_port_init_mix(input, &this->rt.mix[SPA_DIRECTION_INPUT]); pw_log_debug("link %p: constructed %p:%d.%d -> %p:%d.%d", impl, - output_node, output->port_id, this->rt.out_port.port.port_id, - input_node, input->port_id, this->rt.in_port.port.port_id); + output_node, output->port_id, this->rt.mix[SPA_DIRECTION_OUTPUT].port.port_id, + input_node, input->port_id, this->rt.mix[SPA_DIRECTION_INPUT].port.port_id); - spa_graph_port_link(&this->rt.out_port.port, &this->rt.in_port.port); + spa_graph_port_link(&this->rt.mix[SPA_DIRECTION_OUTPUT].port, + &this->rt.mix[SPA_DIRECTION_INPUT].port); /* nodes can be in different data loops so we do this twice */ pw_loop_invoke(output_node->data_loop, do_add_link, diff --git a/src/pipewire/node.c b/src/pipewire/node.c index d2ad2c2d5..7ad5d1707 100644 --- a/src/pipewire/node.c +++ b/src/pipewire/node.c @@ -41,6 +41,8 @@ struct impl { struct pw_work_queue *work; bool pause_on_idle; + + struct pw_node_activation activation; }; struct resource_data { @@ -301,17 +303,6 @@ global_bind(void *_data, struct pw_client *client, uint32_t permissions, return; } -static int -do_node_add(struct spa_loop *loop, - bool async, uint32_t seq, const void *data, size_t size, void *user_data) -{ - struct pw_node *this = user_data; - - spa_graph_node_add(this->rt.graph, &this->rt.node); - - return 0; -} - static void global_destroy(void *data) { struct pw_node *this = data; @@ -347,8 +338,6 @@ int pw_node_register(struct pw_node *this, pw_node_update_ports(this); - pw_loop_invoke(this->data_loop, do_node_add, 1, NULL, 0, false, this); - if ((str = pw_properties_get(this->properties, "media.class")) != NULL) pw_properties_set(properties, "media.class", str); pw_properties_set(properties, "node.name", this->info.name); @@ -382,6 +371,45 @@ int pw_node_register(struct pw_node *this, return 0; } +static void node_have_output(void *data); +static void node_need_input(void *data); + +static int impl_node_process(struct pw_node *node) +{ + struct spa_graph_port *p; + int res = 0; + + pw_log_trace("node %p: process %d", node, node->rt.activation->status); + + if (node->rt.activation->status != SPA_STATUS_HAVE_BUFFER) { + if (!spa_list_is_empty(&node->rt.node.ports[SPA_DIRECTION_INPUT])) { + spa_list_for_each(p, &node->rt.node.ports[SPA_DIRECTION_INPUT], link) + spa_node_process_input(p->peer->node->implementation); + + if (node->node->process_input) + res = node->rt.activation->status = spa_node_process_input(node->node); + } + else { + if (node->node->process_output) + res = node->rt.activation->status = spa_node_process_output(node->node); + } + + } else if (node->rt.activation->status == SPA_STATUS_HAVE_BUFFER) { + spa_list_for_each(p, &node->rt.node.ports[SPA_DIRECTION_OUTPUT], link) + spa_node_process_output(p->peer->node->implementation); + + if (node->node->process_output) + res = node->rt.activation->status = spa_node_process_output(node->node); + } + + if (res == SPA_STATUS_HAVE_BUFFER) + node_have_output(node); + else if (res == SPA_STATUS_NEED_BUFFER) + node_need_input(node); + + return node->rt.activation->status; +} + static void check_properties(struct pw_node *node) { struct impl *impl = SPA_CONTAINER_OF(node, struct impl, this); @@ -427,8 +455,6 @@ struct pw_node *pw_node_new(struct pw_core *core, this->data_loop = core->data_loop; - this->rt.graph = &core->rt.graph; - spa_list_init(&this->resource_list); spa_hook_list_init(&this->listener_list); @@ -442,6 +468,12 @@ struct pw_node *pw_node_new(struct pw_core *core, pw_map_init(&this->output_port_map, 64, 64); spa_graph_node_init(&this->rt.node); + spa_list_init(&this->rt.links[SPA_DIRECTION_INPUT]); + spa_list_init(&this->rt.links[SPA_DIRECTION_OUTPUT]); + this->rt.activation = &impl->activation; + impl->activation.status = SPA_STATUS_NEED_BUFFER; + + this->process = impl_node_process; return this; @@ -520,20 +552,99 @@ static void node_event(void *data, struct spa_event *event) spa_hook_list_call(&node->listener_list, struct pw_node_events, event, event); } + +static void node_process(struct pw_node *node) +{ + pw_log_trace("node %p: pending %d required %d %d", node, + node->rt.activation->pending, node->rt.activation->required, + node->rt.activation->status); + + if (--node->rt.activation->pending == 0) { + if (node->process) + node->rt.activation->status = node->process(node); + else + node->rt.activation->status = SPA_STATUS_OK; + + pw_log_trace("node %p: %d", node, node->rt.activation->status); + } +} + static void node_need_input(void *data) { - struct pw_node *node = data; + struct pw_node *node = data, *n, *pn; + struct spa_list queue, pending; + struct spa_graph_port *p; + pw_log_trace("node %p: need input", node); spa_hook_list_call(&node->listener_list, struct pw_node_events, need_input); - spa_graph_need_input(node->rt.graph, &node->rt.node); + + spa_list_init(&queue); + spa_list_init(&pending); + + node->rt.activation->status = SPA_STATUS_NEED_BUFFER; + spa_list_append(&queue, &node->rt.sched_link); + + spa_list_for_each(p, &node->rt.node.ports[SPA_DIRECTION_INPUT], link) + spa_node_process_output(p->peer->node->implementation); + + while (!spa_list_is_empty(&queue)) { + struct pw_link *l; + + n = spa_list_first(&queue, struct pw_node, rt.sched_link); + spa_list_remove(&n->rt.sched_link); + + if (n != node); + spa_list_prepend(&pending, &n->rt.sched_link); + + n->rt.activation->pending = 1; + if (n->rt.activation->status == SPA_STATUS_HAVE_BUFFER) + continue; + + n->rt.activation->pending += n->rt.activation->required; + pw_log_trace("node %p: add %d %d", + n, n->rt.activation->pending, n->rt.activation->required); + + spa_list_for_each(l, &n->rt.links[SPA_DIRECTION_INPUT], rt.in_node_link) { + pn = l->output->node; + + pw_log_trace("node %p: %p in %p %d", n, pn, l, l->io->status); + if (l->io->status == SPA_STATUS_OK) { + n->rt.activation->pending -= 1; + continue; + } + + if (pn->rt.sched_link.next == NULL) + spa_list_append(&queue, &pn->rt.sched_link); + } + } + while (!spa_list_is_empty(&pending)) { + n = spa_list_first(&pending, struct pw_node, rt.sched_link); + spa_list_remove(&n->rt.sched_link); + n->rt.sched_link.next = NULL; + + node_process(n); + } } static void node_have_output(void *data) { - struct pw_node *node = data; + struct pw_node *node = data, *pn; + struct pw_link *l; + struct spa_graph_port *p; + pw_log_trace("node %p: have output", node); - spa_graph_have_output(node->rt.graph, &node->rt.node); + node->rt.activation->status = SPA_STATUS_HAVE_BUFFER; + + spa_list_for_each(p, &node->rt.node.ports[SPA_DIRECTION_OUTPUT], link) + spa_node_process_input(p->peer->node->implementation); + spa_hook_list_call(&node->listener_list, struct pw_node_events, have_output); + + spa_list_for_each(l, &node->rt.links[SPA_DIRECTION_OUTPUT], rt.out_node_link) { + pn = l->input->node; + pw_log_trace("node %p: %p out %p %d", node, pn, l, l->io->status); + node_process(pn); + } } static void node_reuse_buffer(void *data, uint32_t port_id, uint32_t buffer_id) @@ -561,7 +672,6 @@ static const struct spa_node_callbacks node_callbacks = { .reuse_buffer = node_reuse_buffer, }; - void pw_node_set_implementation(struct pw_node *node, struct spa_node *spa_node) { @@ -586,19 +696,6 @@ void pw_node_add_listener(struct pw_node *node, spa_hook_list_append(&node->listener_list, listener, events, data); } -static int -do_node_remove(struct spa_loop *loop, - bool async, uint32_t seq, const void *data, size_t size, void *user_data) -{ - struct pw_node *this = user_data; - - pause_node(this); - - spa_graph_node_remove(&this->rt.node); - - return 0; -} - /** Destroy a node * \param node a node to destroy * @@ -616,8 +713,9 @@ void pw_node_destroy(struct pw_node *node) pw_log_debug("node %p: destroy", impl); spa_hook_list_call(&node->listener_list, struct pw_node_events, destroy); + pause_node(node); + if (node->registered) { - pw_loop_invoke(node->data_loop, do_node_remove, 1, NULL, 0, true, node); spa_list_remove(&node->link); } diff --git a/src/pipewire/port.c b/src/pipewire/port.c index d906c2cef..6e16b7d1a 100644 --- a/src/pipewire/port.c +++ b/src/pipewire/port.c @@ -57,18 +57,15 @@ static int schedule_tee_input(struct spa_node *data) struct spa_io_buffers *io = this->rt.mix_port.io; if (!spa_list_is_empty(&node->ports[SPA_DIRECTION_OUTPUT])) { - pw_log_trace("node %p: tee input %d %d", node, io->status, io->buffer_id); + pw_log_trace("port %p: tee input %d %d", this, io->status, io->buffer_id); spa_list_for_each(p, &node->ports[SPA_DIRECTION_OUTPUT], link) { - pw_log_trace("node %p: port %p %d %p->%p", node, p, p->flags, io, p->io); + pw_log_trace("port %p: port %d %d %p->%p", this, + p->port_id, p->flags, io, p->io); if (p->flags & SPA_GRAPH_PORT_FLAG_DISABLED) continue; *p->io = *io; } - io->buffer_id = SPA_ID_INVALID; } - else - io->status = SPA_STATUS_NEED_BUFFER; - return io->status; } static int schedule_tee_output(struct spa_node *data) @@ -79,23 +76,24 @@ static int schedule_tee_output(struct spa_node *data) struct spa_io_buffers *io = this->rt.mix_port.io; spa_list_for_each(p, &node->ports[SPA_DIRECTION_OUTPUT], link) { - pw_log_trace("node %p: port %p %d %p->%p", node, p, p->flags, p->io, io); + pw_log_trace("port %p: port %d %d %p->%p %d %d", + this, p->port_id, p->flags, p->io, io, + p->io->status, p->io->buffer_id); if (p->flags & SPA_GRAPH_PORT_FLAG_DISABLED) continue; *io = *p->io; } - pw_log_trace("node %p: tee output %d %d", node, io->status, io->buffer_id); + pw_log_trace("port %p: tee output %d %d", this, io->status, io->buffer_id); return io->status; } static int schedule_tee_reuse_buffer(struct spa_node *data, uint32_t port_id, uint32_t buffer_id) { struct pw_port *this = SPA_CONTAINER_OF(data, struct pw_port, mix_node); - struct spa_graph_node *node = &this->rt.mix_node; struct spa_graph_port *p = &this->rt.mix_port, *pp; if ((pp = p->peer) != NULL) { - pw_log_trace("node %p: tee reuse buffer %d %d", node, port_id, buffer_id); + pw_log_trace("port %p: tee reuse buffer %d %d", this, port_id, buffer_id); spa_node_port_reuse_buffer(pp->node->implementation, port_id, buffer_id); } return 0; @@ -119,10 +117,9 @@ static int schedule_mix_input(struct spa_node *data) spa_list_for_each(p, &node->ports[SPA_DIRECTION_INPUT], link) { if (p->flags & SPA_GRAPH_PORT_FLAG_DISABLED) continue; - pw_log_trace("mix %p: input %p %p->%p %d %d", node, - p, p->io, io, p->io->status, p->io->buffer_id); + pw_log_trace("port %p: mix input %d %p->%p %d %d", this, + p->port_id, p->io, io, p->io->status, p->io->buffer_id); *io = *p->io; - p->io->buffer_id = SPA_ID_INVALID; break; } return io->status; @@ -137,6 +134,8 @@ static int schedule_mix_output(struct spa_node *data) if (!spa_list_is_empty(&node->ports[SPA_DIRECTION_INPUT])) { spa_list_for_each(p, &node->ports[SPA_DIRECTION_INPUT], link) { + pw_log_trace("port %p: port %d %d %p->%p", this, + p->port_id, p->flags, io, p->io); if (p->flags & SPA_GRAPH_PORT_FLAG_DISABLED) continue; *p->io = *io; @@ -146,7 +145,7 @@ static int schedule_mix_output(struct spa_node *data) io->status = SPA_STATUS_HAVE_BUFFER; io->buffer_id = SPA_ID_INVALID; } - pw_log_trace("mix %p: output %d %d", node, io->status, io->buffer_id); + pw_log_trace("port %p: output %d %d", this, io->status, io->buffer_id); return io->status; } @@ -158,7 +157,7 @@ static int schedule_mix_reuse_buffer(struct spa_node *data, uint32_t port_id, ui spa_list_for_each(p, &node->ports[SPA_DIRECTION_INPUT], link) { if ((pp = p->peer) != NULL) { - pw_log_trace("mix %p: reuse buffer %d %d", node, port_id, buffer_id); + pw_log_trace("port %p: reuse buffer %d %d", this, port_id, buffer_id); spa_node_port_reuse_buffer(pp->node->implementation, port_id, buffer_id); } } @@ -179,7 +178,7 @@ int pw_port_init_mix(struct pw_port *port, struct pw_port_mix *mix) int res = 0; const struct pw_port_implementation *pi = port->implementation; - id = pw_map_insert_new(&port->mix_port_map, NULL); + id = pw_map_insert_new(&port->mix_port_map, mix); spa_graph_port_init(&mix->port, port->direction, id, @@ -343,7 +342,6 @@ static int do_add_port(struct spa_loop *loop, this->rt.port.flags = this->spa_info->flags; spa_graph_port_add(&this->node->rt.node, &this->rt.port); - spa_graph_node_add(this->rt.graph, &this->rt.mix_node); spa_graph_port_add(&this->rt.mix_node, &this->rt.mix_port); spa_graph_port_link(&this->rt.port, &this->rt.mix_port); @@ -468,6 +466,9 @@ int pw_port_add(struct pw_port *port, struct pw_node *node) struct pw_type *t = &core->type; const char *str, *dir; + if (port->node != NULL) + return -EEXIST; + port->node = node; spa_node_port_get_info(node->node, @@ -516,7 +517,6 @@ int pw_port_add(struct pw_port *port, struct pw_node *node) pw_port_register(port, node->global->owner, node->global, pw_properties_copy(port->properties)); - port->rt.graph = node->rt.graph; pw_loop_invoke(node->data_loop, do_add_port, SPA_ID_INVALID, NULL, 0, false, port); if (port->state <= PW_PORT_STATE_INIT) @@ -554,7 +554,6 @@ static int do_remove_port(struct spa_loop *loop, spa_graph_port_remove(p); spa_graph_port_remove(&this->rt.mix_port); - spa_graph_node_remove(&this->rt.mix_node); return 0; } @@ -563,11 +562,13 @@ static void pw_port_remove(struct pw_port *port) { struct pw_node *node = port->node; + if (node == NULL) + return; + pw_log_debug("port %p: remove", port); - if (port->rt.graph) - pw_loop_invoke(port->node->data_loop, do_remove_port, - SPA_ID_INVALID, NULL, 0, true, port); + pw_loop_invoke(port->node->data_loop, do_remove_port, + SPA_ID_INVALID, NULL, 0, true, port); if (port->direction == PW_DIRECTION_INPUT) { pw_map_remove(&node->input_port_map, port->port_id); @@ -583,7 +584,6 @@ static void pw_port_remove(struct pw_port *port) void pw_port_destroy(struct pw_port *port) { - struct pw_node *node = port->node; struct pw_control *control, *ctemp; struct pw_resource *resource, *tmp; @@ -591,8 +591,7 @@ void pw_port_destroy(struct pw_port *port) spa_hook_list_call(&port->listener_list, struct pw_port_events, destroy); - if (node) - pw_port_remove(port); + pw_port_remove(port); spa_list_for_each_safe(control, ctemp, &port->control_list[0], port_link) pw_control_destroy(control); @@ -731,10 +730,15 @@ int pw_port_set_param(struct pw_port *port, uint32_t id, uint32_t flags, return res; } -int pw_port_use_buffers(struct pw_port *port, struct spa_buffer **buffers, uint32_t n_buffers) +int pw_port_use_buffers(struct pw_port *port, uint32_t mix_id, + struct spa_buffer **buffers, uint32_t n_buffers) { int res; struct pw_node *node = port->node; + struct pw_port_mix *mix; + struct spa_graph_port *p; + + pw_log_debug("port %p: %d.%d: %d buffers ", port, port->port_id, mix_id, n_buffers); if (n_buffers == 0 && port->state <= PW_PORT_STATE_READY) return 0; @@ -742,8 +746,18 @@ int pw_port_use_buffers(struct pw_port *port, struct spa_buffer **buffers, uint3 if (n_buffers > 0 && port->state < PW_PORT_STATE_READY) return -EIO; - res = spa_node_port_use_buffers(node->node, port->direction, port->port_id, buffers, n_buffers); - pw_log_debug("port %p: use %d buffers: %d (%s)", port, n_buffers, res, spa_strerror(res)); + if ((mix = pw_map_lookup(&port->mix_port_map, mix_id)) == NULL) + return -EIO; + + p = &mix->port; + + if (port->mix_node.port_use_buffers) + res = spa_node_port_use_buffers(&port->mix_node, p->direction, p->port_id, buffers, n_buffers); + else + res = spa_node_port_use_buffers(node->node, port->direction, port->port_id, buffers, n_buffers); + + pw_log_debug("port %p: %d.%d: use %d buffers: %d (%s)", port, + port->port_id, mix_id, n_buffers, res, spa_strerror(res)); port->allocated = false; @@ -753,6 +767,8 @@ int pw_port_use_buffers(struct pw_port *port, struct spa_buffer **buffers, uint3 n_buffers = 0; buffers = NULL; } + mix->n_buffers = n_buffers; + mix->buffers = buffers; if (n_buffers == 0) port_update_state (port, PW_PORT_STATE_READY); @@ -762,33 +778,49 @@ int pw_port_use_buffers(struct pw_port *port, struct spa_buffer **buffers, uint3 return res; } -int pw_port_alloc_buffers(struct pw_port *port, +int pw_port_alloc_buffers(struct pw_port *port, uint32_t mix_id, struct spa_pod **params, uint32_t n_params, struct spa_buffer **buffers, uint32_t *n_buffers) { int res; struct pw_node *node = port->node; + struct pw_port_mix *mix; + struct spa_graph_port *p; if (port->state < PW_PORT_STATE_READY) return -EIO; - res = spa_node_port_alloc_buffers(node->node, port->direction, port->port_id, - params, n_params, - buffers, n_buffers); - pw_log_debug("port %p: alloc %d buffers: %d (%s)", port, *n_buffers, res, spa_strerror(res)); + if ((mix = pw_map_lookup(&port->mix_port_map, mix_id)) == NULL) + return -EIO; + + p = &mix->port; + + if (port->mix_node.port_use_buffers) + res = spa_node_port_alloc_buffers(&port->mix_node, p->direction, p->port_id, + params, n_params, + buffers, n_buffers); + else + res = spa_node_port_alloc_buffers(node->node, port->direction, port->port_id, + params, n_params, + buffers, n_buffers); + + pw_log_debug("port %p: %d.%d alloc %d buffers: %d (%s)", port, + port->port_id, mix_id, *n_buffers, res, spa_strerror(res)); free_allocation(&port->allocation); if (res < 0) { - n_buffers = 0; + *n_buffers = 0; buffers = NULL; port->allocated = false; } else { port->allocated = true; } + mix->n_buffers = *n_buffers; + mix->buffers = buffers; - if (n_buffers == 0) + if (*n_buffers == 0) port_update_state (port, PW_PORT_STATE_READY); else if (!SPA_RESULT_IS_ASYNC(res)) port_update_state (port, PW_PORT_STATE_PAUSED); diff --git a/src/pipewire/private.h b/src/pipewire/private.h index 4f00edced..2182fdf06 100644 --- a/src/pipewire/private.h +++ b/src/pipewire/private.h @@ -163,10 +163,6 @@ struct pw_core { struct pw_client *current_client; /**< client currently executing code in mainloop */ long sc_pagesize; - - struct { - struct spa_graph graph; - } rt; }; struct pw_data_loop { @@ -227,6 +223,22 @@ struct pw_module { void *user_data; /**< module user_data */ }; +struct pw_node_activation { +#define NOT_TRIGGERED 0 +#define TRIGGERED 1 +#define AWAKE 2 +#define FINISHED 3 + int state; + + uint64_t signal_time; + uint64_t awake_time; + uint64_t finish_time; + + int status; + uint32_t required; + uint32_t pending; +}; + struct pw_node { struct pw_core *core; /**< core object */ struct spa_list link; /**< link in core node_list */ @@ -260,9 +272,13 @@ struct pw_node { struct pw_loop *data_loop; /**< the data loop for this node */ + int (*process) (struct pw_node *node); + struct { - struct spa_graph *graph; struct spa_graph_node node; + struct spa_list links[2]; + struct pw_node_activation *activation; + struct spa_list sched_link; } rt; void *user_data; /**< extra user data */ @@ -270,7 +286,7 @@ struct pw_node { struct pw_port_mix { struct spa_graph_port port; - struct spa_buffer *buffers; + struct spa_buffer **buffers; uint32_t n_buffers; }; @@ -318,7 +334,6 @@ struct pw_port { struct pw_map mix_port_map; /**< map from port_id from mixer */ struct { - struct spa_graph *graph; struct spa_io_buffers io; /**< io area of the port */ struct spa_graph_port port; /**< this graph port, linked to mix_port */ struct spa_graph_port mix_port; /**< port from the mixer */ @@ -344,7 +359,7 @@ struct pw_link { struct spa_list resource_list; /**< list of bound resources */ - struct spa_io_buffers io; /**< link io area if not provided by ports */ + struct spa_io_buffers *io; /**< link io area */ struct pw_port *output; /**< output port */ struct spa_list output_link; /**< link in output port links */ @@ -354,8 +369,9 @@ struct pw_link { struct spa_hook_list listener_list; struct { - struct pw_port_mix out_port; - struct pw_port_mix in_port; + struct pw_port_mix mix[2]; + struct spa_list in_node_link; + struct spa_list out_node_link; } rt; void *user_data; @@ -560,10 +576,11 @@ int pw_port_set_param(struct pw_port *port, uint32_t id, uint32_t flags, const struct spa_pod *param); /** Use buffers on a port \memberof pw_port */ -int pw_port_use_buffers(struct pw_port *port, struct spa_buffer **buffers, uint32_t n_buffers); +int pw_port_use_buffers(struct pw_port *port, uint32_t mix_id, + struct spa_buffer **buffers, uint32_t n_buffers); /** Allocate memory for buffers on a port \memberof pw_port */ -int pw_port_alloc_buffers(struct pw_port *port, +int pw_port_alloc_buffers(struct pw_port *port, uint32_t mix_id, struct spa_pod **params, uint32_t n_params, struct spa_buffer **buffers, uint32_t *n_buffers); diff --git a/src/pipewire/remote.c b/src/pipewire/remote.c index 3588ad091..c8eb82490 100644 --- a/src/pipewire/remote.c +++ b/src/pipewire/remote.c @@ -67,6 +67,7 @@ struct buffer { }; struct mix { + struct spa_list link; struct pw_port *port; uint32_t mix_id; struct pw_port_mix mix; @@ -83,7 +84,9 @@ struct node_data { struct spa_source *rtsocket_source; struct pw_client_node_transport *trans; - struct mix mix[MAX_MIX]; + struct mix mix_pool[MAX_MIX]; + struct spa_list mix[2]; + struct spa_list free_mix; struct pw_array mems; @@ -470,10 +473,8 @@ do_remove_source(struct spa_loop *loop, } -static void unhandle_socket(struct pw_proxy *proxy) +static void unhandle_socket(struct node_data *data) { - struct node_data *data = proxy->user_data; - pw_loop_invoke(data->core->data_loop, do_remove_source, 1, NULL, 0, true, data); } @@ -504,8 +505,8 @@ static void node_need_input(void *data) { struct node_data *d = data; uint64_t cmd = 1; - pw_log_trace("remote %p: send need input", data); do_pull(data, SPA_DIRECTION_INPUT); + pw_log_trace("remote %p: send need input", data); pw_client_node_transport_add_message(d->trans, &PW_CLIENT_NODE_MESSAGE_INIT(PW_CLIENT_NODE_MESSAGE_NEED_INPUT)); write(d->rtwritefd, &cmd, 8); @@ -517,6 +518,7 @@ static void node_have_output(void *data) uint64_t cmd = 1; do_push(data, SPA_DIRECTION_OUTPUT); + pw_log_trace("remote %p: send have output", data); pw_client_node_transport_add_message(d->trans, &PW_CLIENT_NODE_MESSAGE_INIT(PW_CLIENT_NODE_MESSAGE_HAVE_OUTPUT)); write(d->rtwritefd, &cmd, 8); @@ -608,7 +610,7 @@ on_rtsocket_condition(void *user_data, int fd, enum spa_io mask) if (mask & (SPA_IO_ERR | SPA_IO_HUP)) { pw_log_warn("got error"); - unhandle_socket(proxy); + unhandle_socket(data); return; } @@ -691,15 +693,14 @@ static void clear_mem(struct node_data *data, struct mem *m) } } -static void clean_transport(struct pw_proxy *proxy) +static void clean_transport(struct node_data *data) { - struct node_data *data = proxy->user_data; struct mem *m; if (data->trans == NULL) return; - unhandle_socket(proxy); + unhandle_socket(data); pw_array_for_each(m, &data->mems) clear_mem(data, m); @@ -733,35 +734,43 @@ do_activate_mix(struct spa_loop *loop, static struct mix *find_mix(struct node_data *data, enum spa_direction direction, uint32_t port_id, uint32_t mix_id) { - struct mix *mix, *empty = NULL; - struct pw_port *port; - int i; + struct mix *mix; - for (i = 0; i < MAX_MIX; i++) { - mix = &data->mix[i]; - - if (mix->port == NULL) { - empty = mix; - continue; - } - if (mix->port->direction == (enum pw_direction) direction && - mix->port->port_id == port_id && + spa_list_for_each(mix, &data->mix[direction], link) { + if (mix->port->port_id == port_id && mix->mix_id == mix_id) return mix; } - if (empty == NULL) + return NULL; +} + +static struct mix *ensure_mix(struct node_data *data, + enum spa_direction direction, uint32_t port_id, uint32_t mix_id) +{ + struct mix *mix; + struct pw_port *port; + + if ((mix = find_mix(data, direction, port_id, mix_id))) + return mix; + + if (spa_list_is_empty(&data->free_mix)) return NULL; port = pw_node_find_port(data->node, direction, port_id); if (port == NULL) return NULL; - mix_init(empty, port, mix_id); + mix = spa_list_first(&data->free_mix, struct mix, link); + spa_list_remove(&mix->link); + + mix_init(mix, port, mix_id); + + spa_list_append(&data->mix[direction], &mix->link); pw_loop_invoke(data->core->data_loop, - do_activate_mix, SPA_ID_INVALID, NULL, 0, false, empty); + do_activate_mix, SPA_ID_INVALID, NULL, 0, false, mix); - return empty; + return mix; } static void client_node_add_mem(void *object, @@ -797,7 +806,7 @@ static void client_node_transport(void *object, uint32_t node_id, struct pw_proxy *proxy = object; struct node_data *data = proxy->user_data; - clean_transport(proxy); + clean_transport(data); data->node_id = node_id; data->trans = transport; @@ -887,21 +896,22 @@ static void client_node_event(void *object, const struct spa_event *event) static void do_start(struct node_data *data) { - int i; uint64_t cmd = 1; + struct mix *mix; - for (i = 0; i < MAX_MIX; i++) { - struct mix *mix = &data->mix[i]; - - if (mix->port == NULL) - continue; - + spa_list_for_each(mix, &data->mix[SPA_DIRECTION_INPUT], link) { mix->mix.port.io->status = SPA_STATUS_NEED_BUFFER; mix->mix.port.io->buffer_id = SPA_ID_INVALID; } - pw_client_node_transport_add_message(data->trans, + spa_list_for_each(mix, &data->mix[SPA_DIRECTION_OUTPUT], link) { + mix->mix.port.io->status = SPA_STATUS_NEED_BUFFER; + mix->mix.port.io->buffer_id = SPA_ID_INVALID; + } + if (!spa_list_is_empty(&data->mix[SPA_DIRECTION_INPUT])) { + pw_client_node_transport_add_message(data->trans, &PW_CLIENT_NODE_MESSAGE_INIT(PW_CLIENT_NODE_MESSAGE_NEED_INPUT)); - write(data->rtwritefd, &cmd, 8); + write(data->rtwritefd, &cmd, 8); + } } static void client_node_command(void *object, uint32_t seq, const struct spa_command *command) @@ -1007,7 +1017,7 @@ static void clear_buffers(struct node_data *data, struct mix *mix) int i; pw_log_debug("port %p: clear buffers", port); - pw_port_use_buffers(port, NULL, 0); + pw_port_use_buffers(port, mix->mix_id, NULL, 0); pw_array_for_each(bid, &mix->buffers) { if (bid->ptr != NULL) { @@ -1047,7 +1057,7 @@ client_node_port_use_buffers(void *object, struct pw_type *t = &core->type; int res, prot; - mix = find_mix(data, direction, port_id, mix_id); + mix = ensure_mix(data, direction, port_id, mix_id); if (mix == NULL) { res = -EINVAL; goto done; @@ -1164,7 +1174,7 @@ client_node_port_use_buffers(void *object, bufs[i] = b; } - res = pw_port_use_buffers(mix->port, bufs, n_buffers); + res = pw_port_use_buffers(mix->port, mix->mix_id, bufs, n_buffers); done: pw_client_node_proxy_done(data->node_proxy, seq, res); @@ -1212,7 +1222,7 @@ client_node_port_set_io(void *object, struct mem *mid; void *ptr; - mix = find_mix(data, direction, port_id, mix_id); + mix = ensure_mix(data, direction, port_id, mix_id); if (mix == NULL) return; @@ -1317,26 +1327,40 @@ static const struct pw_node_events node_events = { .have_output = node_have_output, }; +static int +do_deactivate_mix(struct spa_loop *loop, + bool async, uint32_t seq, const void *data, size_t size, void *user_data) +{ + struct mix *mix = user_data; + SPA_FLAG_SET(mix->mix.port.flags, SPA_GRAPH_PORT_FLAG_DISABLED); + spa_graph_port_remove(&mix->mix.port); + return 0; +} + static void clear_mix(struct node_data *data, struct mix *mix) { - if (mix->port) { - clear_buffers(data, mix); - pw_array_clear(&mix->buffers); - mix->port = NULL; - } + clear_buffers(data, mix); + pw_array_clear(&mix->buffers); + + pw_loop_invoke(data->core->data_loop, + do_deactivate_mix, SPA_ID_INVALID, NULL, 0, true, mix); + + spa_list_remove(&mix->link); + spa_list_append(&data->free_mix, &mix->link); } static void node_proxy_destroy(void *_data) { struct node_data *data = _data; - struct pw_proxy *proxy = (struct pw_proxy*) data->node_proxy; - int i; + struct mix *mix, *tmp; if (data->trans) { - for (i = 0; i < MAX_MIX; i++) - clear_mix(data, &data->mix[i]); + spa_list_for_each_safe(mix, tmp, &data->mix[SPA_DIRECTION_INPUT], link) + clear_mix(data, mix); + spa_list_for_each_safe(mix, tmp, &data->mix[SPA_DIRECTION_OUTPUT], link) + clear_mix(data, mix); } - clean_transport(proxy); + clean_transport(data); spa_hook_remove(&data->node_listener); } @@ -1352,6 +1376,7 @@ struct pw_proxy *pw_remote_export(struct pw_remote *remote, struct remote *impl = SPA_CONTAINER_OF(remote, struct remote, this); struct pw_proxy *proxy; struct node_data *data; + int i; proxy = pw_core_proxy_create_object(remote->core_proxy, "client-node", @@ -1369,6 +1394,12 @@ struct pw_proxy *pw_remote_export(struct pw_remote *remote, data->t = pw_core_get_type(data->core); data->node_proxy = (struct pw_client_node_proxy *)proxy; + spa_list_init(&data->free_mix); + spa_list_init(&data->mix[0]); + spa_list_init(&data->mix[1]); + for (i = 0; i < MAX_MIX; i++) + spa_list_append(&data->free_mix, &data->mix_pool[i].link); + pw_array_init(&data->mems, 64); pw_array_ensure_size(&data->mems, sizeof(struct mem) * 64);