stream: prepare output before signaling ready

Just prepare the output on the port and signal ready. When the graph
completes we will be signaled again to recycle the buffer and
prepare more output if we can.

Improve the bookkeeping a little when activating nodes.

Fix race with moving nodes between drivers.
This commit is contained in:
Wim Taymans 2019-03-07 23:01:32 +01:00
parent 94ce6d528c
commit 1d3ce5a9d9
5 changed files with 112 additions and 74 deletions

View file

@ -980,14 +980,13 @@ static int impl_node_process(struct spa_node *node)
struct impl *impl = this->impl;
struct pw_node *n = impl->this.node;
struct timespec ts;
uint64_t cmd = 1, nsec;
uint64_t cmd = 1;
spa_log_trace(this->log, "%p: send process %p", this, impl->this.node->driver_node);
clock_gettime(CLOCK_MONOTONIC, &ts);
nsec = SPA_TIMESPEC_TO_NSEC(&ts);
n->rt.activation->status = TRIGGERED;
n->rt.activation->signal_time = nsec;
n->rt.activation->signal_time = SPA_TIMESPEC_TO_NSEC(&ts);
if (write(this->writefd, &cmd, sizeof(cmd)) != sizeof(cmd))
spa_log_warn(this->log, "node %p: error %m", this);

View file

@ -835,8 +835,10 @@ static int impl_node_process(struct spa_node *node)
struct spa_io_position *q = impl->this.node->driver_node->rt.position;
int status, trigger;
if (impl->driver)
if (impl->driver) {
impl->client_node->node->rt.target.signal(impl->client_node->node->rt.target.data);
return SPA_STATUS_OK;
}
if (!impl->active)
return SPA_STATUS_HAVE_BUFFER;

View file

