graph: new scheduling model

Make explicit links between elements that are used to activate the
next element in the graph.
Make subgraphs a special regular node. Make a link from the
subgraph children to the parent so that the subgraph completes when
all the children completed.
Implement a single process function in plugins
Remove many messages in the client node
This commit is contained in:
Wim Taymans 2018-03-20 11:37:11 +01:00
parent 9b0a880afb
commit 33a322b96e
36 changed files with 401 additions and 750 deletions

View file

@ -211,7 +211,7 @@ static int impl_node_set_param(struct spa_node *node, uint32_t id, uint32_t flag
static void set_timer(struct impl *this, bool enabled)
{
if ((this->callbacks && this->callbacks->need_input) || this->props.live) {
if ((this->callbacks && this->callbacks->process) || this->props.live) {
if (enabled) {
if (this->props.live) {
uint64_t next_time = this->start_time + this->elapsed_time;
@ -233,7 +233,7 @@ static inline void read_timer(struct impl *this)
{
uint64_t expirations;
if ((this->callbacks && this->callbacks->need_input) || this->props.live) {
if ((this->callbacks && this->callbacks->process) || this->props.live) {
if (read(this->timer_source.fd, &expirations, sizeof(uint64_t)) != sizeof(uint64_t))
perror("read timerfd");
}
@ -253,8 +253,8 @@ static int consume_buffer(struct impl *this)
if (spa_list_is_empty(&this->ready)) {
io->status = SPA_STATUS_NEED_BUFFER;
if (this->callbacks->need_input)
this->callbacks->need_input(this->callbacks_data);
if (this->callbacks->process)
this->callbacks->process(this->callbacks_data, SPA_STATUS_NEED_BUFFER);
}
if (spa_list_is_empty(&this->ready)) {
spa_log_error(this->log, NAME " %p: no buffers", this);
@ -356,7 +356,7 @@ impl_node_set_callbacks(struct spa_node *node,
this = SPA_CONTAINER_OF(node, struct impl, node);
if (this->data_loop == NULL && callbacks != NULL && callbacks->need_input != NULL) {
if (this->data_loop == NULL && callbacks != NULL && callbacks->process != NULL) {
spa_log_error(this->log, "a data_loop is needed for async operation");
return -EINVAL;
}
@ -703,7 +703,7 @@ impl_node_port_send_command(struct spa_node *node,
return -ENOTSUP;
}
static int impl_node_process_input(struct spa_node *node)
static int impl_node_process(struct spa_node *node)
{
struct impl *this;
struct spa_io_buffers *input;
@ -732,17 +732,12 @@ static int impl_node_process_input(struct spa_node *node)
input->buffer_id = SPA_ID_INVALID;
input->status = SPA_STATUS_OK;
}
if (this->callbacks == NULL || this->callbacks->need_input == NULL)
if (this->callbacks == NULL || this->callbacks->process == NULL)
return consume_buffer(this);
else
return SPA_STATUS_OK;
}
static int impl_node_process_output(struct spa_node *node)
{
return -ENOTSUP;
}
static const struct spa_node impl_node = {
SPA_VERSION_NODE,
NULL,
@ -762,8 +757,7 @@ static const struct spa_node impl_node = {
impl_node_port_set_io,
impl_node_port_reuse_buffer,
impl_node_port_send_command,
impl_node_process_input,
impl_node_process_output,
impl_node_process,
};
static int impl_clock_enum_params(struct spa_clock *clock, uint32_t id, uint32_t *index,

View file

@ -230,7 +230,7 @@ static int fill_buffer(struct impl *this, struct buffer *b)
static void set_timer(struct impl *this, bool enabled)
{
if ((this->callbacks && this->callbacks->have_output) || this->props.live) {
if ((this->callbacks && this->callbacks->process) || this->props.live) {
if (enabled) {
if (this->props.live) {
uint64_t next_time = this->start_time + this->elapsed_time;
@ -252,7 +252,7 @@ static inline void read_timer(struct impl *this)
{
uint64_t expirations;
if ((this->callbacks && this->callbacks->have_output) || this->props.live) {
if ((this->callbacks && this->callbacks->process) || this->props.live) {
if (read(this->timer_source.fd, &expirations, sizeof(uint64_t)) != sizeof(uint64_t))
perror("read timerfd");
}
@ -309,8 +309,8 @@ static void on_output(struct spa_source *source)
res = make_buffer(this);
if (res == SPA_STATUS_HAVE_BUFFER && this->callbacks && this->callbacks->have_output)
this->callbacks->have_output(this->callbacks_data);
if (res == SPA_STATUS_HAVE_BUFFER && this->callbacks && this->callbacks->process)
this->callbacks->process(this->callbacks_data, res);
}
static int impl_node_send_command(struct spa_node *node, const struct spa_command *command)
@ -372,7 +372,7 @@ impl_node_set_callbacks(struct spa_node *node,
this = SPA_CONTAINER_OF(node, struct impl, node);
if (this->data_loop == NULL && (callbacks != NULL && callbacks->have_output != NULL)) {
if (this->data_loop == NULL && (callbacks != NULL && callbacks->process != NULL)) {
spa_log_error(this->log, "a data_loop is needed for async operation");
return -EINVAL;
}
@ -747,12 +747,7 @@ impl_node_port_send_command(struct spa_node *node,
return -ENOTSUP;
}
static int impl_node_process_input(struct spa_node *node)
{
return -ENOTSUP;
}
static int impl_node_process_output(struct spa_node *node)
static int impl_node_process(struct spa_node *node)
{
struct impl *this;
struct spa_io_buffers *io;
@ -771,7 +766,7 @@ static int impl_node_process_output(struct spa_node *node)
this->io->buffer_id = SPA_ID_INVALID;
}
if ((this->callbacks == NULL || this->callbacks->have_output == NULL) &&
if ((this->callbacks == NULL || this->callbacks->process == NULL) &&
(io->status == SPA_STATUS_NEED_BUFFER))
return make_buffer(this);
else
@ -797,8 +792,7 @@ static const struct spa_node impl_node = {
impl_node_port_set_io,
impl_node_port_reuse_buffer,
impl_node_port_send_command,
impl_node_process_input,
impl_node_process_output,
impl_node_process,
};
static int impl_clock_enum_params(struct spa_clock *clock, uint32_t id, uint32_t *index,