Work on improved scheduling or remote nodes

This commit is contained in:
Wim Taymans 2017-07-26 17:51:52 +02:00
parent 3d9f28c676
commit 9831786eb7
12 changed files with 94 additions and 93 deletions

View file

@ -489,7 +489,7 @@ static void make_node(struct data *data)
data->port->user_data = data;
data->port->implementation = &impl_port;
pw_port_add(data->port, data->node);
pw_node_export(data->node);
pw_node_register(data->node);
pw_remote_export(data->remote, data->node);
}

View file

@ -81,7 +81,7 @@ static void make_node(struct data *data)
"spa.factory.name", "v4l2-source", NULL);
data->node = pw_node_factory_create_node(factory, NULL, "v4l2-source", props);
pw_node_export(data->node);
pw_node_register(data->node);
pw_remote_export(data->remote, data->node);
}

View file

@ -472,7 +472,7 @@ static void make_nodes(struct data *data)
data->port->user_data = data;
data->port->implementation = &impl_port;
pw_port_add(data->port, data->node);
pw_node_export(data->node);
pw_node_register(data->node);
factory = pw_core_find_node_factory(data->core, "spa-node-factory");
props = pw_properties_new("spa.library.name", "v4l2/libspa-v4l2",

View file

@ -156,24 +156,6 @@ static inline void do_flush(struct proxy *this)
}
static inline void send_need_input(struct proxy *this)
{
struct impl *impl = SPA_CONTAINER_OF(this, struct impl, proxy);
pw_transport_add_event(impl->transport,
&SPA_EVENT_INIT(impl->core->type.event_transport.NeedInput));
do_flush(this);
}
static inline void send_have_output(struct proxy *this)
{
struct impl *impl = SPA_CONTAINER_OF(this, struct impl, proxy);
pw_transport_add_event(impl->transport,
&SPA_EVENT_INIT(impl->core->type.event_transport.HaveOutput));
do_flush(this);
}
static int spa_proxy_node_send_command(struct spa_node *node, const struct spa_command *command)
{
struct proxy *this;
@ -195,9 +177,6 @@ static int spa_proxy_node_send_command(struct spa_node *node, const struct spa_c
} else {
/* send start */
pw_client_node_resource_node_command(this->resource, this->seq, command);
if (SPA_COMMAND_TYPE(command) == core->type.command_node.Start)
send_need_input(this);
res = SPA_RESULT_RETURN_ASYNC(this->seq++);
}
return res;
@ -780,9 +759,11 @@ static int spa_proxy_node_process_input(struct spa_node *node)
pw_log_trace("%d %d", io->status, io->buffer_id);
impl->transport->inputs[i] = *io;
io->status = SPA_RESULT_OK;
io->status = SPA_RESULT_NEED_BUFFER;
}
send_have_output(this);
pw_transport_add_event(impl->transport,
&SPA_EVENT_INIT(impl->core->type.event_transport.ProcessInput));
do_flush(this);
if (this->callbacks->need_input)
return SPA_RESULT_OK;
@ -795,45 +776,26 @@ static int spa_proxy_node_process_output(struct spa_node *node)
struct proxy *this;
struct impl *impl;
int i;
bool send_need = false, flush = false;
this = SPA_CONTAINER_OF(node, struct proxy, node);
impl = this->impl;
pw_log_trace("process output");
for (i = 0; i < MAX_OUTPUTS; i++) {
struct spa_port_io *io = this->out_ports[i].io, tmp;
struct spa_port_io *io = this->out_ports[i].io;
if (!io)
continue;
if (io->buffer_id != SPA_ID_INVALID) {
struct pw_event_transport_reuse_buffer rb =
PW_EVENT_TRANSPORT_REUSE_BUFFER_INIT(impl->core->type.event_transport.
ReuseBuffer, i, io->buffer_id);
spa_log_trace(this->log, "reuse buffer %d", io->buffer_id);
pw_transport_add_event(impl->transport, (struct spa_event *) &rb);
io->buffer_id = SPA_ID_INVALID;
flush = true;
}
tmp = impl->transport->outputs[i];
impl->transport->outputs[i] = *io;
pw_log_trace("%d %d %d %d", io->status, io->buffer_id, tmp.status, tmp.buffer_id);
if (io->status == SPA_RESULT_NEED_BUFFER)
send_need = true;
*io = tmp;
pw_log_trace("%d %d %d", io->status, io->buffer_id, io->status);
}
if (send_need)
send_need_input(this);
else if (flush)
do_flush(this);
pw_transport_add_event(impl->transport,
&SPA_EVENT_INIT(impl->core->type.event_transport.ProcessOutput));
do_flush(this);
return SPA_RESULT_HAVE_BUFFER;
return SPA_RESULT_OK;
}
static int handle_node_event(struct proxy *this, struct spa_event *event)