@ -27,6 +27,7 @@
#include <sys/socket.h>
#include <sys/un.h>
#include <errno.h>
#include <time.h>
#include <sys/mman.h>
#include <spa/pod/parser.h>
@ -950,7 +951,14 @@ static int link_signal_func(void *user_data)
{
struct link *link = user_data;
uint64_t cmd = 1;
struct timespec ts;
pw_log_trace("link %p: signal", link);
clock_gettime(CLOCK_MONOTONIC, &ts);
link->target.activation->status = TRIGGERED;
link->target.activation->signal_time = SPA_TIMESPEC_TO_NSEC(&ts);
if (write(link->signalfd, &cmd, sizeof(cmd)) != sizeof(cmd))
pw_log_warn("link %p: write failed %m", link);
return 0;
@ -1176,14 +1184,21 @@ static int node_ready(void *d, int status)
{
struct node_data *data = d;
struct pw_node *node = data->node;
struct timespec ts;
struct pw_port *p;
uint64_t cmd = 1;
pw_log_trace("node %p: ready driver:%d exported:%d status:%d", node,
node->driver, node->exported, status);
spa_list_for_each(p, &node->rt.output_mix, rt.node_link)
spa_node_process(p->mix);
if (status == SPA_STATUS_HAVE_BUFFER) {
spa_list_for_each(p, &node->rt.output_mix, rt.node_link)
spa_node_process(p->mix);
}
clock_gettime(CLOCK_MONOTONIC, &ts);
node->rt.activation->status = TRIGGERED;
node->rt.activation->signal_time = SPA_TIMESPEC_TO_NSEC(&ts);
if (write(data->rtwritefd, &cmd, sizeof(cmd)) != sizeof(cmd))
pw_log_warn("node %p: write failed %m", node);

View file

@ -84,32 +84,27 @@ static void node_deactivate(struct pw_node *this)
static void add_node(struct pw_node *this, struct pw_node *driver)
{
if (this->rt.driver_target.activation == NULL) {
pw_log_trace("node %p: add to driver %p", this, driver);
/* signal the driver */
this->rt.driver_target.activation = driver->rt.activation;
this->rt.driver_target.node = driver;
this->rt.driver_target.data = driver;
spa_list_append(&this->rt.target_list, &this->rt.driver_target.link);
this->rt.driver_target.activation->state[0].required++;
pw_log_trace("node %p: add to driver %p", this, driver);
/* signal the driver */
this->rt.driver_target.activation = driver->rt.activation;
this->rt.driver_target.node = driver;
this->rt.driver_target.data = driver;
spa_list_append(&this->rt.target_list, &this->rt.driver_target.link);
this->rt.driver_target.activation->state[0].required++;
spa_list_append(&driver->rt.target_list, &this->rt.target.link);
this->rt.activation->state[0].required++;
}
spa_list_append(&driver->rt.target_list, &this->rt.target.link);
this->rt.activation->state[0].required++;
}
static void remove_node(struct pw_node *this)
{
if (this->rt.driver_target.activation != NULL) {
pw_log_trace("node %p: remove from driver %p", this,
pw_log_trace("node %p: remove from driver %p", this,
this->rt.driver_target.data);
spa_list_remove(&this->rt.driver_target.link);
this->rt.driver_target.activation->state[0].required--;
this->rt.driver_target.activation = NULL;
spa_list_remove(&this->rt.driver_target.link);
this->rt.driver_target.activation->state[0].required--;
spa_list_remove(&this->rt.target.link);
this->rt.activation->state[0].required--;
}
spa_list_remove(&this->rt.target.link);
this->rt.activation->state[0].required--;
}
static int
@ -117,9 +112,10 @@ do_node_remove(struct spa_loop *loop,
bool async, uint32_t seq, const void *data, size_t size, void *user_data)
{
struct pw_node *this = user_data;
if (this->source.loop != NULL)
if (this->source.loop != NULL) {
spa_loop_remove_source(loop, &this->source);
remove_node(this);
remove_node(this);
}
return 0;
}
@ -150,9 +146,10 @@ do_node_add(struct spa_loop *loop,
struct pw_node *this = user_data;
struct pw_node *driver = this->driver_node;
if (this->source.loop == NULL)
if (this->source.loop == NULL) {
spa_loop_add_source(loop, &this->source);
add_node(this, driver);
add_node(this, driver);
}
return 0;
}
@ -468,13 +465,17 @@ do_move_nodes(struct spa_loop *loop,
pw_log_trace("node %p: driver:%p->%p", this, this, driver);
remove_node(this);
add_node(this, driver);
if (this->source.loop != NULL) {
remove_node(this);
add_node(this, driver);
}
spa_list_for_each_safe(n, t, &this->rt.target_list, link) {
struct pw_node *node = n->node;
remove_node(node);
add_node(node, driver);
if (node && node->source.loop != NULL) {
remove_node(node);
add_node(node, driver);
}
}
return 0;
}
@ -581,11 +582,41 @@ static void check_properties(struct pw_node *node)
}
static int pw_node_trigger(struct pw_node *this, uint64_t nsec)
static void dump_states(struct pw_node *driver)
{
struct pw_node_target *t;
spa_debug("node %p: trigger %"PRIu64, this, nsec);
spa_list_for_each(t, &driver->rt.target_list, link) {
struct pw_node_activation *a = t->activation;
pw_log_trace("node %p: required:%d waiting:%"PRIu64
" process:%"PRIu64" status:%d",
t->node, a->state[0].required,
a->awake_time - a->signal_time,
a->finish_time - a->awake_time,
t->activation->status);
}
}
static inline int resume_node(struct pw_node *this, int status)
{
struct pw_port *p;
struct pw_node_target *t;
struct timespec ts;
struct pw_node_activation *activation = this->rt.activation;
uint64_t nsec;
if (status & SPA_STATUS_HAVE_BUFFER) {
spa_list_for_each(p, &this->rt.output_mix, rt.node_link)
spa_node_process(p->mix);
}
clock_gettime(CLOCK_MONOTONIC, &ts);
nsec = SPA_TIMESPEC_TO_NSEC(&ts);
activation->status = FINISHED;
activation->finish_time = nsec;
pw_log_trace("node %p: trigger peers", this);
spa_list_for_each(t, &this->rt.target_list, link) {
struct pw_node_activation_state *state;
@ -609,37 +640,34 @@ static inline int process_node(void *data)
struct pw_node *this = data;
struct timespec ts;
struct pw_port *p;
uint64_t nsec;
struct pw_node_activation *activation = this->rt.activation;
int status;
pw_log_trace("node %p: process", this);
clock_gettime(CLOCK_MONOTONIC, &ts);
this->rt.activation->status = AWAKE;
this->rt.activation->awake_time = SPA_TIMESPEC_TO_NSEC(&ts);
activation->status = AWAKE;
activation->awake_time = SPA_TIMESPEC_TO_NSEC(&ts);
spa_list_for_each(p, &this->rt.input_mix, rt.node_link)
spa_node_process(p->mix);
spa_node_process(this->node);
status = spa_node_process(this->node);
activation->state[0].status = status;
spa_list_for_each(p, &this->rt.output_mix, rt.node_link)
spa_node_process(p->mix);
clock_gettime(CLOCK_MONOTONIC, &ts);
nsec = SPA_TIMESPEC_TO_NSEC(&ts);
this->rt.activation->status = FINISHED;
this->rt.activation->finish_time = nsec;
pw_log_trace("node %p: finish waiting:%"PRIu64" processing:%"PRIu64, this,
this->rt.activation->awake_time - this->rt.activation->signal_time,
this->rt.activation->finish_time - this->rt.activation->awake_time);
if (this != this->driver_node || this->exported) {
pw_node_trigger(this, nsec);
} else {
if (this == this->driver_node && !this->exported) {
clock_gettime(CLOCK_MONOTONIC, &ts);
activation->status = FINISHED;
activation->finish_time = SPA_TIMESPEC_TO_NSEC(&ts);
pw_log_trace("node %p: graph completed", this);
dump_states(this);
} else if (status == SPA_STATUS_OK) {
pw_log_trace("node %p: async continue", this);
} else {
resume_node(this, status);
}
return 0;
}
static void node_on_fd_events(struct spa_source *source)
@ -921,30 +949,18 @@ static int node_ready(void *data, int status)
{
struct pw_node *node = data;
struct pw_node *driver = node->driver_node;
struct pw_port *p;
struct pw_node_target *t;
struct timespec ts;
uint64_t nsec;
pw_log_trace("node %p: ready driver:%d exported:%d %p status:%d", node,
node->driver, node->exported, driver, status);
spa_list_for_each(t, &driver->rt.target_list, link) {
pw_node_activation_state_reset(&t->activation->state[0]);
t->activation->status = NOT_TRIGGERED;
if (node == driver) {
spa_list_for_each(t, &driver->rt.target_list, link) {
pw_node_activation_state_reset(&t->activation->state[0]);
t->activation->status = NOT_TRIGGERED;
}
}
if (status == SPA_STATUS_HAVE_BUFFER) {
spa_list_for_each(p, &node->rt.output_mix, rt.node_link)
spa_node_process(p->mix);
}
clock_gettime(CLOCK_MONOTONIC, &ts);
nsec = SPA_TIMESPEC_TO_NSEC(&ts);
driver->rt.activation->status = TRIGGERED;
driver->rt.activation->signal_time = nsec;
return pw_node_trigger(driver, nsec);
return resume_node(node, status);
}
static int node_reuse_buffer(void *data, uint32_t port_id, uint32_t buffer_id)

View file

@ -1337,10 +1337,16 @@ do_process(struct spa_loop *loop,
bool async, uint32_t seq, const void *data, size_t size, void *user_data)
{
struct stream *impl = user_data;
struct buffer *b;
struct spa_io_buffers *io = impl->io;
int res;
res = impl_node_process_output(&impl->impl_node);
return impl->callbacks->ready(impl->callbacks_data, res);
if ((b = pop_queue(impl, &impl->queued)) != NULL) {
io->buffer_id = b->id;
io->status = SPA_STATUS_HAVE_BUFFER;
pw_log_trace("stream %p: pop %d %p", impl, b->id, io);
}
return impl->callbacks->ready(impl->callbacks_data, io->status);
}
static inline int call_trigger(struct stream *impl)