remote: make extra nodes for scheduler

Add an extra input and output node for remote nodes. We link this
node to all remote inputs and outputs to make it easier to use
the scheduler.
Improve scheduler debug
This commit is contained in:
Wim Taymans 2017-09-15 17:26:01 +02:00
parent 4d03224141
commit 3c1957fd9d
7 changed files with 125 additions and 77 deletions

View file

@ -53,7 +53,7 @@ static inline void spa_graph_data_port_check(struct spa_graph_data *data, struct
if (port->io->status == SPA_RESULT_HAVE_BUFFER)
node->ready[SPA_DIRECTION_INPUT]++;
debug("port %p node %p check %d %d %d\n", port, node,
spa_debug("port %p node %p check %d %d %d", port, node,
port->io->status, node->ready[SPA_DIRECTION_INPUT], required);
if (required > 0 && node->ready[SPA_DIRECTION_INPUT] == required) {
@ -80,7 +80,7 @@ static inline bool spa_graph_data_iterate(struct spa_graph_data *data)
spa_list_remove(&n->ready_link);
n->ready_link.next = NULL;
debug("node %p state %d\n", n, n->state);
spa_debug("node %p state %d", n, n->state);
switch (n->state) {
case SPA_GRAPH_STATE_IN:
@ -89,7 +89,7 @@ static inline bool spa_graph_data_iterate(struct spa_graph_data *data)
n->state = SPA_GRAPH_STATE_CHECK_IN;
else if (state == SPA_RESULT_HAVE_BUFFER)
n->state = SPA_GRAPH_STATE_CHECK_OUT;
debug("node %p processed input state %d\n", n, n->state);
spa_debug("node %p processed input state %d", n, n->state);
if (n == data->node)
break;
spa_list_append(&data->ready, &n->ready_link);
@ -101,7 +101,7 @@ static inline bool spa_graph_data_iterate(struct spa_graph_data *data)
n->state = SPA_GRAPH_STATE_CHECK_IN;
else if (state == SPA_RESULT_HAVE_BUFFER)
n->state = SPA_GRAPH_STATE_CHECK_OUT;
debug("node %p processed output state %d\n", n, n->state);
spa_debug("node %p processed output state %d", n, n->state);
spa_list_append(&data->ready, &n->ready_link);
break;
@ -135,7 +135,7 @@ static inline bool spa_graph_data_iterate(struct spa_graph_data *data)
static inline int spa_graph_impl_need_input(void *data, struct spa_graph_node *node)
{
struct spa_graph_data *d = data;
debug("node %p start pull\n", node);
spa_debug("node %p start pull", node);
node->state = SPA_GRAPH_STATE_CHECK_IN;
d->node = node;
if (node->ready_link.next == NULL)
@ -149,7 +149,7 @@ static inline int spa_graph_impl_need_input(void *data, struct spa_graph_node *n
static inline int spa_graph_impl_have_output(void *data, struct spa_graph_node *node)
{
struct spa_graph_data *d = data;
debug("node %p start push\n", node);
spa_debug("node %p start push", node);
node->state = SPA_GRAPH_STATE_OUT;
d->node = node;
if (node->ready_link.next == NULL)

View file

@ -48,7 +48,7 @@ static inline void spa_graph_port_check(struct spa_graph *graph, struct spa_grap
if (port->io->status == SPA_RESULT_HAVE_BUFFER)
node->ready++;
debug("port %p node %p check %d %d %d\n", port, node, port->io->status, node->ready, node->required);
spa_debug("port %p node %p check %d %d %d", port, node, port->io->status, node->ready, node->required);
if (node->required > 0 && node->ready == node->required) {
node->action = SPA_GRAPH_ACTION_IN;
@ -68,7 +68,7 @@ static inline void spa_graph_node_update(struct spa_graph *graph, struct spa_gra
if (p->io->status == SPA_RESULT_OK && !(node->flags & SPA_GRAPH_NODE_FLAG_ASYNC))
node->ready++;
}
debug("node %p update %d ready\n", node, node->ready);
spa_debug("node %p update %d ready", node, node->ready);
}
static inline bool spa_graph_scheduler_iterate(struct spa_graph *graph)
@ -82,7 +82,7 @@ static inline bool spa_graph_scheduler_iterate(struct spa_graph *graph)
next:
empty = spa_list_is_empty(&graph->ready);
if (empty && !spa_list_is_empty(&graph->pending)) {
debug("copy pending\n");
spa_debug("copy pending");
spa_list_insert_list(&graph->ready, &graph->pending);
spa_list_init(&graph->pending);
empty = false;
@ -94,7 +94,7 @@ next:
spa_list_remove(&n->ready_link);
n->ready_link.next = NULL;
debug("node %p state %d\n", n, n->state);
spa_debug("node %p state %d", n, n->state);
switch (n->state) {
case SPA_GRAPH_STATE_IN:
@ -104,13 +104,13 @@ next:
n->state = SPA_GRAPH_STATE_OUT;
state = n->schedule(n);
debug("node %p schedule %d res %d\n", n, action, state);
spa_debug("node %p schedule %d res %d", n, action, state);
if (n->state == SPA_GRAPH_STATE_IN && n == graph->node)
break;
if (n->state != SPA_GRAPH_STATE_END) {
debug("node %p add ready for CHECK\n", n);
spa_debug("node %p add ready for CHECK", n);
if (state == SPA_RESULT_NEED_BUFFER)
n->state = SPA_GRAPH_STATE_CHECK_IN;
else if (state == SPA_RESULT_HAVE_BUFFER)
@ -132,7 +132,7 @@ next:
if (pn != graph->node
|| pn->flags & SPA_GRAPH_NODE_FLAG_ASYNC) {
pn->state = SPA_GRAPH_STATE_OUT;
debug("node %p add ready OUT\n", n);
spa_debug("node %p add ready OUT", n);
spa_list_insert(graph->ready.prev,
&pn->ready_link);
}
@ -145,7 +145,7 @@ next:
spa_list_for_each(p, &n->ports[SPA_DIRECTION_OUTPUT], link)
spa_graph_port_check(graph, p->peer);
debug("node %p add pending\n", n);
spa_debug("node %p add pending", n);
n->state = SPA_GRAPH_STATE_END;
spa_list_insert(&graph->pending, &n->ready_link);
break;
@ -165,7 +165,7 @@ static inline void spa_graph_scheduler_pull(struct spa_graph *graph, struct spa_
node->action = SPA_GRAPH_ACTION_CHECK;
node->state = SPA_RESULT_NEED_BUFFER;
graph->node = node;
debug("node %p start pull\n", node);
spa_debug("node %p start pull", node);
if (node->ready_link.next == NULL)
spa_list_insert(graph->ready.prev, &node->ready_link);
}
@ -174,7 +174,7 @@ static inline void spa_graph_scheduler_push(struct spa_graph *graph, struct spa_
{
node->action = SPA_GRAPH_ACTION_OUT;
graph->node = node;
debug("node %p start push\n", node);
spa_debug("node %p start push", node);
if (node->ready_link.next == NULL)
spa_list_insert(graph->ready.prev, &node->ready_link);
}

View file

@ -32,7 +32,7 @@ static inline int spa_graph_impl_need_input(void *data, struct spa_graph_node *n
struct spa_graph_node *n, *t;
struct spa_list ready;
debug("node %p start pull\n", node);
spa_debug("node %p start pull", node);
spa_list_init(&ready);
@ -43,7 +43,7 @@ static inline int spa_graph_impl_need_input(void *data, struct spa_graph_node *n
if ((pport = p->peer) == NULL)
continue;
pnode = pport->node;
debug("node %p peer %p io %d %d\n", node, pnode, pport->io->status, pport->io->buffer_id);
spa_debug("node %p peer %p io %d %d", node, pnode, pport->io->status, pport->io->buffer_id);
if (pport->io->status == SPA_RESULT_NEED_BUFFER) {
if (pnode->ready_link.next == NULL)
spa_list_append(&ready, &pnode->ready_link);
@ -54,7 +54,7 @@ static inline int spa_graph_impl_need_input(void *data, struct spa_graph_node *n
spa_list_for_each_safe(n, t, &ready, ready_link) {
n->state = spa_node_process_output(n->implementation);
debug("peer %p processed out %d\n", n, n->state);
spa_debug("peer %p processed out %d", n, n->state);
if (n->state == SPA_RESULT_NEED_BUFFER)
spa_graph_need_input(n->graph, n);
else {
@ -67,11 +67,11 @@ static inline int spa_graph_impl_need_input(void *data, struct spa_graph_node *n
n->ready_link.next = NULL;
}
debug("node %p ready:%d required:%d\n", node, node->ready[SPA_DIRECTION_INPUT], node->required[SPA_DIRECTION_INPUT]);
spa_debug("node %p ready:%d required:%d", node, node->ready[SPA_DIRECTION_INPUT], node->required[SPA_DIRECTION_INPUT]);
if (node->required[SPA_DIRECTION_INPUT] > 0 && node->ready[SPA_DIRECTION_INPUT] == node->required[SPA_DIRECTION_INPUT]) {
node->state = spa_node_process_input(node->implementation);
debug("node %p processed in %d\n", node, node->state);
spa_debug("node %p processed in %d", node, node->state);
if (node->state == SPA_RESULT_HAVE_BUFFER) {
spa_list_for_each(p, &node->ports[SPA_DIRECTION_OUTPUT], link) {
if (p->io->status == SPA_RESULT_HAVE_BUFFER)
@ -89,7 +89,7 @@ static inline int spa_graph_impl_have_output(void *data, struct spa_graph_node *
struct spa_list ready;
struct spa_graph_node *n, *t;
debug("node %p start push\n", node);
spa_debug("node %p start push", node);
spa_list_init(&ready);
@ -108,7 +108,7 @@ static inline int spa_graph_impl_have_output(void *data, struct spa_graph_node *
pready = pnode->ready[SPA_DIRECTION_INPUT];
prequired = pnode->required[SPA_DIRECTION_INPUT];
debug("node %p peer %p io %d %d %d\n", node, pnode, pport->io->status,
spa_debug("node %p peer %p io %d %d %d", node, pnode, pport->io->status,
pready, prequired);
if (prequired > 0 && pready == prequired)
@ -118,7 +118,7 @@ static inline int spa_graph_impl_have_output(void *data, struct spa_graph_node *
spa_list_for_each_safe(n, t, &ready, ready_link) {
n->state = spa_node_process_input(n->implementation);
debug("node %p chain processed in %d\n", n, n->state);
spa_debug("node %p chain processed in %d", n, n->state);
if (n->state == SPA_RESULT_HAVE_BUFFER)
spa_graph_have_output(n->graph, n);
else {
@ -134,7 +134,7 @@ static inline int spa_graph_impl_have_output(void *data, struct spa_graph_node *
}
node->state = spa_node_process_output(node->implementation);
debug("node %p processed out %d\n", node, node->state);
spa_debug("node %p processed out %d", node, node->state);
if (node->state == SPA_RESULT_NEED_BUFFER) {
node->ready[SPA_DIRECTION_INPUT] = 0;
spa_list_for_each(p, &node->ports[SPA_DIRECTION_INPUT], link) {

View file

@ -30,10 +30,8 @@ extern "C" {
#include <spa/list.h>
#include <spa/node.h>
#if 0
#define debug(...) printf(__VA_ARGS__)
#else
#define debug(...)
#ifndef spa_debug
#define spa_debug(...)
#endif
struct spa_graph;
@ -105,7 +103,7 @@ spa_graph_node_init(struct spa_graph_node *node)
node->flags = 0;
node->required[SPA_DIRECTION_INPUT] = node->ready[SPA_DIRECTION_INPUT] = 0;
node->required[SPA_DIRECTION_OUTPUT] = node->ready[SPA_DIRECTION_OUTPUT] = 0;
debug("node %p init\n", node);
spa_debug("node %p init", node);
}
static inline void
@ -123,7 +121,7 @@ spa_graph_node_add(struct spa_graph *graph,
node->state = SPA_RESULT_NEED_BUFFER;
node->ready_link.next = NULL;
spa_list_append(&graph->nodes, &node->link);
debug("node %p add\n", node);
spa_debug("node %p add", node);
}
static inline void
@ -133,7 +131,7 @@ spa_graph_port_init(struct spa_graph_port *port,
uint32_t flags,
struct spa_port_io *io)
{
debug("port %p init type %d id %d\n", port, direction, port_id);
spa_debug("port %p init type %d id %d", port, direction, port_id);
port->direction = direction;
port->port_id = port_id;
port->flags = flags;
@ -144,7 +142,7 @@ static inline void
spa_graph_port_add(struct spa_graph_node *node,
struct spa_graph_port *port)
{
debug("port %p add to node %p\n", port, node);
spa_debug("port %p add to node %p", port, node);
port->node = node;
spa_list_append(&node->ports[port->direction], &port->link);
if (!(port->flags & SPA_PORT_INFO_FLAG_OPTIONAL))
@ -153,7 +151,7 @@ spa_graph_port_add(struct spa_graph_node *node,
static inline void spa_graph_node_remove(struct spa_graph_node *node)
{
debug("node %p remove\n", node);
spa_debug("node %p remove", node);
spa_list_remove(&node->link);
if (node->ready_link.next)
spa_list_remove(&node->ready_link);
@ -161,7 +159,7 @@ static inline void spa_graph_node_remove(struct spa_graph_node *node)
static inline void spa_graph_port_remove(struct spa_graph_port *port)
{
debug("port %p remove\n", port);
spa_debug("port %p remove", port);
spa_list_remove(&port->link);
if (!(port->flags & SPA_PORT_INFO_FLAG_OPTIONAL))
port->node->required[port->direction]--;
@ -170,7 +168,7 @@ static inline void spa_graph_port_remove(struct spa_graph_port *port)
static inline void
spa_graph_port_link(struct spa_graph_port *out, struct spa_graph_port *in)
{
debug("port %p link to %p \n", out, in);
spa_debug("port %p link to %p", out, in);
out->peer = in;
in->peer = out;
}
@ -178,7 +176,7 @@ spa_graph_port_link(struct spa_graph_port *out, struct spa_graph_port *in)
static inline void
spa_graph_port_unlink(struct spa_graph_port *port)
{
debug("port %p unlink from %p \n", port, port->peer);
spa_debug("port %p unlink from %p", port, port->peer);
if (port->peer) {
port->peer->peer = NULL;
port->peer = NULL;

View file

@ -794,15 +794,17 @@ static int spa_proxy_node_process_output(struct spa_node *node)
impl->transport->outputs[i] = *io;
if (tmp.status == SPA_RESULT_HAVE_BUFFER)
res = SPA_RESULT_HAVE_BUFFER;
else if (tmp.status == SPA_RESULT_NEED_BUFFER)
res = SPA_RESULT_NEED_BUFFER;
*io = tmp;
pw_log_trace("%d %d -> %d %d", io->status, io->buffer_id,
impl->transport->outputs[i].status,
impl->transport->outputs[i].buffer_id);
}
pw_client_node_transport_add_message(impl->transport,
&PW_CLIENT_NODE_MESSAGE_INIT(PW_CLIENT_NODE_MESSAGE_PROCESS_OUTPUT));
do_flush(this);
return res;
}

View file

@ -20,9 +20,10 @@
#include <time.h>
#include <stdio.h>
#define spa_debug pw_log_trace
#include <spa/lib/debug.h>
#include <spa/format-utils.h>
#include <spa/graph-scheduler3.h>
#include <pipewire/pipewire.h>
#include <pipewire/private.h>
@ -31,6 +32,8 @@
#include <pipewire/core.h>
#include <pipewire/data-loop.h>
#include <spa/graph-scheduler3.h>
/** \cond */
struct resource_data {
struct spa_hook resource_listener;

View file

@ -61,6 +61,11 @@ struct buffer_id {
struct spa_buffer *buf;
};
struct port {
struct spa_graph_port output;
struct spa_graph_port input;
};
struct node_data {
struct pw_remote *remote;
struct pw_core *core;
@ -70,8 +75,13 @@ struct node_data {
int rtwritefd;
struct spa_source *rtsocket_source;
struct pw_client_node_transport *trans;
struct spa_graph_port *in_ports;
struct spa_graph_port *out_ports;
struct spa_node out_node_impl;
struct spa_graph_node out_node;
struct port *out_ports;
struct spa_node in_node_impl;
struct spa_graph_node in_node;
struct port *in_ports;
struct pw_node *node;
struct spa_hook node_listener;
@ -430,36 +440,14 @@ static void unhandle_socket(struct pw_proxy *proxy)
static void handle_rtnode_message(struct pw_proxy *proxy, struct pw_client_node_message *message)
{
struct node_data *data = proxy->user_data;
struct spa_graph_node *n = &data->node->rt.node;
struct spa_graph_port *port, *pp;
struct spa_graph_node *pn;
if (PW_CLIENT_NODE_MESSAGE_TYPE(message) == PW_CLIENT_NODE_MESSAGE_PROCESS_INPUT) {
/* process all input in the mixers */
spa_list_for_each(port, &n->ports[SPA_DIRECTION_INPUT], link) {
pn = port->peer->node;
pn->state = spa_node_process_input(pn->implementation);
if (pn->state == SPA_RESULT_HAVE_BUFFER)
spa_graph_have_output(data->node->rt.graph, pn);
else {
pn->ready[SPA_DIRECTION_INPUT] = 0;
spa_list_for_each(pp, &pn->ports[SPA_DIRECTION_INPUT], link) {
if (pp->io->status == SPA_RESULT_OK &&
!(pn->flags & SPA_GRAPH_NODE_FLAG_ASYNC))
pn->ready[SPA_DIRECTION_INPUT]++;
}
}
}
pw_log_trace("remote %p: process input", data->remote);
spa_graph_have_output(data->node->rt.graph, &data->in_node);
}
else if (PW_CLIENT_NODE_MESSAGE_TYPE(message) == PW_CLIENT_NODE_MESSAGE_PROCESS_OUTPUT) {
spa_list_for_each(port, &n->ports[SPA_DIRECTION_OUTPUT], link) {
pn = port->peer->node;
pn->state = spa_node_process_output(pn->implementation);
pw_log_trace("node %p: process output %d", pn->implementation, pn->state);
if (pn->state == SPA_RESULT_NEED_BUFFER) {
spa_graph_need_input(data->node->rt.graph, pn);
}
}
pw_log_trace("remote %p: process output", data->remote);
spa_graph_need_input(data->node->rt.graph, &data->out_node);
}
else if (PW_CLIENT_NODE_MESSAGE_TYPE(message) == PW_CLIENT_NODE_MESSAGE_REUSE_BUFFER) {
}
@ -503,10 +491,14 @@ static void clean_transport(struct pw_proxy *proxy)
if (data->trans == NULL)
return;
spa_list_for_each(port, &data->node->input_ports, link)
spa_graph_port_remove(&data->in_ports[port->port_id]);
spa_list_for_each(port, &data->node->output_ports, link)
spa_graph_port_remove(&data->out_ports[port->port_id]);
spa_list_for_each(port, &data->node->input_ports, link) {
spa_graph_port_remove(&data->in_ports[port->port_id].output);
spa_graph_port_remove(&data->in_ports[port->port_id].input);
}
spa_list_for_each(port, &data->node->output_ports, link) {
spa_graph_port_remove(&data->out_ports[port->port_id].output);
spa_graph_port_remove(&data->out_ports[port->port_id].input);
}
free(data->in_ports);
free(data->out_ports);
@ -517,6 +509,11 @@ static void clean_transport(struct pw_proxy *proxy)
data->trans = NULL;
}
struct port_info {
struct spa_graph_port internal;
struct spa_graph_port external;
};
static void client_node_transport(void *object, uint32_t node_id,
int readfd, int writefd,
struct pw_client_node_transport *transport)
@ -535,31 +532,45 @@ static void client_node_transport(void *object, uint32_t node_id,
proxy, data->trans, readfd, writefd, node_id);
data->in_ports = calloc(data->trans->area->max_input_ports,
sizeof(struct spa_graph_port));
sizeof(struct port));
data->out_ports = calloc(data->trans->area->max_output_ports,
sizeof(struct spa_graph_port));
sizeof(struct port));
for (i = 0; i < data->trans->area->max_input_ports; i++) {
spa_graph_port_init(&data->in_ports[i],
spa_graph_port_init(&data->in_ports[i].input,
SPA_DIRECTION_INPUT,
i,
0,
&data->trans->inputs[i]);
spa_graph_port_init(&data->in_ports[i].output,
SPA_DIRECTION_OUTPUT,
i,
0,
&data->trans->inputs[i]);
spa_graph_port_add(&data->in_node, &data->in_ports[i].output);
spa_graph_port_link(&data->in_ports[i].output, &data->in_ports[i].input);
pw_log_info("transport in %d %p", i, &data->trans->inputs[i]);
}
spa_list_for_each(port, &data->node->input_ports, link)
spa_graph_port_add(&port->rt.mix_node, &data->in_ports[port->port_id]);
spa_graph_port_add(&port->rt.mix_node, &data->in_ports[port->port_id].input);
for (i = 0; i < data->trans->area->max_output_ports; i++) {
spa_graph_port_init(&data->out_ports[i],
spa_graph_port_init(&data->out_ports[i].output,
SPA_DIRECTION_OUTPUT,
i,
0,
&data->trans->outputs[i]);
spa_graph_port_init(&data->out_ports[i].input,
SPA_DIRECTION_INPUT,
i,
0,
&data->trans->outputs[i]);
spa_graph_port_add(&data->out_node, &data->out_ports[i].input);
spa_graph_port_link(&data->out_ports[i].output, &data->out_ports[i].input);
pw_log_info("transport out %d %p", i, &data->trans->inputs[i]);
}
spa_list_for_each(port, &data->node->output_ports, link)
spa_graph_port_add(&port->rt.mix_node, &data->out_ports[port->port_id]);
spa_graph_port_add(&port->rt.mix_node, &data->out_ports[port->port_id].output);
data->rtwritefd = writefd;
data->rtsocket_source = pw_loop_add_io(proxy->remote->core->data_loop,
@ -1039,6 +1050,33 @@ static const struct pw_proxy_events proxy_events = {
.destroy = node_proxy_destroy,
};
static int impl_process_input(struct spa_node *node)
{
#if 0
struct node_data *data = SPA_CONTAINER_OF(node, struct node_data, out_node_impl);
node_have_output(data);
#endif
pw_log_trace("node %p: have output", node);
return SPA_RESULT_OK;
}
static int impl_process_output(struct spa_node *node)
{
#if 0
struct node_data *data = SPA_CONTAINER_OF(node, struct node_data, in_node_impl);
node_need_input(data);
#endif
pw_log_trace("node %p: need input", node);
return SPA_RESULT_OK;
}
static const struct spa_node node_impl = {
SPA_VERSION_NODE,
NULL,
.process_input = impl_process_input,
.process_output = impl_process_output,
};
struct pw_proxy *pw_remote_export(struct pw_remote *remote,
struct pw_node *node)
{
@ -1062,6 +1100,13 @@ struct pw_proxy *pw_remote_export(struct pw_remote *remote,
data->core = pw_node_get_core(node);
data->t = pw_core_get_type(data->core);
data->node_proxy = (struct pw_client_node_proxy *)proxy;
data->in_node_impl = node_impl;
data->out_node_impl = node_impl;
spa_graph_node_init(&data->in_node);
spa_graph_node_set_implementation(&data->in_node, &data->in_node_impl);
spa_graph_node_init(&data->out_node);
spa_graph_node_set_implementation(&data->out_node, &data->out_node_impl);
pw_array_init(&data->mem_ids, 64);
pw_array_ensure_size(&data->mem_ids, sizeof(struct mem_id) * 64);