View file

@ -49,7 +49,6 @@ struct port {
struct spa_node *node;
};
static int port_impl_enum_formats(struct pw_port *port,
struct spa_format **format,
const struct spa_format *filter,
@ -313,7 +312,7 @@ static void complete_init(struct impl *impl)
{
struct pw_node *this = impl->this;
update_port_ids(impl);
pw_node_export(this);
pw_node_register(this);
}
static void on_node_done(struct spa_node *node, int seq, int res, void *user_data)

View file

@ -42,7 +42,7 @@ struct impl {
struct pw_listener on_need_input;
struct pw_listener on_have_output;
bool exported;
bool registered;
};
/** \endcond */
@ -270,12 +270,12 @@ do_node_add(struct spa_loop *loop,
}
void pw_node_export(struct pw_node *this)
void pw_node_register(struct pw_node *this)
{
struct impl *impl = SPA_CONTAINER_OF(this, struct impl, this);
struct pw_core *core = this->core;
pw_log_debug("node %p: export", this);
pw_log_debug("node %p: register", this);
update_info(this);
@ -286,7 +286,7 @@ void pw_node_export(struct pw_node *this)
core->type.node, PW_VERSION_NODE,
node_bind_func, this);
impl->exported = true;
impl->registered = true;
pw_signal_emit(&this->initialized, this);
pw_node_update_state(this, PW_NODE_STATE_SUSPENDED, NULL);
@ -419,7 +419,7 @@ void pw_node_destroy(struct pw_node *node)
pw_loop_invoke(node->data_loop, do_node_remove, 1, 0, NULL, true, node);
if (impl->exported) {
if (impl->registered) {
spa_list_remove(&node->link);
pw_global_destroy(node->global);
node->global = NULL;

View file

@ -158,8 +158,8 @@ pw_node_new(struct pw_core *core, /**< the core */
struct pw_properties *properties, /**< extra properties */
size_t user_data_size /**< user data size */);
/** Complete initialization of the node */
void pw_node_export(struct pw_node *node);
/** Complete initialization of the node and register */
void pw_node_register(struct pw_node *node);
/** Destroy a node */
void pw_node_destroy(struct pw_node *node);

View file

@ -52,6 +52,7 @@ static int schedule_tee_input(struct spa_graph_node *node, void *user_data)
res = SPA_RESULT_NEED_BUFFER;
}
else {
pw_log_trace("tee input %d %d", io->status, io->buffer_id);
spa_list_for_each(p, &node->ports[SPA_DIRECTION_OUTPUT], link)
*p->io = *io;
io->status = SPA_RESULT_OK;
@ -96,6 +97,7 @@ static int schedule_mix_input(struct spa_graph_node *node, void *user_data)
struct spa_port_io *io = this->rt.mix_port.io;
spa_list_for_each(p, &node->ports[SPA_DIRECTION_INPUT], link) {
pw_log_trace("mix input %p %p->%p %d %d", p, p->io, io, p->io->status, p->io->buffer_id);
*io = *p->io;
p->io->status = SPA_RESULT_OK;
p->io->buffer_id = SPA_ID_INVALID;
@ -297,6 +299,8 @@ int pw_port_set_format(struct pw_port *port, uint32_t flags, const struct spa_fo
if (!SPA_RESULT_IS_ASYNC(res)) {
if (format == NULL) {
if (port->buffers)
free(port->buffers);
port->buffers = NULL;
port->n_buffers = 0;
if (port->allocated)
@ -334,6 +338,7 @@ int pw_port_set_param(struct pw_port *port, struct spa_param *param)
int pw_port_use_buffers(struct pw_port *port, struct spa_buffer **buffers, uint32_t n_buffers)
{
int res;
size_t size;
if (n_buffers == 0 && port->state <= PW_PORT_STATE_READY)
return SPA_RESULT_OK;
@ -350,7 +355,11 @@ int pw_port_use_buffers(struct pw_port *port, struct spa_buffer **buffers, uint3
pw_log_debug("port %p: use %d buffers", port, n_buffers);
res = port->implementation->use_buffers(port, buffers, n_buffers);
port->buffers = buffers;
size = sizeof(struct spa_buffer *) * n_buffers;
if (port->buffers)
free(port->buffers);
port->buffers = size ? memcpy(malloc(size), buffers, size) : NULL;
port->n_buffers = n_buffers;
if (port->allocated)
pw_memblock_free(&port->buffer_mem);
@ -369,6 +378,7 @@ int pw_port_alloc_buffers(struct pw_port *port,
struct spa_buffer **buffers, uint32_t *n_buffers)
{
int res;
size_t size;
if (port->state < PW_PORT_STATE_READY)
return SPA_RESULT_NO_FORMAT;
@ -382,7 +392,11 @@ int pw_port_alloc_buffers(struct pw_port *port,
pw_log_debug("port %p: alloc %d buffers", port, *n_buffers);
res = port->implementation->alloc_buffers(port, params, n_params, buffers, n_buffers);
port->buffers = buffers;
size = sizeof(struct spa_buffer *) * *n_buffers;
if (port->buffers)
free(port->buffers);
port->buffers = size ? memcpy(malloc(size), buffers, size) : NULL;
port->n_buffers = *n_buffers;
port->allocated = true;

View file

@ -81,6 +81,7 @@ struct node_data {
struct trans_data {
struct spa_graph_port *in_ports;
struct spa_graph_port *out_ports;
/* memory for ports follows */
};
/** \endcond */
@ -375,10 +376,18 @@ static void handle_rtnode_event(struct pw_proxy *proxy, struct spa_event *event)
struct spa_graph_node *n = &data->node->rt.node;
int res;
if (SPA_EVENT_TYPE(event) == remote->core->type.event_transport.HaveOutput) {
res = n->methods->process_input(n, n->user_data);
if (SPA_EVENT_TYPE(event) == remote->core->type.event_transport.ProcessInput) {
struct spa_list ready;
struct spa_graph_port *port;
spa_list_init(&ready);
spa_list_for_each(port, &n->ports[SPA_DIRECTION_INPUT], link)
spa_list_insert(ready.prev, &port->peer->node->ready_link);
spa_graph_scheduler_chain(data->node->rt.sched, &ready);
}
else if (SPA_EVENT_TYPE(event) == remote->core->type.event_transport.NeedInput) {
else if (SPA_EVENT_TYPE(event) == remote->core->type.event_transport.ProcessOutput) {
res = n->methods->process_output(n, n->user_data);
}
else if (SPA_EVENT_TYPE(event) == remote->core->type.event_transport.ReuseBuffer) {
@ -450,6 +459,7 @@ static void client_node_transport(void *object, uint32_t node_id,
i,
0,
&data->trans->inputs[i]);
pw_log_info("transport in %d %p", i, &data->trans->inputs[i]);
}
spa_list_for_each(port, &data->node->input_ports, link)
spa_graph_port_add(&port->rt.mix_node, &t->in_ports[port->port_id]);
@ -460,6 +470,7 @@ static void client_node_transport(void *object, uint32_t node_id,
i,
0,
&data->trans->outputs[i]);
pw_log_info("transport out %d %p", i, &data->trans->inputs[i]);
}
spa_list_for_each(port, &data->node->output_ports, link)
spa_graph_port_add(&port->rt.mix_node, &t->out_ports[port->port_id]);

View file

@ -466,13 +466,13 @@ static void handle_rtnode_event(struct pw_stream *stream, struct spa_event *even
struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this);
struct pw_remote *remote = impl->this.remote;
if (SPA_EVENT_TYPE(event) == remote->core->type.event_transport.HaveOutput) {
if (SPA_EVENT_TYPE(event) == remote->core->type.event_transport.ProcessInput) {
int i;
for (i = 0; i < impl->trans->area->n_input_ports; i++) {
struct spa_port_io *input = &impl->trans->inputs[i];
pw_log_trace("stream %p: have output %d %d", stream, input->status,
pw_log_trace("stream %p: process input %d %d", stream, input->status,
input->buffer_id);
if (input->buffer_id == SPA_ID_INVALID)
continue;
@ -481,7 +481,7 @@ static void handle_rtnode_event(struct pw_stream *stream, struct spa_event *even
input->buffer_id = SPA_ID_INVALID;
}
send_need_input(stream);
} else if (SPA_EVENT_TYPE(event) == remote->core->type.event_transport.NeedInput) {
} else if (SPA_EVENT_TYPE(event) == remote->core->type.event_transport.ProcessOutput) {
int i;
for (i = 0; i < impl->trans->area->n_output_ports; i++) {
@ -493,7 +493,7 @@ static void handle_rtnode_event(struct pw_stream *stream, struct spa_event *even
reuse_buffer(stream, output->buffer_id);
output->buffer_id = SPA_ID_INVALID;
}
pw_log_trace("stream %p: need input", stream);
pw_log_trace("stream %p: process output", stream);
impl->in_need_buffer = true;
pw_signal_emit(&stream->need_buffer, stream);
impl->in_need_buffer = false;

View file

@ -94,14 +94,18 @@ pw_transport_parse_event(struct pw_transport *trans, void *event);
#define PW_TYPE_EVENT__Transport SPA_TYPE_EVENT_BASE "Transport"
#define PW_TYPE_EVENT_TRANSPORT_BASE PW_TYPE_EVENT__Transport ":"
#define PW_TYPE_EVENT_TRANSPORT__HaveOutput PW_TYPE_EVENT_TRANSPORT_BASE "HaveOutput"
#define PW_TYPE_EVENT_TRANSPORT__NeedInput PW_TYPE_EVENT_TRANSPORT_BASE "NeedInput"
#define PW_TYPE_EVENT_TRANSPORT__ReuseBuffer PW_TYPE_EVENT_TRANSPORT_BASE "ReuseBuffer"
#define PW_TYPE_EVENT_TRANSPORT__HaveOutput PW_TYPE_EVENT_TRANSPORT_BASE "HaveOutput"
#define PW_TYPE_EVENT_TRANSPORT__NeedInput PW_TYPE_EVENT_TRANSPORT_BASE "NeedInput"
#define PW_TYPE_EVENT_TRANSPORT__ReuseBuffer PW_TYPE_EVENT_TRANSPORT_BASE "ReuseBuffer"
#define PW_TYPE_EVENT_TRANSPORT__ProcessInput PW_TYPE_EVENT_TRANSPORT_BASE "ProcessInput"
#define PW_TYPE_EVENT_TRANSPORT__ProcessOutput PW_TYPE_EVENT_TRANSPORT_BASE "ProcessOutput"
struct pw_type_event_transport {
uint32_t HaveOutput;
uint32_t NeedInput;
uint32_t ReuseBuffer;
uint32_t ProcessInput;
uint32_t ProcessOutput;
};
static inline void
@ -111,6 +115,8 @@ pw_type_event_transport_map(struct spa_type_map *map, struct pw_type_event_trans
type->HaveOutput = spa_type_map_get_id(map, PW_TYPE_EVENT_TRANSPORT__HaveOutput);
type->NeedInput = spa_type_map_get_id(map, PW_TYPE_EVENT_TRANSPORT__NeedInput);
type->ReuseBuffer = spa_type_map_get_id(map, PW_TYPE_EVENT_TRANSPORT__ReuseBuffer);
type->ProcessInput = spa_type_map_get_id(map, PW_TYPE_EVENT_TRANSPORT__ProcessInput);
type->ProcessOutput = spa_type_map_get_id(map, PW_TYPE_EVENT_TRANSPORT__ProcessOutput);
}
}