simplify things with just 1 process function

Make just one process function in spa node. With the io area states
we can do more complicated io patterns.
This commit is contained in:
Wim Taymans 2018-03-16 16:55:25 +01:00
parent e8d0281982
commit 9b0a880afb
21 changed files with 202 additions and 359 deletions

View file

@ -539,7 +539,7 @@ static int do_render(struct spa_loop *loop, bool async, uint32_t seq,
return 0;
}
static int impl_node_process_input(struct spa_node *node)
static int impl_node_process(struct spa_node *node)
{
struct data *d = SPA_CONTAINER_OF(node, struct data, impl_node);
struct spa_buffer *buf;
@ -574,7 +574,7 @@ static const struct spa_node impl_node = {
.port_enum_params = impl_port_enum_params,
.port_set_param = impl_port_set_param,
.port_use_buffers = impl_port_use_buffers,
.process_input = impl_node_process_input,
.process = impl_node_process,
};
static void make_node(struct data *data)

View file

@ -467,7 +467,7 @@ static void fill_s16(struct data *d, void *dest, int avail)
}
}
static int impl_node_process_output(struct spa_node *node)
static int impl_node_process(struct spa_node *node)
{
struct data *d = SPA_CONTAINER_OF(node, struct data, impl_node);
struct buffer *b;
@ -529,7 +529,7 @@ static const struct spa_node impl_node = {
.port_set_param = impl_port_set_param,
.port_use_buffers = impl_port_use_buffers,
.port_reuse_buffer = impl_port_reuse_buffer,
.process_output = impl_node_process_output,
.process = impl_node_process,
};
static void make_node(struct data *data)

View file

@ -433,7 +433,7 @@ static int do_render(struct spa_loop *loop, bool async, uint32_t seq,
return 0;
}
static int impl_node_process_input(struct spa_node *node)
static int impl_node_process(struct spa_node *node)
{
struct data *d = SPA_CONTAINER_OF(node, struct data, impl_node);
int res;
@ -461,7 +461,7 @@ static const struct spa_node impl_node = {
.port_enum_params = impl_port_enum_params,
.port_set_param = impl_port_set_param,
.port_use_buffers = impl_port_use_buffers,
.process_input = impl_node_process_input,
.process = impl_node_process,
};
static void make_nodes(struct data *data)

View file

@ -279,7 +279,7 @@ static void dequeue_buffer(struct node *n, struct buffer *b)
SPA_FLAG_SET(b->flags, BUFFER_FLAG_OUT);
}
static int node_process_input(struct spa_node *node)
static int node_process(struct spa_node *node)
{
struct node *n = SPA_CONTAINER_OF(node, struct node, node_impl);
struct pw_node *this = n->node;
@ -311,43 +311,6 @@ static int node_process_input(struct spa_node *node)
return outio->status;
}
static int node_process_output(struct spa_node *node)
{
struct node *n = SPA_CONTAINER_OF(node, struct node, node_impl);
struct pw_node *this = n->node;
struct port *outp = GET_OUT_PORT(n, 0);
struct spa_io_buffers *outio = outp->io;
int i;
pw_log_trace(NAME " %p: process output %d", this, outio->status);
if (outio->status == SPA_STATUS_HAVE_BUFFER)
return SPA_STATUS_HAVE_BUFFER;
if (outio->buffer_id < outp->n_buffers) {
recycle_buffer(n, outp, outio->buffer_id);
outio->buffer_id = SPA_ID_INVALID;
}
if (n->n_in_ports == 0) {
outio->buffer_id = SPA_ID_INVALID;
outio->status = SPA_STATUS_HAVE_BUFFER;
}
else {
for (i = 0; i < n->n_in_ports; i++) {
struct port *inp = GET_IN_PORT(n, i);
struct spa_io_buffers *inio = inp->io;
if (inio == NULL)
continue;
inio->status = SPA_STATUS_NEED_BUFFER;
pw_log_trace(NAME " %p: port %d %d", this, i, inio->buffer_id);
}
outio->status = SPA_STATUS_NEED_BUFFER;
}
return outio->status;
}
static int port_set_io(struct spa_node *node,
enum spa_direction direction, uint32_t port_id,
uint32_t id, void *data, size_t size)
@ -623,12 +586,11 @@ static const struct spa_node node_impl = {
.port_set_io = port_set_io,
.port_reuse_buffer = port_reuse_buffer,
.port_send_command = port_send_command,
.process_input = node_process_input,
.process_output = node_process_output,
.process = node_process,
};
static int schedule_mix_input(struct spa_node *_node)
static int schedule_mix(struct spa_node *_node)
{
struct pw_port *port = SPA_CONTAINER_OF(_node, struct pw_port, mix_node);
struct port *p = port->owner_data;
@ -678,24 +640,6 @@ static int schedule_mix_input(struct spa_node *_node)
return SPA_STATUS_HAVE_BUFFER;
}
static int schedule_mix_output(struct spa_node *_node)
{
struct pw_port *port = SPA_CONTAINER_OF(_node, struct pw_port, mix_node);
struct spa_graph_node *node = &port->rt.mix_node;
struct spa_graph_port *gp;
struct spa_io_buffers *io = port->rt.mix_port.io;
pw_log_trace("port %p", port);
spa_list_for_each(gp, &node->ports[SPA_DIRECTION_INPUT], link) {
pw_log_trace("port %p: port %d %d %p->%p %d %d", port,
gp->port_id, gp->flags, io, gp->io, io->status, io->buffer_id);
gp->io->status = io->status;
}
io->status = SPA_STATUS_HAVE_BUFFER;
return io->status;
}
static int schedule_mix_use_buffers(struct spa_node *_node,
enum spa_direction direction,
uint32_t port_id,
@ -720,8 +664,7 @@ static const struct spa_node schedule_mix_node = {
SPA_VERSION_NODE,
NULL,
.port_use_buffers = schedule_mix_use_buffers,
.process_input = schedule_mix_input,
.process_output = schedule_mix_output,
.process = schedule_mix,
};
static void port_free(void *data)

View file

@ -842,63 +842,18 @@ impl_node_port_send_command(struct spa_node *node,
return 0;
}
static int impl_node_process_input(struct spa_node *node)
static int impl_node_process(struct spa_node *node)
{
struct node *this = SPA_CONTAINER_OF(node, struct node, node);
struct impl *impl = this->impl;
struct spa_graph_node *n = &impl->this.node->rt.node;
bool client_reuse = impl->client_reuse;
struct spa_graph_port *p, *pp;
int res;
if (impl->input_ready == 0) {
/* the client is not ready to receive our buffers, recycle them */
pw_log_trace("node not ready, recycle buffers");
spa_list_for_each(p, &n->ports[SPA_DIRECTION_INPUT], link)
p->io->status = SPA_STATUS_NEED_BUFFER;
res = SPA_STATUS_NEED_BUFFER;
}
else {
spa_list_for_each(p, &n->ports[SPA_DIRECTION_INPUT], link) {
struct spa_io_buffers *io = p->io;
pw_log_trace("set io status to %d %d", io->status, io->buffer_id);
/* explicitly recycle buffers when the client is not going to do it */
if (!client_reuse && (pp = p->peer))
spa_graph_node_reuse_buffer(pp->node,
pp->port_id, io->buffer_id);
}
pw_log_trace("client-node %p: send process input", this);
pw_client_node_transport_add_message(impl->transport,
&PW_CLIENT_NODE_MESSAGE_INIT(PW_CLIENT_NODE_MESSAGE_PROCESS_INPUT));
do_flush(this);
impl->input_ready--;
res = SPA_STATUS_OK;
}
return res;
}
static int impl_node_process_output(struct spa_node *node)
{
struct node *this;
struct impl *impl;
this = SPA_CONTAINER_OF(node, struct node, node);
impl = this->impl;
pw_log_trace("client-node %p: process output %d", this, impl->out_pending);
if (impl->out_pending)
return SPA_STATUS_OK;
impl->out_pending = true;
pw_log_trace("client-node %p: send process input", this);
pw_client_node_transport_add_message(impl->transport,
&PW_CLIENT_NODE_MESSAGE_INIT(PW_CLIENT_NODE_MESSAGE_PROCESS_OUTPUT));
&PW_CLIENT_NODE_MESSAGE_INIT(PW_CLIENT_NODE_MESSAGE_PROCESS_INPUT));
do_flush(this);
return SPA_STATUS_OK;
res = SPA_STATUS_OK;
return res;
}
static int handle_node_message(struct node *this, struct pw_client_node_message *message)
@ -1082,8 +1037,7 @@ static const struct spa_node impl_node = {
impl_node_port_set_io,
impl_node_port_reuse_buffer,
impl_node_port_send_command,
impl_node_process_input,
impl_node_process_output,
impl_node_process,
};
static int
@ -1285,7 +1239,7 @@ static int mix_port_set_io(struct spa_node *node,
id, data, size);
}
static int mix_port_process_input(struct spa_node *data)
static int mix_port_process(struct spa_node *data)
{
struct pw_port *p = SPA_CONTAINER_OF(data, struct pw_port, mix_node);
struct spa_io_buffers *io = p->rt.mix_port.io;
@ -1293,22 +1247,13 @@ static int mix_port_process_input(struct spa_node *data)
return SPA_STATUS_HAVE_BUFFER;
}
static int mix_port_process_output(struct spa_node *data)
{
struct pw_port *p = SPA_CONTAINER_OF(data, struct pw_port, mix_node);
struct spa_io_buffers *io = p->rt.mix_port.io;
pw_log_trace("client-node %p: pass %d %d", data, io->status, io->buffer_id);
return SPA_STATUS_NEED_BUFFER;
}
static void node_port_added(void *data, struct pw_port *port)
{
struct impl *impl = data;
pw_log_debug("client-node %p: port %p added", &impl->this, port);
port->mix_node.port_set_io = mix_port_set_io;
port->mix_node.process_input = mix_port_process_input;
port->mix_node.process_output = mix_port_process_output;
port->mix_node.process = mix_port_process;
port->implementation = &port_impl;
port->implementation_data = impl;

View file

@ -1154,6 +1154,30 @@ static const struct pw_node_events output_node_events = {
.async_complete = output_node_async_complete,
};
static int
do_join_graphs(struct spa_loop *loop,
bool async, uint32_t seq, const void *data, size_t size, void *user_data)
{
struct pw_link *this = user_data;
struct spa_graph *in_graph, *out_graph;
struct spa_graph *in_root, *out_root;
in_graph = this->input->node->rt.node.graph;
out_graph = this->output->node->rt.node.graph;
in_root = spa_graph_find_root(in_graph);
out_root = spa_graph_find_root(out_graph);
if (out_root == in_root)
return 0;
if (SPA_FLAG_CHECK(in_root->flags, SPA_GRAPH_FLAG_DRIVER))
spa_graph_add_subgraph(in_root, out_root);
else
spa_graph_add_subgraph(out_root, in_root);
return 0;
}
struct pw_link *pw_link_new(struct pw_core *core,
struct pw_port *output,
struct pw_port *input,
@ -1166,6 +1190,7 @@ struct pw_link *pw_link_new(struct pw_core *core,
struct pw_link *this;
struct pw_node *input_node, *output_node;
struct spa_graph *in_graph, *out_graph;
struct spa_graph *in_root, *out_root;
if (output == input)
goto same_ports;
@ -1179,7 +1204,16 @@ struct pw_link *pw_link_new(struct pw_core *core,
in_graph = input_node->rt.node.graph;
out_graph = output_node->rt.node.graph;
if (in_graph != NULL && out_graph != NULL && in_graph != out_graph)
pw_log_debug("link new %p %p", in_graph, out_graph);
in_root = spa_graph_find_root(in_graph);
out_root = spa_graph_find_root(out_graph);
pw_log_debug("link new %p %p", in_root, out_root);
if (SPA_FLAG_CHECK(in_root->flags, SPA_GRAPH_FLAG_DRIVER) &&
SPA_FLAG_CHECK(out_root->flags, SPA_GRAPH_FLAG_DRIVER) &&
in_root != out_root)
goto link_not_supported;
impl = calloc(1, sizeof(struct impl) + user_data_size);
@ -1243,10 +1277,8 @@ struct pw_link *pw_link_new(struct pw_core *core,
output_node, output->port_id, this->rt.mix[SPA_DIRECTION_OUTPUT].port.port_id,
input_node, input->port_id, this->rt.mix[SPA_DIRECTION_INPUT].port.port_id);
if (out_graph != NULL)
pw_node_join_graph(input_node, out_graph);
else if (in_graph != NULL)
pw_node_join_graph(output_node, in_graph);
pw_loop_invoke(output->node->data_loop,
do_join_graphs, SPA_ID_INVALID, NULL, 0, false, this);
spa_hook_list_call(&output->listener_list, struct pw_port_events, link_added, this);
spa_hook_list_call(&input->listener_list, struct pw_port_events, link_added, this);

View file

@ -257,7 +257,7 @@ void pw_memblock_free(struct pw_memblock *mem)
if (mem == NULL)
return;
pw_log_debug("mem %p: free", mem);
pw_log_debug("mem %p: free %p %d", mem, mem->ptr, mem->fd);
if (mem->flags & PW_MEMBLOCK_FLAG_WITH_FD) {
if (mem->ptr)
munmap(mem->ptr, mem->size);

View file

@ -392,6 +392,11 @@ static void check_properties(struct pw_node *node)
node->driver = pw_properties_parse_bool(str);
else
node->driver = false;
if (node->driver)
SPA_FLAG_SET(impl->graph.flags, SPA_GRAPH_FLAG_DRIVER);
else
SPA_FLAG_UNSET(impl->graph.flags, SPA_GRAPH_FLAG_DRIVER);
}
static int
@ -458,7 +463,7 @@ struct pw_node *pw_node_new(struct pw_core *core,
this->rt.activation = &impl->activation;
spa_graph_node_init(&this->rt.node, &this->rt.activation->state);
pw_loop_invoke(this->data_loop, do_node_join, 1, NULL, 0, false, this);
pw_loop_invoke(this->data_loop, do_node_join, 1, NULL, 0, true, this);
spa_list_init(&this->rt.links[SPA_DIRECTION_INPUT]);
spa_list_init(&this->rt.links[SPA_DIRECTION_OUTPUT]);
@ -628,7 +633,12 @@ do_node_remove(struct spa_loop *loop,
bool async, uint32_t seq, const void *data, size_t size, void *user_data)
{
struct pw_node *this = user_data;
struct impl *impl = SPA_CONTAINER_OF(this, struct impl, this);
if (impl->graph.parent)
spa_graph_remove_subgraph(&impl->graph);
spa_graph_node_remove(&this->rt.node);
return 0;
}

View file

@ -64,23 +64,8 @@ static int schedule_tee_input(struct spa_node *data)
*p->io = *io;
}
}
return io->status;
}
static int schedule_tee_output(struct spa_node *data)
{
struct pw_port *this = SPA_CONTAINER_OF(data, struct pw_port, mix_node);
struct spa_graph_node *node = &this->rt.mix_node;
struct spa_graph_port *p;
struct spa_io_buffers *io = this->rt.mix_port.io;
spa_list_for_each(p, &node->ports[SPA_DIRECTION_OUTPUT], link) {
pw_log_trace("port %p: port %d %d %p->%p %d %d",
this, p->port_id, p->flags, p->io, io,
p->io->status, p->io->buffer_id);
*io = *p->io;
}
pw_log_trace("port %p: tee output %d %d", this, io->status, io->buffer_id);
return io->status;
io->status = SPA_STATUS_NEED_BUFFER;
return SPA_STATUS_HAVE_BUFFER;
}
static int schedule_tee_reuse_buffer(struct spa_node *data, uint32_t port_id, uint32_t buffer_id)
@ -98,8 +83,7 @@ static int schedule_tee_reuse_buffer(struct spa_node *data, uint32_t port_id, ui
static const struct spa_node schedule_tee_node = {
SPA_VERSION_NODE,
NULL,
.process_input = schedule_tee_input,
.process_output = schedule_tee_output,
.process = schedule_tee_input,
.port_reuse_buffer = schedule_tee_reuse_buffer,
};
@ -119,24 +103,6 @@ static int schedule_mix_input(struct spa_node *data)
return io->status;
}
static int schedule_mix_output(struct spa_node *data)
{
struct pw_port *this = SPA_CONTAINER_OF(data, struct pw_port, mix_node);
struct spa_graph_node *node = &this->rt.mix_node;
struct spa_graph_port *p;
struct spa_io_buffers *io = this->rt.mix_port.io;
if (!spa_list_is_empty(&node->ports[SPA_DIRECTION_INPUT])) {
spa_list_for_each(p, &node->ports[SPA_DIRECTION_INPUT], link) {
pw_log_trace("port %p: port %d %d %p->%p", this,
p->port_id, p->flags, io, p->io);
*p->io = *io;
}
}
pw_log_trace("port %p: output %d %d", this, io->status, io->buffer_id);
return io->status;
}
static int schedule_mix_reuse_buffer(struct spa_node *data, uint32_t port_id, uint32_t buffer_id)
{
struct pw_port *this = SPA_CONTAINER_OF(data, struct pw_port, mix_node);
@ -155,8 +121,7 @@ static int schedule_mix_reuse_buffer(struct spa_node *data, uint32_t port_id, ui
static const struct spa_node schedule_mix_node = {
SPA_VERSION_NODE,
NULL,
.process_input = schedule_mix_input,
.process_output = schedule_mix_output,
.process = schedule_mix_input,
.port_reuse_buffer = schedule_mix_reuse_buffer,
};
@ -335,9 +300,7 @@ static int do_add_port(struct spa_loop *loop,
spa_graph_port_add(&this->node->rt.node, &this->rt.port);
spa_graph_port_add(&this->rt.mix_node, &this->rt.mix_port);
spa_graph_port_link(&this->rt.port, &this->rt.mix_port);
if (this->rt.mix_node.graph)
spa_graph_node_add(this->rt.mix_node.graph, &this->rt.mix_node);
spa_graph_node_add(this->node->rt.node.graph, &this->rt.mix_node);
return 0;
}
@ -549,9 +512,7 @@ static int do_remove_port(struct spa_loop *loop,
spa_graph_port_remove(p);
spa_graph_port_remove(&this->rt.mix_port);
if (this->rt.mix_node.graph)
spa_graph_node_remove(&this->rt.mix_node);
this->rt.mix_node.graph = NULL;
spa_graph_node_remove(&this->rt.mix_node);
return 0;
}

View file

@ -612,41 +612,37 @@ static void handle_rtnode_message(struct pw_stream *stream, struct pw_client_nod
struct buffer *b;
uint32_t buffer_id;
buffer_id = io->buffer_id;
if (impl->direction == SPA_DIRECTION_INPUT) {
buffer_id = io->buffer_id;
pw_log_trace("stream %p: process input %d %d", stream, io->status,
pw_log_trace("stream %p: process input %d %d", stream, io->status,
buffer_id);
if ((b = find_buffer(stream, buffer_id)) == NULL)
return;
if ((b = find_buffer(stream, buffer_id)) == NULL)
return;
if (impl->client_reuse)
if (impl->client_reuse)
io->buffer_id = SPA_ID_INVALID;
if (io->status == SPA_STATUS_HAVE_BUFFER) {
SPA_FLAG_SET(b->flags, BUFFER_FLAG_OUT);
impl->in_new_buffer = true;
spa_hook_list_call(&stream->listener_list, struct pw_stream_events,
new_buffer, buffer_id);
impl->in_new_buffer = false;
}
io->status = SPA_STATUS_NEED_BUFFER;
} else {
reuse_buffer(stream, io->buffer_id);
io->buffer_id = SPA_ID_INVALID;
if (io->status == SPA_STATUS_HAVE_BUFFER) {
SPA_FLAG_SET(b->flags, BUFFER_FLAG_OUT);
impl->in_new_buffer = true;
pw_log_trace("stream %p: process output", stream);
impl->in_need_buffer = true;
spa_hook_list_call(&stream->listener_list, struct pw_stream_events,
new_buffer, buffer_id);
impl->in_new_buffer = false;
need_buffer);
impl->in_need_buffer = false;
}
io->status = SPA_STATUS_NEED_BUFFER;
send_need_input(stream);
break;
}
case PW_CLIENT_NODE_MESSAGE_PROCESS_OUTPUT:
{
struct spa_io_buffers *io = impl->io;
reuse_buffer(stream, io->buffer_id);
io->buffer_id = SPA_ID_INVALID;
pw_log_trace("stream %p: process output", stream);
impl->in_need_buffer = true;
spa_hook_list_call(&stream->listener_list, struct pw_stream_events, need_buffer);
impl->in_need_buffer = false;
break;
}
case PW_CLIENT_NODE_MESSAGE_PORT_REUSE_BUFFER: