node: improve scheduling

Don't use the graph scheduler anymore, instead use a more simple
method using the nodes directly. The idea is that when a node
pulls, we walk the graph backwards and collect nodes to process.
Searching stops on a node with output. Then we process the nodes,
each one in turn calling process on the next one when all dependend
nodes are processed. This is quite similar to jack2 and with some
modifications seems to work well for more complicated input/output
patterns.
Set per mix input/output buffers.
Implement mixing in the audio-dsp node.
remote: handle mix ports a little better
This commit is contained in:
Wim Taymans 2018-03-08 11:08:37 +01:00
parent 1f077c7e0a
commit b6239fb1ab
7 changed files with 479 additions and 240 deletions

View file

@ -76,6 +76,8 @@ struct impl {
};
struct buffer {
#define BUFFER_FLAG_OUT (1<<0)
uint32_t flags;
struct spa_list link;
struct spa_buffer *outbuf;
void *ptr;
@ -91,8 +93,6 @@ struct port {
#define PORT_FLAG_MIDI (1<<2)
uint32_t flags;
struct spa_node mix_node;
struct spa_port_info info;
struct spa_io_buffers *io;
@ -214,8 +214,11 @@ static int node_remove_port(struct spa_node *node, enum spa_direction direction,
static void recycle_buffer(struct node *n, struct port *p, uint32_t id)
{
struct buffer *b = &p->buffers[id];
pw_log_trace("recycle buffer %d", id);
spa_list_append(&p->queue, &b->link);
if (SPA_FLAG_CHECK(b->flags, BUFFER_FLAG_OUT)) {
pw_log_trace("recycle buffer %d", id);
spa_list_append(&p->queue, &b->link);
SPA_FLAG_UNSET(b->flags, BUFFER_FLAG_OUT);
}
}
static int clear_buffers(struct node *n, struct port *p)
@ -228,22 +231,10 @@ static int clear_buffers(struct node *n, struct port *p)
return 0;
}
static struct buffer * dequeue_buffer(struct node *n, struct port *p)
{
struct buffer *b;
if (spa_list_is_empty(&p->queue))
return NULL;
b = spa_list_first(&p->queue, struct buffer, link);
spa_list_remove(&b->link);
return b;
}
static void conv_f32_s16(int16_t *out, float *in, int n_samples, int stride)
static void conv_f32_s16(int16_t *out, int index, float *in, int n_samples, int stride)
{
int i;
out += index;
for (i = 0; i < n_samples; i++) {
if (in[i] < -1.0f)
*out = -32767;
@ -254,22 +245,39 @@ static void conv_f32_s16(int16_t *out, float *in, int n_samples, int stride)
out += stride;
}
}
static void fill_s16(int16_t *out, int n_samples, int stride)
static void fill_s16(int16_t *out, int index, int n_samples, int stride)
{
int i;
out += index;
for (i = 0; i < n_samples; i++) {
*out = 0;
out += stride;
}
}
#if 0
static void add_f32(float *out, float *in, int n_samples)
{
int i;
for (i = 0; i < n_samples; i++)
out[i] += in[i];
}
#endif
static struct buffer * peek_buffer(struct node *n, struct port *p)
{
if (spa_list_is_empty(&p->queue))
return NULL;
return spa_list_first(&p->queue, struct buffer, link);
}
static struct buffer * dequeue_buffer(struct node *n, struct port *p)
{
struct buffer *b;
if ((b = peek_buffer(n, p)) != NULL) {
spa_list_remove(&b->link);
SPA_FLAG_SET(b->flags, BUFFER_FLAG_OUT);
}
return b;
}
static int node_process_input(struct spa_node *node)
{
@ -278,8 +286,6 @@ static int node_process_input(struct spa_node *node)
struct port *outp = GET_OUT_PORT(n, 0);
struct spa_io_buffers *outio = outp->io;
struct buffer *out;
int16_t *op;
int i;
pw_log_trace(NAME " %p: process input", this);
@ -295,26 +301,6 @@ static int node_process_input(struct spa_node *node)
outio->buffer_id = out->outbuf->id;
outio->status = SPA_STATUS_HAVE_BUFFER;
op = out->ptr;
for (i = 0; i < n->n_in_ports; i++) {
struct port *inp = GET_IN_PORT(n, i);
struct spa_io_buffers *inio = inp->io;
struct buffer *in;
int stride = 2;
pw_log_trace(NAME" %p: mix %d %p %d %d", this, i, inio, inio->buffer_id, n->buffer_size);
if (inio->buffer_id < inp->n_buffers && inio->status == SPA_STATUS_HAVE_BUFFER) {
in = &inp->buffers[inio->buffer_id];
conv_f32_s16(op, in->ptr, n->buffer_size, stride);
}
else {
fill_s16(op, n->buffer_size, stride);
}
op++;
inio->status = SPA_STATUS_NEED_BUFFER;
}
out->outbuf->datas[0].chunk->offset = 0;
out->outbuf->datas[0].chunk->size = n->buffer_size * sizeof(int16_t) * 2;
out->outbuf->datas[0].chunk->stride = 0;
@ -339,6 +325,7 @@ static int node_process_output(struct spa_node *node)
recycle_buffer(n, outp, outio->buffer_id);
outio->buffer_id = SPA_ID_INVALID;
}
outio->status = SPA_STATUS_OK;
for (i = 0; i < n->n_in_ports; i++) {
struct port *inp = GET_IN_PORT(n, i);
@ -349,10 +336,9 @@ static int node_process_output(struct spa_node *node)
inio->status = SPA_STATUS_NEED_BUFFER;
}
return outio->status = SPA_STATUS_NEED_BUFFER;
return SPA_STATUS_NEED_BUFFER;
}
static int port_set_io(struct spa_node *node,
enum spa_direction direction, uint32_t port_id,
uint32_t id, void *data, size_t size)
@ -546,6 +532,7 @@ static int port_use_buffers(struct spa_node *node, enum spa_direction direction,
struct spa_data *d = buffers[i]->datas;
b = &p->buffers[i];
b->flags = 0;
b->outbuf = buffers[i];
if ((d[0].type == t->data.MemPtr ||
d[0].type == t->data.MemFd ||
@ -629,63 +616,98 @@ static const struct spa_node node_impl = {
.process_output = node_process_output,
};
#if 0
static int schedule_mix_input(struct spa_node *_node)
{
struct port *p = SPA_CONTAINER_OF(_node, struct port, mix_node);
struct pw_port *port = p->port;
struct pw_port *port = SPA_CONTAINER_OF(_node, struct pw_port, mix_node);
struct port *p = port->owner_data;
struct node *n = p->node;
struct port *outp = GET_OUT_PORT(n, 0);
struct spa_graph_node *node = &port->rt.mix_node;
struct spa_graph_port *gp;
struct spa_io_buffers *io = port->rt.mix_port.io;
size_t buffer_size = p->node->buffer_size;
struct buffer *outb;
float *out = NULL;
int layer = 0;
int stride = n->n_in_ports;
spa_list_for_each(gp, &node->ports[SPA_DIRECTION_INPUT], link) {
struct pw_link *link = gp->scheduler_data;
struct spa_buffer *inbuf;
struct pw_port_mix *mix = SPA_CONTAINER_OF(gp, struct pw_port_mix, port);
pw_log_trace("mix %p: input %d %d", node, gp->io->buffer_id, link->output->n_buffers);
if (!(gp->io->buffer_id < link->output->n_buffers && gp->io->status == SPA_STATUS_HAVE_BUFFER))
if (gp->flags & SPA_GRAPH_PORT_FLAG_DISABLED)
continue;
inbuf = link->output->buffers[gp->io->buffer_id];
pw_log_trace("mix %p: input %d %d/%d", node,
gp->io->status, gp->io->buffer_id, mix->n_buffers);
if (layer++ == 0)
memcpy(p->buffers[0].ptr, inbuf->datas[0].data, buffer_size * sizeof(float));
else
add_f32(p->buffers[0].ptr, inbuf->datas[0].data, buffer_size);
if (!(gp->io->buffer_id < mix->n_buffers && gp->io->status == SPA_STATUS_HAVE_BUFFER))
continue;
if (layer++ == 0) {
io->buffer_id = gp->io->buffer_id;
io->status = SPA_STATUS_HAVE_BUFFER;
out = mix->buffers[io->buffer_id]->datas[0].data;
}
else {
add_f32(out, mix->buffers[gp->io->buffer_id]->datas[0].data, buffer_size);
}
pw_log_trace("mix %p: input %p %p->%p %d %d", node,
gp, gp->io, io, gp->io->status, gp->io->buffer_id);
*io = *gp->io;
io->buffer_id = 0;
gp->io->status = SPA_STATUS_OK;
gp->io->buffer_id = SPA_ID_INVALID;
}
outb = peek_buffer(n, outp);
if (out == NULL)
fill_s16(outb->ptr, port->port_id, buffer_size, stride);
else
conv_f32_s16(outb->ptr, port->port_id, out, buffer_size, stride);
return SPA_STATUS_HAVE_BUFFER;
}
static int schedule_mix_output(struct spa_node *_node)
{
struct port *p = SPA_CONTAINER_OF(_node, struct port, mix_node);
struct pw_port *port = p->port;
struct pw_port *port = SPA_CONTAINER_OF(_node, struct pw_port, mix_node);
struct port *p = port->owner_data;
struct spa_graph_node *node = &port->rt.mix_node;
struct spa_graph_port *gp;
struct spa_io_buffers *io = port->rt.mix_port.io;
spa_list_for_each(gp, &node->ports[SPA_DIRECTION_INPUT], link)
spa_list_for_each(gp, &node->ports[SPA_DIRECTION_INPUT], link) {
if (gp->flags & SPA_GRAPH_PORT_FLAG_DISABLED)
continue;
*gp->io = *io;
}
return io->status;
}
static int schedule_mix_use_buffers(struct spa_node *_node,
enum spa_direction direction,
uint32_t port_id,
struct spa_buffer **buffers,
uint32_t n_buffers)
{
struct pw_port *port = SPA_CONTAINER_OF(_node, struct pw_port, mix_node);
struct port *p = port->owner_data;
pw_log_debug("port %p: %d use buffers %d %p", port, port_id, n_buffers, buffers);
return 0;
}
static const struct spa_node schedule_mix_node = {
SPA_VERSION_NODE,
NULL,
.port_use_buffers = schedule_mix_use_buffers,
.process_input = schedule_mix_input,
.process_output = schedule_mix_output,
};
#endif
static void port_free(void *data)
{
@ -722,10 +744,13 @@ static struct port *make_port(struct node *n, enum pw_direction direction,
p->node = n;
p->flags = flags;
spa_list_init(&p->queue);
port->owner_data = p;
if (direction == PW_DIRECTION_INPUT) {
n->in_ports[id] = p;
n->n_in_ports++;
port->mix_node = schedule_mix_node;
port->mix = &port->mix_node;
} else {
n->out_ports[id] = p;
n->n_out_ports++;

View file

@ -33,8 +33,6 @@
#include <pipewire/core.h>
#include <pipewire/data-loop.h>
#include <spa/graph/graph-scheduler6.h>
/** \cond */
struct resource_data {
struct spa_hook resource_listener;
@ -386,9 +384,6 @@ struct pw_core *pw_core_new(struct pw_loop *main_loop, struct pw_properties *pro
pw_type_init(&this->type);
pw_map_init(&this->globals, 128, 32);
spa_graph_init(&this->rt.graph);
spa_graph_set_callbacks(&this->rt.graph, &spa_graph_impl_default, NULL);
spa_debug_set_type_map(this->type.map);
this->support[0] = SPA_SUPPORT_INIT(SPA_TYPE__TypeMap, this->type.map);

View file

@ -41,6 +41,7 @@ struct impl {
bool active;
bool have_io;
bool activated;
struct pw_work_queue *work;
@ -51,6 +52,8 @@ struct impl {
struct spa_hook input_node_listener;
struct spa_hook output_port_listener;
struct spa_hook output_node_listener;
struct spa_io_buffers io;
};
struct resource_data {
@ -512,22 +515,24 @@ static int select_io(struct pw_link *this)
if (impl->have_io)
return 0;
io = this->rt.in_port.port.io;
io = this->rt.mix[SPA_DIRECTION_INPUT].port.io;
if (io == NULL)
io = this->rt.out_port.port.io;
io = this->rt.mix[SPA_DIRECTION_OUTPUT].port.io;
if (io == NULL)
io = &this->io;
io = &impl->io;
if (io == NULL)
return -EIO;
if ((res = port_set_io(this, this->input, io,
sizeof(struct spa_io_buffers), &this->rt.in_port.port)) < 0)
sizeof(struct spa_io_buffers), &this->rt.mix[SPA_DIRECTION_INPUT].port)) < 0)
return res;
if ((res = port_set_io(this, this->output, io,
sizeof(struct spa_io_buffers), &this->rt.out_port.port)) < 0)
sizeof(struct spa_io_buffers), &this->rt.mix[SPA_DIRECTION_OUTPUT].port)) < 0)
return res;
this->io = io;
impl->have_io = true;
return 0;
@ -588,11 +593,11 @@ static int do_allocation(struct pw_link *this, uint32_t in_state, uint32_t out_s
goto error;
}
} else if (in_state == PW_PORT_STATE_READY && out_state > PW_PORT_STATE_READY) {
out_flags &= ~SPA_PORT_INFO_FLAG_CAN_USE_BUFFERS;
in_flags &= ~SPA_PORT_INFO_FLAG_CAN_ALLOC_BUFFERS;
SPA_FLAG_SET(out_flags, SPA_PORT_INFO_FLAG_CAN_USE_BUFFERS);
SPA_FLAG_UNSET(in_flags, SPA_PORT_INFO_FLAG_CAN_ALLOC_BUFFERS);
} else if (out_state == PW_PORT_STATE_READY && in_state > PW_PORT_STATE_READY) {
in_flags &= ~SPA_PORT_INFO_FLAG_CAN_USE_BUFFERS;
out_flags &= ~SPA_PORT_INFO_FLAG_CAN_ALLOC_BUFFERS;
SPA_FLAG_SET(in_flags, SPA_PORT_INFO_FLAG_CAN_USE_BUFFERS);
SPA_FLAG_UNSET(out_flags, SPA_PORT_INFO_FLAG_CAN_ALLOC_BUFFERS);
} else {
pw_log_debug("link %p: delay allocation, state %d %d", this, in_state, out_state);
return 0;
@ -612,7 +617,7 @@ static int do_allocation(struct pw_link *this, uint32_t in_state, uint32_t out_s
allocation.n_buffers, allocation.buffers);
} else if (input->allocation.n_buffers && input->mix == NULL) {
out_flags = SPA_PORT_INFO_FLAG_CAN_USE_BUFFERS;
in_flags = 0;
in_flags = SPA_PORT_INFO_FLAG_CAN_USE_BUFFERS;
allocation = input->allocation;
@ -691,6 +696,7 @@ static int do_allocation(struct pw_link *this, uint32_t in_state, uint32_t out_s
if (out_flags & SPA_PORT_INFO_FLAG_CAN_ALLOC_BUFFERS) {
if ((res = pw_port_alloc_buffers(output,
this->rt.mix[SPA_DIRECTION_OUTPUT].port.port_id,
params, n_params,
allocation.buffers,
&allocation.n_buffers)) < 0) {
@ -706,6 +712,7 @@ static int do_allocation(struct pw_link *this, uint32_t in_state, uint32_t out_s
allocation.n_buffers, allocation.buffers);
} else if (in_flags & SPA_PORT_INFO_FLAG_CAN_ALLOC_BUFFERS) {
if ((res = pw_port_alloc_buffers(input,
this->rt.mix[SPA_DIRECTION_INPUT].port.port_id,
params, n_params,
allocation.buffers,
&allocation.n_buffers)) < 0) {
@ -724,6 +731,7 @@ static int do_allocation(struct pw_link *this, uint32_t in_state, uint32_t out_s
pw_log_debug("link %p: using %d buffers %p on output port", this,
allocation.n_buffers, allocation.buffers);
if ((res = pw_port_use_buffers(output,
this->rt.mix[SPA_DIRECTION_OUTPUT].port.port_id,
allocation.buffers,
allocation.n_buffers)) < 0) {
asprintf(&error, "link %p: error use output buffers: %s", this,
@ -735,10 +743,12 @@ static int do_allocation(struct pw_link *this, uint32_t in_state, uint32_t out_s
move_allocation(&allocation, &output->allocation);
} else if (in_flags & SPA_PORT_INFO_FLAG_CAN_USE_BUFFERS) {
}
if (in_flags & SPA_PORT_INFO_FLAG_CAN_USE_BUFFERS) {
pw_log_debug("link %p: using %d buffers %p on input port", this,
allocation.n_buffers, allocation.buffers);
if ((res = pw_port_use_buffers(input,
this->rt.mix[SPA_DIRECTION_INPUT].port.port_id,
allocation.buffers,
allocation.n_buffers)) < 0) {
asprintf(&error, "link %p: error use input buffers: %s", this,
@ -757,7 +767,8 @@ static int do_allocation(struct pw_link *this, uint32_t in_state, uint32_t out_s
asprintf(&error, "link %p: error can set io: %s", this, spa_strerror(res));
goto error;
}
return 0;
return res;
error:
free_allocation(&output->allocation);
@ -771,8 +782,18 @@ do_activate_link(struct spa_loop *loop,
bool async, uint32_t seq, const void *data, size_t size, void *user_data)
{
struct pw_link *this = user_data;
SPA_FLAG_UNSET(this->rt.out_port.port.flags, SPA_GRAPH_PORT_FLAG_DISABLED);
SPA_FLAG_UNSET(this->rt.in_port.port.flags, SPA_GRAPH_PORT_FLAG_DISABLED);
pw_log_trace("link %p: activate", this);
SPA_FLAG_UNSET(this->rt.mix[0].port.flags, SPA_GRAPH_PORT_FLAG_DISABLED);
SPA_FLAG_UNSET(this->rt.mix[1].port.flags, SPA_GRAPH_PORT_FLAG_DISABLED);
spa_list_append(&this->output->node->rt.links[SPA_DIRECTION_OUTPUT],
&this->rt.out_node_link);
spa_list_append(&this->input->node->rt.links[SPA_DIRECTION_INPUT],
&this->rt.in_node_link);
__atomic_add_fetch(&this->input->node->rt.activation->required, 1, __ATOMIC_SEQ_CST);
return 0;
}
@ -791,8 +812,12 @@ static int do_start(struct pw_link *this, uint32_t in_state, uint32_t out_state)
input = this->input;
output = this->output;
pw_loop_invoke(output->node->data_loop,
if (!impl->activated) {
pw_loop_invoke(output->node->data_loop,
do_activate_link, SPA_ID_INVALID, NULL, 0, false, this);
impl->activated = true;
}
if (in_state == PW_PORT_STATE_PAUSED) {
if ((res = pw_node_set_state(input->node, PW_NODE_STATE_RUNNING)) < 0) {
@ -899,8 +924,13 @@ output_node_async_complete(void *data, uint32_t seq, int res)
static void clear_port_buffers(struct pw_link *link, struct pw_port *port)
{
if (spa_list_is_empty(&port->links) && port->allocation.mem == NULL)
pw_port_use_buffers(port, NULL, 0);
int res;
pw_log_debug("%d %p", spa_list_is_empty(&port->links), port->allocation.mem);
if ((res = pw_port_use_buffers(port,
link->rt.mix[SPA_DIRECTION_INPUT].port.port_id, NULL, 0)) < 0)
pw_log_warn("link %p: port %p clear error %s", link, port, spa_strerror(res));
}
static int
@ -908,7 +938,8 @@ do_remove_input(struct spa_loop *loop,
bool async, uint32_t seq, const void *data, size_t size, void *user_data)
{
struct pw_link *this = user_data;
spa_graph_port_remove(&this->rt.in_port.port);
spa_graph_port_unlink(&this->rt.mix[SPA_DIRECTION_INPUT].port);
spa_graph_port_remove(&this->rt.mix[SPA_DIRECTION_INPUT].port);
return 0;
}
@ -923,12 +954,12 @@ static void input_remove(struct pw_link *this, struct pw_port *port)
pw_loop_invoke(port->node->data_loop,
do_remove_input, 1, NULL, 0, true, this);
pw_port_release_mix(port, &this->rt.in_port);
spa_list_remove(&this->input_link);
spa_hook_list_call(&this->input->listener_list, struct pw_port_events, link_removed, this);
clear_port_buffers(this, port);
pw_port_release_mix(port, &this->rt.mix[SPA_DIRECTION_INPUT]);
this->input = NULL;
}
@ -937,7 +968,8 @@ do_remove_output(struct spa_loop *loop,
bool async, uint32_t seq, const void *data, size_t size, void *user_data)
{
struct pw_link *this = user_data;
spa_graph_port_remove(&this->rt.out_port.port);
spa_graph_port_unlink(&this->rt.mix[SPA_DIRECTION_OUTPUT].port);
spa_graph_port_remove(&this->rt.mix[SPA_DIRECTION_OUTPUT].port);
return 0;
}
@ -952,12 +984,12 @@ static void output_remove(struct pw_link *this, struct pw_port *port)
pw_loop_invoke(port->node->data_loop,
do_remove_output, 1, NULL, 0, true, this);
pw_port_release_mix(port, &this->rt.out_port);
spa_list_remove(&this->output_link);
spa_hook_list_call(&this->output->listener_list, struct pw_port_events, link_removed, this);
clear_port_buffers(this, port);
pw_port_release_mix(port, &this->rt.mix[SPA_DIRECTION_OUTPUT]);
this->output = NULL;
}
@ -985,12 +1017,13 @@ int pw_link_activate(struct pw_link *this)
{
struct impl *impl = SPA_CONTAINER_OF(this, struct impl, this);
pw_log_debug("link %p: activate %d", this, impl->active);
if (impl->active)
return 0;
impl->active = true;
pw_log_debug("link %p: activate", this);
this->output->node->n_used_output_links++;
this->input->node->n_used_input_links++;
@ -1005,9 +1038,16 @@ do_deactivate_link(struct spa_loop *loop,
bool async, uint32_t seq, const void *data, size_t size, void *user_data)
{
struct pw_link *this = user_data;
pw_log_trace("link %p: disable %p and %p", this, &this->rt.out_port, &this->rt.in_port);
SPA_FLAG_SET(this->rt.out_port.port.flags, SPA_GRAPH_PORT_FLAG_DISABLED);
SPA_FLAG_SET(this->rt.in_port.port.flags, SPA_GRAPH_PORT_FLAG_DISABLED);
pw_log_debug("link %p: disable %p and %p", this, &this->rt.mix[0], &this->rt.mix[1]);
__atomic_sub_fetch(&this->input->node->rt.activation->required, 1, __ATOMIC_SEQ_CST);
SPA_FLAG_SET(this->rt.mix[0].port.flags, SPA_GRAPH_PORT_FLAG_DISABLED);
SPA_FLAG_SET(this->rt.mix[1].port.flags, SPA_GRAPH_PORT_FLAG_DISABLED);
spa_list_remove(&this->rt.out_node_link);
spa_list_remove(&this->rt.in_node_link);
return 0;
}
@ -1016,13 +1056,17 @@ int pw_link_deactivate(struct pw_link *this)
struct impl *impl = SPA_CONTAINER_OF(this, struct impl, this);
struct pw_node *input_node, *output_node;
pw_log_debug("link %p: deactivate %d", this, impl->active);
if (!impl->active)
return 0;
impl->active = false;
pw_log_debug("link %p: deactivate", this);
pw_loop_invoke(this->output->node->data_loop,
do_deactivate_link, SPA_ID_INVALID, NULL, 0, true, this);
if (impl->activated) {
pw_loop_invoke(this->output->node->data_loop,
do_deactivate_link, SPA_ID_INVALID, NULL, 0, true, this);
impl->activated = false;
}
input_node = this->input->node;
output_node = this->output->node;
@ -1108,11 +1152,7 @@ do_add_link(struct spa_loop *loop,
struct pw_link *this = user_data;
struct pw_port *port = ((struct pw_port **) data)[0];
if (port->direction == PW_DIRECTION_OUTPUT) {
spa_graph_port_add(&port->rt.mix_node, &this->rt.out_port.port);
} else {
spa_graph_port_add(&port->rt.mix_node, &this->rt.in_port.port);
}
spa_graph_port_add(&port->rt.mix_node, &this->rt.mix[port->direction].port);
return 0;
}
@ -1211,16 +1251,17 @@ struct pw_link *pw_link_new(struct pw_core *core,
this->info.format = NULL;
this->info.props = this->properties ? &this->properties->dict : NULL;
this->io = SPA_IO_BUFFERS_INIT;
impl->io = SPA_IO_BUFFERS_INIT;
pw_port_init_mix(output, &this->rt.out_port);
pw_port_init_mix(input, &this->rt.in_port);
pw_port_init_mix(output, &this->rt.mix[SPA_DIRECTION_OUTPUT]);
pw_port_init_mix(input, &this->rt.mix[SPA_DIRECTION_INPUT]);
pw_log_debug("link %p: constructed %p:%d.%d -> %p:%d.%d", impl,
output_node, output->port_id, this->rt.out_port.port.port_id,
input_node, input->port_id, this->rt.in_port.port.port_id);
output_node, output->port_id, this->rt.mix[SPA_DIRECTION_OUTPUT].port.port_id,
input_node, input->port_id, this->rt.mix[SPA_DIRECTION_INPUT].port.port_id);
spa_graph_port_link(&this->rt.out_port.port, &this->rt.in_port.port);
spa_graph_port_link(&this->rt.mix[SPA_DIRECTION_OUTPUT].port,
&this->rt.mix[SPA_DIRECTION_INPUT].port);
/* nodes can be in different data loops so we do this twice */
pw_loop_invoke(output_node->data_loop, do_add_link,

View file

@ -41,6 +41,8 @@ struct impl {
struct pw_work_queue *work;
bool pause_on_idle;
struct pw_node_activation activation;
};
struct resource_data {
@ -301,17 +303,6 @@ global_bind(void *_data, struct pw_client *client, uint32_t permissions,
return;
}
static int
do_node_add(struct spa_loop *loop,
bool async, uint32_t seq, const void *data, size_t size, void *user_data)
{
struct pw_node *this = user_data;
spa_graph_node_add(this->rt.graph, &this->rt.node);
return 0;
}
static void global_destroy(void *data)
{
struct pw_node *this = data;
@ -347,8 +338,6 @@ int pw_node_register(struct pw_node *this,
pw_node_update_ports(this);
pw_loop_invoke(this->data_loop, do_node_add, 1, NULL, 0, false, this);
if ((str = pw_properties_get(this->properties, "media.class")) != NULL)
pw_properties_set(properties, "media.class", str);
pw_properties_set(properties, "node.name", this->info.name);
@ -382,6 +371,45 @@ int pw_node_register(struct pw_node *this,
return 0;
}
static void node_have_output(void *data);
static void node_need_input(void *data);
static int impl_node_process(struct pw_node *node)
{
struct spa_graph_port *p;
int res = 0;
pw_log_trace("node %p: process %d", node, node->rt.activation->status);
if (node->rt.activation->status != SPA_STATUS_HAVE_BUFFER) {
if (!spa_list_is_empty(&node->rt.node.ports[SPA_DIRECTION_INPUT])) {
spa_list_for_each(p, &node->rt.node.ports[SPA_DIRECTION_INPUT], link)
spa_node_process_input(p->peer->node->implementation);
if (node->node->process_input)
res = node->rt.activation->status = spa_node_process_input(node->node);
}
else {
if (node->node->process_output)
res = node->rt.activation->status = spa_node_process_output(node->node);
}
} else if (node->rt.activation->status == SPA_STATUS_HAVE_BUFFER) {
spa_list_for_each(p, &node->rt.node.ports[SPA_DIRECTION_OUTPUT], link)
spa_node_process_output(p->peer->node->implementation);
if (node->node->process_output)
res = node->rt.activation->status = spa_node_process_output(node->node);
}
if (res == SPA_STATUS_HAVE_BUFFER)
node_have_output(node);
else if (res == SPA_STATUS_NEED_BUFFER)
node_need_input(node);
return node->rt.activation->status;
}
static void check_properties(struct pw_node *node)
{
struct impl *impl = SPA_CONTAINER_OF(node, struct impl, this);
@ -427,8 +455,6 @@ struct pw_node *pw_node_new(struct pw_core *core,
this->data_loop = core->data_loop;
this->rt.graph = &core->rt.graph;
spa_list_init(&this->resource_list);
spa_hook_list_init(&this->listener_list);
@ -442,6 +468,12 @@ struct pw_node *pw_node_new(struct pw_core *core,
pw_map_init(&this->output_port_map, 64, 64);
spa_graph_node_init(&this->rt.node);
spa_list_init(&this->rt.links[SPA_DIRECTION_INPUT]);
spa_list_init(&this->rt.links[SPA_DIRECTION_OUTPUT]);
this->rt.activation = &impl->activation;
impl->activation.status = SPA_STATUS_NEED_BUFFER;
this->process = impl_node_process;
return this;
@ -520,20 +552,99 @@ static void node_event(void *data, struct spa_event *event)
spa_hook_list_call(&node->listener_list, struct pw_node_events, event, event);
}
static void node_process(struct pw_node *node)
{
pw_log_trace("node %p: pending %d required %d %d", node,
node->rt.activation->pending, node->rt.activation->required,
node->rt.activation->status);
if (--node->rt.activation->pending == 0) {
if (node->process)
node->rt.activation->status = node->process(node);
else
node->rt.activation->status = SPA_STATUS_OK;
pw_log_trace("node %p: %d", node, node->rt.activation->status);
}
}
static void node_need_input(void *data)
{
struct pw_node *node = data;
struct pw_node *node = data, *n, *pn;
struct spa_list queue, pending;
struct spa_graph_port *p;
pw_log_trace("node %p: need input", node);
spa_hook_list_call(&node->listener_list, struct pw_node_events, need_input);
spa_graph_need_input(node->rt.graph, &node->rt.node);
spa_list_init(&queue);
spa_list_init(&pending);
node->rt.activation->status = SPA_STATUS_NEED_BUFFER;
spa_list_append(&queue, &node->rt.sched_link);
spa_list_for_each(p, &node->rt.node.ports[SPA_DIRECTION_INPUT], link)
spa_node_process_output(p->peer->node->implementation);
while (!spa_list_is_empty(&queue)) {
struct pw_link *l;
n = spa_list_first(&queue, struct pw_node, rt.sched_link);
spa_list_remove(&n->rt.sched_link);
if (n != node);
spa_list_prepend(&pending, &n->rt.sched_link);
n->rt.activation->pending = 1;
if (n->rt.activation->status == SPA_STATUS_HAVE_BUFFER)
continue;
n->rt.activation->pending += n->rt.activation->required;
pw_log_trace("node %p: add %d %d",
n, n->rt.activation->pending, n->rt.activation->required);
spa_list_for_each(l, &n->rt.links[SPA_DIRECTION_INPUT], rt.in_node_link) {
pn = l->output->node;
pw_log_trace("node %p: %p in %p %d", n, pn, l, l->io->status);
if (l->io->status == SPA_STATUS_OK) {
n->rt.activation->pending -= 1;
continue;
}
if (pn->rt.sched_link.next == NULL)
spa_list_append(&queue, &pn->rt.sched_link);
}
}
while (!spa_list_is_empty(&pending)) {
n = spa_list_first(&pending, struct pw_node, rt.sched_link);
spa_list_remove(&n->rt.sched_link);
n->rt.sched_link.next = NULL;
node_process(n);
}
}
static void node_have_output(void *data)
{
struct pw_node *node = data;
struct pw_node *node = data, *pn;
struct pw_link *l;
struct spa_graph_port *p;
pw_log_trace("node %p: have output", node);
spa_graph_have_output(node->rt.graph, &node->rt.node);
node->rt.activation->status = SPA_STATUS_HAVE_BUFFER;
spa_list_for_each(p, &node->rt.node.ports[SPA_DIRECTION_OUTPUT], link)
spa_node_process_input(p->peer->node->implementation);
spa_hook_list_call(&node->listener_list, struct pw_node_events, have_output);
spa_list_for_each(l, &node->rt.links[SPA_DIRECTION_OUTPUT], rt.out_node_link) {
pn = l->input->node;
pw_log_trace("node %p: %p out %p %d", node, pn, l, l->io->status);
node_process(pn);
}
}
static void node_reuse_buffer(void *data, uint32_t port_id, uint32_t buffer_id)
@ -561,7 +672,6 @@ static const struct spa_node_callbacks node_callbacks = {
.reuse_buffer = node_reuse_buffer,
};
void pw_node_set_implementation(struct pw_node *node,
struct spa_node *spa_node)
{
@ -586,19 +696,6 @@ void pw_node_add_listener(struct pw_node *node,
spa_hook_list_append(&node->listener_list, listener, events, data);
}
static int
do_node_remove(struct spa_loop *loop,
bool async, uint32_t seq, const void *data, size_t size, void *user_data)
{
struct pw_node *this = user_data;
pause_node(this);
spa_graph_node_remove(&this->rt.node);
return 0;
}
/** Destroy a node
* \param node a node to destroy
*
@ -616,8 +713,9 @@ void pw_node_destroy(struct pw_node *node)
pw_log_debug("node %p: destroy", impl);
spa_hook_list_call(&node->listener_list, struct pw_node_events, destroy);
pause_node(node);
if (node->registered) {
pw_loop_invoke(node->data_loop, do_node_remove, 1, NULL, 0, true, node);
spa_list_remove(&node->link);
}

View file

@ -57,18 +57,15 @@ static int schedule_tee_input(struct spa_node *data)
struct spa_io_buffers *io = this->rt.mix_port.io;
if (!spa_list_is_empty(&node->ports[SPA_DIRECTION_OUTPUT])) {
pw_log_trace("node %p: tee input %d %d", node, io->status, io->buffer_id);
pw_log_trace("port %p: tee input %d %d", this, io->status, io->buffer_id);
spa_list_for_each(p, &node->ports[SPA_DIRECTION_OUTPUT], link) {
pw_log_trace("node %p: port %p %d %p->%p", node, p, p->flags, io, p->io);
pw_log_trace("port %p: port %d %d %p->%p", this,
p->port_id, p->flags, io, p->io);
if (p->flags & SPA_GRAPH_PORT_FLAG_DISABLED)
continue;
*p->io = *io;
}
io->buffer_id = SPA_ID_INVALID;
}
else
io->status = SPA_STATUS_NEED_BUFFER;
return io->status;
}
static int schedule_tee_output(struct spa_node *data)
@ -79,23 +76,24 @@ static int schedule_tee_output(struct spa_node *data)
struct spa_io_buffers *io = this->rt.mix_port.io;
spa_list_for_each(p, &node->ports[SPA_DIRECTION_OUTPUT], link) {
pw_log_trace("node %p: port %p %d %p->%p", node, p, p->flags, p->io, io);
pw_log_trace("port %p: port %d %d %p->%p %d %d",
this, p->port_id, p->flags, p->io, io,
p->io->status, p->io->buffer_id);
if (p->flags & SPA_GRAPH_PORT_FLAG_DISABLED)
continue;
*io = *p->io;
}
pw_log_trace("node %p: tee output %d %d", node, io->status, io->buffer_id);
pw_log_trace("port %p: tee output %d %d", this, io->status, io->buffer_id);
return io->status;
}
static int schedule_tee_reuse_buffer(struct spa_node *data, uint32_t port_id, uint32_t buffer_id)
{
struct pw_port *this = SPA_CONTAINER_OF(data, struct pw_port, mix_node);
struct spa_graph_node *node = &this->rt.mix_node;
struct spa_graph_port *p = &this->rt.mix_port, *pp;
if ((pp = p->peer) != NULL) {
pw_log_trace("node %p: tee reuse buffer %d %d", node, port_id, buffer_id);
pw_log_trace("port %p: tee reuse buffer %d %d", this, port_id, buffer_id);
spa_node_port_reuse_buffer(pp->node->implementation, port_id, buffer_id);
}
return 0;
@ -119,10 +117,9 @@ static int schedule_mix_input(struct spa_node *data)
spa_list_for_each(p, &node->ports[SPA_DIRECTION_INPUT], link) {
if (p->flags & SPA_GRAPH_PORT_FLAG_DISABLED)
continue;
pw_log_trace("mix %p: input %p %p->%p %d %d", node,
p, p->io, io, p->io->status, p->io->buffer_id);
pw_log_trace("port %p: mix input %d %p->%p %d %d", this,
p->port_id, p->io, io, p->io->status, p->io->buffer_id);
*io = *p->io;
p->io->buffer_id = SPA_ID_INVALID;
break;
}
return io->status;
@ -137,6 +134,8 @@ static int schedule_mix_output(struct spa_node *data)
if (!spa_list_is_empty(&node->ports[SPA_DIRECTION_INPUT])) {
spa_list_for_each(p, &node->ports[SPA_DIRECTION_INPUT], link) {
pw_log_trace("port %p: port %d %d %p->%p", this,
p->port_id, p->flags, io, p->io);
if (p->flags & SPA_GRAPH_PORT_FLAG_DISABLED)
continue;
*p->io = *io;
@ -146,7 +145,7 @@ static int schedule_mix_output(struct spa_node *data)
io->status = SPA_STATUS_HAVE_BUFFER;
io->buffer_id = SPA_ID_INVALID;
}
pw_log_trace("mix %p: output %d %d", node, io->status, io->buffer_id);
pw_log_trace("port %p: output %d %d", this, io->status, io->buffer_id);
return io->status;
}
@ -158,7 +157,7 @@ static int schedule_mix_reuse_buffer(struct spa_node *data, uint32_t port_id, ui
spa_list_for_each(p, &node->ports[SPA_DIRECTION_INPUT], link) {
if ((pp = p->peer) != NULL) {
pw_log_trace("mix %p: reuse buffer %d %d", node, port_id, buffer_id);
pw_log_trace("port %p: reuse buffer %d %d", this, port_id, buffer_id);
spa_node_port_reuse_buffer(pp->node->implementation, port_id, buffer_id);
}
}
@ -179,7 +178,7 @@ int pw_port_init_mix(struct pw_port *port, struct pw_port_mix *mix)
int res = 0;
const struct pw_port_implementation *pi = port->implementation;
id = pw_map_insert_new(&port->mix_port_map, NULL);
id = pw_map_insert_new(&port->mix_port_map, mix);
spa_graph_port_init(&mix->port,
port->direction, id,
@ -343,7 +342,6 @@ static int do_add_port(struct spa_loop *loop,
this->rt.port.flags = this->spa_info->flags;
spa_graph_port_add(&this->node->rt.node, &this->rt.port);
spa_graph_node_add(this->rt.graph, &this->rt.mix_node);
spa_graph_port_add(&this->rt.mix_node, &this->rt.mix_port);
spa_graph_port_link(&this->rt.port, &this->rt.mix_port);
@ -468,6 +466,9 @@ int pw_port_add(struct pw_port *port, struct pw_node *node)
struct pw_type *t = &core->type;
const char *str, *dir;
if (port->node != NULL)
return -EEXIST;
port->node = node;
spa_node_port_get_info(node->node,
@ -516,7 +517,6 @@ int pw_port_add(struct pw_port *port, struct pw_node *node)
pw_port_register(port, node->global->owner, node->global,
pw_properties_copy(port->properties));
port->rt.graph = node->rt.graph;
pw_loop_invoke(node->data_loop, do_add_port, SPA_ID_INVALID, NULL, 0, false, port);
if (port->state <= PW_PORT_STATE_INIT)
@ -554,7 +554,6 @@ static int do_remove_port(struct spa_loop *loop,
spa_graph_port_remove(p);
spa_graph_port_remove(&this->rt.mix_port);
spa_graph_node_remove(&this->rt.mix_node);
return 0;
}
@ -563,11 +562,13 @@ static void pw_port_remove(struct pw_port *port)
{
struct pw_node *node = port->node;
if (node == NULL)
return;
pw_log_debug("port %p: remove", port);
if (port->rt.graph)
pw_loop_invoke(port->node->data_loop, do_remove_port,
SPA_ID_INVALID, NULL, 0, true, port);
pw_loop_invoke(port->node->data_loop, do_remove_port,
SPA_ID_INVALID, NULL, 0, true, port);
if (port->direction == PW_DIRECTION_INPUT) {
pw_map_remove(&node->input_port_map, port->port_id);
@ -583,7 +584,6 @@ static void pw_port_remove(struct pw_port *port)
void pw_port_destroy(struct pw_port *port)
{
struct pw_node *node = port->node;
struct pw_control *control, *ctemp;
struct pw_resource *resource, *tmp;
@ -591,8 +591,7 @@ void pw_port_destroy(struct pw_port *port)
spa_hook_list_call(&port->listener_list, struct pw_port_events, destroy);
if (node)
pw_port_remove(port);
pw_port_remove(port);
spa_list_for_each_safe(control, ctemp, &port->control_list[0], port_link)
pw_control_destroy(control);
@ -731,10 +730,15 @@ int pw_port_set_param(struct pw_port *port, uint32_t id, uint32_t flags,
return res;
}
int pw_port_use_buffers(struct pw_port *port, struct spa_buffer **buffers, uint32_t n_buffers)
int pw_port_use_buffers(struct pw_port *port, uint32_t mix_id,
struct spa_buffer **buffers, uint32_t n_buffers)
{
int res;
struct pw_node *node = port->node;
struct pw_port_mix *mix;
struct spa_graph_port *p;
pw_log_debug("port %p: %d.%d: %d buffers ", port, port->port_id, mix_id, n_buffers);
if (n_buffers == 0 && port->state <= PW_PORT_STATE_READY)
return 0;
@ -742,8 +746,18 @@ int pw_port_use_buffers(struct pw_port *port, struct spa_buffer **buffers, uint3
if (n_buffers > 0 && port->state < PW_PORT_STATE_READY)
return -EIO;
res = spa_node_port_use_buffers(node->node, port->direction, port->port_id, buffers, n_buffers);
pw_log_debug("port %p: use %d buffers: %d (%s)", port, n_buffers, res, spa_strerror(res));
if ((mix = pw_map_lookup(&port->mix_port_map, mix_id)) == NULL)
return -EIO;
p = &mix->port;
if (port->mix_node.port_use_buffers)
res = spa_node_port_use_buffers(&port->mix_node, p->direction, p->port_id, buffers, n_buffers);
else
res = spa_node_port_use_buffers(node->node, port->direction, port->port_id, buffers, n_buffers);
pw_log_debug("port %p: %d.%d: use %d buffers: %d (%s)", port,
port->port_id, mix_id, n_buffers, res, spa_strerror(res));
port->allocated = false;
@ -753,6 +767,8 @@ int pw_port_use_buffers(struct pw_port *port, struct spa_buffer **buffers, uint3
n_buffers = 0;
buffers = NULL;
}
mix->n_buffers = n_buffers;
mix->buffers = buffers;
if (n_buffers == 0)
port_update_state (port, PW_PORT_STATE_READY);
@ -762,33 +778,49 @@ int pw_port_use_buffers(struct pw_port *port, struct spa_buffer **buffers, uint3
return res;
}
int pw_port_alloc_buffers(struct pw_port *port,
int pw_port_alloc_buffers(struct pw_port *port, uint32_t mix_id,
struct spa_pod **params, uint32_t n_params,
struct spa_buffer **buffers, uint32_t *n_buffers)
{
int res;
struct pw_node *node = port->node;
struct pw_port_mix *mix;
struct spa_graph_port *p;
if (port->state < PW_PORT_STATE_READY)
return -EIO;
res = spa_node_port_alloc_buffers(node->node, port->direction, port->port_id,
params, n_params,
buffers, n_buffers);
pw_log_debug("port %p: alloc %d buffers: %d (%s)", port, *n_buffers, res, spa_strerror(res));
if ((mix = pw_map_lookup(&port->mix_port_map, mix_id)) == NULL)
return -EIO;
p = &mix->port;
if (port->mix_node.port_use_buffers)
res = spa_node_port_alloc_buffers(&port->mix_node, p->direction, p->port_id,
params, n_params,
buffers, n_buffers);
else
res = spa_node_port_alloc_buffers(node->node, port->direction, port->port_id,
params, n_params,
buffers, n_buffers);
pw_log_debug("port %p: %d.%d alloc %d buffers: %d (%s)", port,
port->port_id, mix_id, *n_buffers, res, spa_strerror(res));
free_allocation(&port->allocation);
if (res < 0) {
n_buffers = 0;
*n_buffers = 0;
buffers = NULL;
port->allocated = false;
}
else {
port->allocated = true;
}
mix->n_buffers = *n_buffers;
mix->buffers = buffers;
if (n_buffers == 0)
if (*n_buffers == 0)
port_update_state (port, PW_PORT_STATE_READY);
else if (!SPA_RESULT_IS_ASYNC(res))
port_update_state (port, PW_PORT_STATE_PAUSED);

View file

@ -163,10 +163,6 @@ struct pw_core {
struct pw_client *current_client; /**< client currently executing code in mainloop */
long sc_pagesize;
struct {
struct spa_graph graph;
} rt;
};
struct pw_data_loop {
@ -227,6 +223,22 @@ struct pw_module {
void *user_data; /**< module user_data */
};
struct pw_node_activation {
#define NOT_TRIGGERED 0
#define TRIGGERED 1
#define AWAKE 2
#define FINISHED 3
int state;
uint64_t signal_time;
uint64_t awake_time;
uint64_t finish_time;
int status;
uint32_t required;
uint32_t pending;
};
struct pw_node {
struct pw_core *core; /**< core object */
struct spa_list link; /**< link in core node_list */
@ -260,9 +272,13 @@ struct pw_node {
struct pw_loop *data_loop; /**< the data loop for this node */
int (*process) (struct pw_node *node);
struct {
struct spa_graph *graph;
struct spa_graph_node node;
struct spa_list links[2];
struct pw_node_activation *activation;
struct spa_list sched_link;
} rt;
void *user_data; /**< extra user data */
@ -270,7 +286,7 @@ struct pw_node {
struct pw_port_mix {
struct spa_graph_port port;
struct spa_buffer *buffers;
struct spa_buffer **buffers;
uint32_t n_buffers;
};
@ -318,7 +334,6 @@ struct pw_port {
struct pw_map mix_port_map; /**< map from port_id from mixer */
struct {
struct spa_graph *graph;
struct spa_io_buffers io; /**< io area of the port */
struct spa_graph_port port; /**< this graph port, linked to mix_port */
struct spa_graph_port mix_port; /**< port from the mixer */
@ -344,7 +359,7 @@ struct pw_link {
struct spa_list resource_list; /**< list of bound resources */
struct spa_io_buffers io; /**< link io area if not provided by ports */
struct spa_io_buffers *io; /**< link io area */
struct pw_port *output; /**< output port */
struct spa_list output_link; /**< link in output port links */
@ -354,8 +369,9 @@ struct pw_link {
struct spa_hook_list listener_list;
struct {
struct pw_port_mix out_port;
struct pw_port_mix in_port;
struct pw_port_mix mix[2];
struct spa_list in_node_link;
struct spa_list out_node_link;
} rt;
void *user_data;
@ -560,10 +576,11 @@ int pw_port_set_param(struct pw_port *port, uint32_t id, uint32_t flags,
const struct spa_pod *param);
/** Use buffers on a port \memberof pw_port */
int pw_port_use_buffers(struct pw_port *port, struct spa_buffer **buffers, uint32_t n_buffers);
int pw_port_use_buffers(struct pw_port *port, uint32_t mix_id,
struct spa_buffer **buffers, uint32_t n_buffers);
/** Allocate memory for buffers on a port \memberof pw_port */
int pw_port_alloc_buffers(struct pw_port *port,
int pw_port_alloc_buffers(struct pw_port *port, uint32_t mix_id,
struct spa_pod **params, uint32_t n_params,
struct spa_buffer **buffers, uint32_t *n_buffers);

View file

@ -67,6 +67,7 @@ struct buffer {
};
struct mix {
struct spa_list link;
struct pw_port *port;
uint32_t mix_id;
struct pw_port_mix mix;
@ -83,7 +84,9 @@ struct node_data {
struct spa_source *rtsocket_source;
struct pw_client_node_transport *trans;
struct mix mix[MAX_MIX];
struct mix mix_pool[MAX_MIX];
struct spa_list mix[2];
struct spa_list free_mix;
struct pw_array mems;
@ -470,10 +473,8 @@ do_remove_source(struct spa_loop *loop,
}
static void unhandle_socket(struct pw_proxy *proxy)
static void unhandle_socket(struct node_data *data)
{
struct node_data *data = proxy->user_data;
pw_loop_invoke(data->core->data_loop,
do_remove_source, 1, NULL, 0, true, data);
}
@ -504,8 +505,8 @@ static void node_need_input(void *data)
{
struct node_data *d = data;
uint64_t cmd = 1;
pw_log_trace("remote %p: send need input", data);
do_pull(data, SPA_DIRECTION_INPUT);
pw_log_trace("remote %p: send need input", data);
pw_client_node_transport_add_message(d->trans,
&PW_CLIENT_NODE_MESSAGE_INIT(PW_CLIENT_NODE_MESSAGE_NEED_INPUT));
write(d->rtwritefd, &cmd, 8);
@ -517,6 +518,7 @@ static void node_have_output(void *data)
uint64_t cmd = 1;
do_push(data, SPA_DIRECTION_OUTPUT);
pw_log_trace("remote %p: send have output", data);
pw_client_node_transport_add_message(d->trans,
&PW_CLIENT_NODE_MESSAGE_INIT(PW_CLIENT_NODE_MESSAGE_HAVE_OUTPUT));
write(d->rtwritefd, &cmd, 8);
@ -608,7 +610,7 @@ on_rtsocket_condition(void *user_data, int fd, enum spa_io mask)
if (mask & (SPA_IO_ERR | SPA_IO_HUP)) {
pw_log_warn("got error");
unhandle_socket(proxy);
unhandle_socket(data);
return;
}
@ -691,15 +693,14 @@ static void clear_mem(struct node_data *data, struct mem *m)
}
}
static void clean_transport(struct pw_proxy *proxy)
static void clean_transport(struct node_data *data)
{
struct node_data *data = proxy->user_data;
struct mem *m;
if (data->trans == NULL)
return;
unhandle_socket(proxy);
unhandle_socket(data);
pw_array_for_each(m, &data->mems)
clear_mem(data, m);
@ -733,35 +734,43 @@ do_activate_mix(struct spa_loop *loop,
static struct mix *find_mix(struct node_data *data,
enum spa_direction direction, uint32_t port_id, uint32_t mix_id)
{
struct mix *mix, *empty = NULL;
struct pw_port *port;
int i;
struct mix *mix;
for (i = 0; i < MAX_MIX; i++) {
mix = &data->mix[i];
if (mix->port == NULL) {
empty = mix;
continue;
}
if (mix->port->direction == (enum pw_direction) direction &&
mix->port->port_id == port_id &&
spa_list_for_each(mix, &data->mix[direction], link) {
if (mix->port->port_id == port_id &&
mix->mix_id == mix_id)
return mix;
}
if (empty == NULL)
return NULL;
}
static struct mix *ensure_mix(struct node_data *data,
enum spa_direction direction, uint32_t port_id, uint32_t mix_id)
{
struct mix *mix;
struct pw_port *port;
if ((mix = find_mix(data, direction, port_id, mix_id)))
return mix;
if (spa_list_is_empty(&data->free_mix))
return NULL;
port = pw_node_find_port(data->node, direction, port_id);
if (port == NULL)
return NULL;
mix_init(empty, port, mix_id);
mix = spa_list_first(&data->free_mix, struct mix, link);
spa_list_remove(&mix->link);
mix_init(mix, port, mix_id);
spa_list_append(&data->mix[direction], &mix->link);
pw_loop_invoke(data->core->data_loop,
do_activate_mix, SPA_ID_INVALID, NULL, 0, false, empty);
do_activate_mix, SPA_ID_INVALID, NULL, 0, false, mix);
return empty;
return mix;
}
static void client_node_add_mem(void *object,
@ -797,7 +806,7 @@ static void client_node_transport(void *object, uint32_t node_id,
struct pw_proxy *proxy = object;
struct node_data *data = proxy->user_data;
clean_transport(proxy);
clean_transport(data);
data->node_id = node_id;
data->trans = transport;
@ -887,21 +896,22 @@ static void client_node_event(void *object, const struct spa_event *event)
static void do_start(struct node_data *data)
{
int i;
uint64_t cmd = 1;
struct mix *mix;
for (i = 0; i < MAX_MIX; i++) {
struct mix *mix = &data->mix[i];
if (mix->port == NULL)
continue;
spa_list_for_each(mix, &data->mix[SPA_DIRECTION_INPUT], link) {
mix->mix.port.io->status = SPA_STATUS_NEED_BUFFER;
mix->mix.port.io->buffer_id = SPA_ID_INVALID;
}
pw_client_node_transport_add_message(data->trans,
spa_list_for_each(mix, &data->mix[SPA_DIRECTION_OUTPUT], link) {
mix->mix.port.io->status = SPA_STATUS_NEED_BUFFER;
mix->mix.port.io->buffer_id = SPA_ID_INVALID;
}
if (!spa_list_is_empty(&data->mix[SPA_DIRECTION_INPUT])) {
pw_client_node_transport_add_message(data->trans,
&PW_CLIENT_NODE_MESSAGE_INIT(PW_CLIENT_NODE_MESSAGE_NEED_INPUT));
write(data->rtwritefd, &cmd, 8);
write(data->rtwritefd, &cmd, 8);
}
}
static void client_node_command(void *object, uint32_t seq, const struct spa_command *command)
@ -1007,7 +1017,7 @@ static void clear_buffers(struct node_data *data, struct mix *mix)
int i;
pw_log_debug("port %p: clear buffers", port);
pw_port_use_buffers(port, NULL, 0);
pw_port_use_buffers(port, mix->mix_id, NULL, 0);
pw_array_for_each(bid, &mix->buffers) {
if (bid->ptr != NULL) {
@ -1047,7 +1057,7 @@ client_node_port_use_buffers(void *object,
struct pw_type *t = &core->type;
int res, prot;
mix = find_mix(data, direction, port_id, mix_id);
mix = ensure_mix(data, direction, port_id, mix_id);
if (mix == NULL) {
res = -EINVAL;
goto done;
@ -1164,7 +1174,7 @@ client_node_port_use_buffers(void *object,
bufs[i] = b;
}
res = pw_port_use_buffers(mix->port, bufs, n_buffers);
res = pw_port_use_buffers(mix->port, mix->mix_id, bufs, n_buffers);
done:
pw_client_node_proxy_done(data->node_proxy, seq, res);
@ -1212,7 +1222,7 @@ client_node_port_set_io(void *object,
struct mem *mid;
void *ptr;
mix = find_mix(data, direction, port_id, mix_id);
mix = ensure_mix(data, direction, port_id, mix_id);
if (mix == NULL)
return;
@ -1317,26 +1327,40 @@ static const struct pw_node_events node_events = {
.have_output = node_have_output,
};
static int
do_deactivate_mix(struct spa_loop *loop,
bool async, uint32_t seq, const void *data, size_t size, void *user_data)
{
struct mix *mix = user_data;
SPA_FLAG_SET(mix->mix.port.flags, SPA_GRAPH_PORT_FLAG_DISABLED);
spa_graph_port_remove(&mix->mix.port);
return 0;
}
static void clear_mix(struct node_data *data, struct mix *mix)
{
if (mix->port) {
clear_buffers(data, mix);
pw_array_clear(&mix->buffers);
mix->port = NULL;
}
clear_buffers(data, mix);
pw_array_clear(&mix->buffers);
pw_loop_invoke(data->core->data_loop,
do_deactivate_mix, SPA_ID_INVALID, NULL, 0, true, mix);
spa_list_remove(&mix->link);
spa_list_append(&data->free_mix, &mix->link);
}
static void node_proxy_destroy(void *_data)
{
struct node_data *data = _data;
struct pw_proxy *proxy = (struct pw_proxy*) data->node_proxy;
int i;
struct mix *mix, *tmp;
if (data->trans) {
for (i = 0; i < MAX_MIX; i++)
clear_mix(data, &data->mix[i]);
spa_list_for_each_safe(mix, tmp, &data->mix[SPA_DIRECTION_INPUT], link)
clear_mix(data, mix);
spa_list_for_each_safe(mix, tmp, &data->mix[SPA_DIRECTION_OUTPUT], link)
clear_mix(data, mix);
}
clean_transport(proxy);
clean_transport(data);
spa_hook_remove(&data->node_listener);
}
@ -1352,6 +1376,7 @@ struct pw_proxy *pw_remote_export(struct pw_remote *remote,
struct remote *impl = SPA_CONTAINER_OF(remote, struct remote, this);
struct pw_proxy *proxy;
struct node_data *data;
int i;
proxy = pw_core_proxy_create_object(remote->core_proxy,
"client-node",
@ -1369,6 +1394,12 @@ struct pw_proxy *pw_remote_export(struct pw_remote *remote,
data->t = pw_core_get_type(data->core);
data->node_proxy = (struct pw_client_node_proxy *)proxy;
spa_list_init(&data->free_mix);
spa_list_init(&data->mix[0]);
spa_list_init(&data->mix[1]);
for (i = 0; i < MAX_MIX; i++)
spa_list_append(&data->free_mix, &data->mix_pool[i].link);
pw_array_init(&data->mems, 64);
pw_array_ensure_size(&data->mems, sizeof(struct mem) * 64);