spa: improve draining

Make a new DRAINED status.
Place the DRAINED status on an input IO when a stream is out of
buffers and draining.
All nodes that don't have HAVE_DATA on the input io need to copy
it to the output io and return the status. This makes sure the
DRAINED is forwarded and nodes return DRAINED from _process()
DRAINED on the resampler flushes out the last queued samples and then
forwards the DRAINED in the next iteration.
Emit a new drained signal from the context when a node returns
DRAINED. Use this to trigger the drained signal in the stream.
This commit is contained in:
Wim Taymans 2020-04-07 17:58:43 +02:00
parent 029f431418
commit b18dacde9a
10 changed files with 46 additions and 42 deletions

View file

@ -77,6 +77,9 @@ enum spa_io_type {
* If status is SPA_STATUS_STOPPED, some error occured on the * If status is SPA_STATUS_STOPPED, some error occured on the
* port. * port.
* *
* If status is SPA_STATUS_DRAINED, data from the io area was
* used to drain.
*
* Status can also be a negative errno value to indicate errors. * Status can also be a negative errno value to indicate errors.
* such as: * such as:
* -EINVAL: buffer_id is invalid * -EINVAL: buffer_id is invalid
@ -87,6 +90,7 @@ struct spa_io_buffers {
#define SPA_STATUS_NEED_DATA (1<<0) #define SPA_STATUS_NEED_DATA (1<<0)
#define SPA_STATUS_HAVE_DATA (1<<1) #define SPA_STATUS_HAVE_DATA (1<<1)
#define SPA_STATUS_STOPPED (1<<2) #define SPA_STATUS_STOPPED (1<<2)
#define SPA_STATUS_DRAINED (1<<3)
int32_t status; /**< the status code */ int32_t status; /**< the status code */
uint32_t buffer_id; /**< a buffer id */ uint32_t buffer_id; /**< a buffer id */
}; };

View file

@ -865,7 +865,7 @@ impl_node_port_reuse_buffer(void *object, uint32_t port_id, uint32_t buffer_id)
static int impl_node_process(void *object) static int impl_node_process(void *object)
{ {
struct impl *this = object; struct impl *this = object;
int status; int status = 0;
spa_log_trace_fp(this->log, "%p: process convert:%u master:%d", spa_log_trace_fp(this->log, "%p: process convert:%u master:%d",
this, this->use_converter, this->master); this, this->use_converter, this->master);
@ -875,22 +875,17 @@ static int impl_node_process(void *object)
status = spa_node_process(this->convert); status = spa_node_process(this->convert);
} }
status = spa_node_process(this->follower); if (status >= 0)
status = spa_node_process(this->follower);
if (this->direction == SPA_DIRECTION_OUTPUT && if (this->direction == SPA_DIRECTION_OUTPUT &&
!this->master && this->use_converter) { !this->master && this->use_converter) {
while (true) { while (status >= 0) {
status = spa_node_process(this->convert); status = spa_node_process(this->convert);
if (status & SPA_STATUS_HAVE_DATA) if (status & (SPA_STATUS_HAVE_DATA | SPA_STATUS_DRAINED))
break; break;
if (status & SPA_STATUS_NEED_DATA)
if (status & SPA_STATUS_NEED_DATA) {
status = spa_node_process(this->follower); status = spa_node_process(this->follower);
if (!(status & SPA_STATUS_HAVE_DATA)) {
spa_node_call_xrun(&this->callbacks, 0, 0, NULL);
break;
}
}
} }
} }
spa_log_trace_fp(this->log, "%p: process status:%d", this, status); spa_log_trace_fp(this->log, "%p: process status:%d", this, status);

View file

@ -1084,7 +1084,7 @@ static int impl_node_process(void *object)
if (SPA_UNLIKELY(i == 0)) if (SPA_UNLIKELY(i == 0))
res |= r & SPA_STATUS_NEED_DATA; res |= r & SPA_STATUS_NEED_DATA;
if (SPA_UNLIKELY(i == this->n_nodes-1)) if (SPA_UNLIKELY(i == this->n_nodes-1))
res |= r & SPA_STATUS_HAVE_DATA; res |= r & (SPA_STATUS_HAVE_DATA | SPA_STATUS_DRAINED);
} }
if (res & SPA_STATUS_HAVE_DATA) if (res & SPA_STATUS_HAVE_DATA)
break; break;

View file

@ -864,15 +864,13 @@ static int impl_node_process(void *object)
if (SPA_UNLIKELY(outio->status == SPA_STATUS_HAVE_DATA)) if (SPA_UNLIKELY(outio->status == SPA_STATUS_HAVE_DATA))
return SPA_STATUS_HAVE_DATA; return SPA_STATUS_HAVE_DATA;
if (SPA_UNLIKELY(inio->status != SPA_STATUS_HAVE_DATA))
return SPA_STATUS_NEED_DATA;
/* recycle */ /* recycle */
if (SPA_LIKELY(outio->buffer_id < outport->n_buffers)) { if (SPA_LIKELY(outio->buffer_id < outport->n_buffers)) {
recycle_buffer(this, outio->buffer_id); recycle_buffer(this, outio->buffer_id);
outio->buffer_id = SPA_ID_INVALID; outio->buffer_id = SPA_ID_INVALID;
} }
if (SPA_UNLIKELY(inio->status != SPA_STATUS_HAVE_DATA))
return outio->status = inio->status;
if (SPA_UNLIKELY(inio->buffer_id >= inport->n_buffers)) if (SPA_UNLIKELY(inio->buffer_id >= inport->n_buffers))
return inio->status = -EINVAL; return inio->status = -EINVAL;

View file

@ -844,7 +844,8 @@ static int impl_node_process(void *object)
outio->buffer_id = SPA_ID_INVALID; outio->buffer_id = SPA_ID_INVALID;
} }
if (SPA_UNLIKELY(inio->status != SPA_STATUS_HAVE_DATA)) if (SPA_UNLIKELY(inio->status != SPA_STATUS_HAVE_DATA))
return SPA_STATUS_NEED_DATA; return outio->status = inio->status;
if (SPA_UNLIKELY(inio->buffer_id >= inport->n_buffers)) if (SPA_UNLIKELY(inio->buffer_id >= inport->n_buffers))
return inio->status = -EINVAL; return inio->status = -EINVAL;

View file

@ -118,6 +118,7 @@ struct impl {
int mode; int mode;
unsigned int started:1; unsigned int started:1;
unsigned int peaks:1; unsigned int peaks:1;
unsigned int drained:1;
struct resample resample; struct resample resample;
}; };
@ -723,6 +724,7 @@ static int impl_node_process(void *object)
void **dst_datas; void **dst_datas;
bool flush_out = false; bool flush_out = false;
bool flush_in = false; bool flush_in = false;
bool draining = false;
spa_return_val_if_fail(this != NULL, -EINVAL); spa_return_val_if_fail(this != NULL, -EINVAL);
@ -741,15 +743,18 @@ static int impl_node_process(void *object)
if (SPA_UNLIKELY(outio->status == SPA_STATUS_HAVE_DATA)) if (SPA_UNLIKELY(outio->status == SPA_STATUS_HAVE_DATA))
return SPA_STATUS_HAVE_DATA; return SPA_STATUS_HAVE_DATA;
if (SPA_UNLIKELY(inio->status != SPA_STATUS_HAVE_DATA))
return SPA_STATUS_NEED_DATA;
/* recycle */ /* recycle */
if (SPA_LIKELY(outio->buffer_id < outport->n_buffers)) { if (SPA_LIKELY(outio->buffer_id < outport->n_buffers)) {
recycle_buffer(this, outio->buffer_id); recycle_buffer(this, outio->buffer_id);
outio->buffer_id = SPA_ID_INVALID; outio->buffer_id = SPA_ID_INVALID;
} }
if (SPA_UNLIKELY(inio->status != SPA_STATUS_HAVE_DATA)) {
if (inio->status != SPA_STATUS_DRAINED || this->drained)
return outio->status = inio->status;
inio->buffer_id = 0;
inport->buffers[0].outbuf->datas[0].chunk->size = 0;
}
if (SPA_UNLIKELY(inio->buffer_id >= inport->n_buffers)) if (SPA_UNLIKELY(inio->buffer_id >= inport->n_buffers))
return inio->status = -EINVAL; return inio->status = -EINVAL;
@ -784,7 +789,7 @@ static int impl_node_process(void *object)
size = sb->datas[0].maxsize; size = sb->datas[0].maxsize;
memset(sb->datas[0].data, 0, size); memset(sb->datas[0].data, 0, size);
inport->offset = 0; inport->offset = 0;
flush_in = true; flush_in = draining = true;
} }
if (this->io_rate_match) { if (this->io_rate_match) {
@ -829,18 +834,19 @@ static int impl_node_process(void *object)
if (inport->offset >= size || flush_in) { if (inport->offset >= size || flush_in) {
inio->status = SPA_STATUS_NEED_DATA; inio->status = SPA_STATUS_NEED_DATA;
inport->offset = 0; inport->offset = 0;
SPA_FLAG_SET(res, SPA_STATUS_NEED_DATA); SPA_FLAG_SET(res, inio->status);
spa_log_trace_fp(this->log, NAME " %p: return input buffer", this); spa_log_trace_fp(this->log, NAME " %p: return input buffer of size %d", this, size);
} }
outport->offset += out_len * sizeof(float); outport->offset += out_len * sizeof(float);
if (outport->offset > 0 && (outport->offset >= maxsize || flush_out)) { if (outport->offset > 0 && (outport->offset >= maxsize || flush_out)) {
outio->status = SPA_STATUS_HAVE_DATA; outio->status = SPA_STATUS_HAVE_DATA;
outio->buffer_id = dbuf->id; outio->buffer_id = dbuf->id;
spa_log_trace_fp(this->log, NAME " %p: have output buffer of size %d", this, outport->offset);
dequeue_buffer(this, dbuf); dequeue_buffer(this, dbuf);
outport->offset = 0; outport->offset = 0;
this->drained = draining;
SPA_FLAG_SET(res, SPA_STATUS_HAVE_DATA); SPA_FLAG_SET(res, SPA_STATUS_HAVE_DATA);
spa_log_trace_fp(this->log, NAME " %p: have output buffer", this);
} }
if (out_len == 0 && this->peaks) { if (out_len == 0 && this->peaks) {
outio->status = SPA_STATUS_HAVE_DATA; outio->status = SPA_STATUS_HAVE_DATA;

View file

@ -860,7 +860,7 @@ static int impl_node_process(void *object)
inio, inio->status, inio->buffer_id); inio, inio->status, inio->buffer_id);
if (SPA_UNLIKELY(inio->status != SPA_STATUS_HAVE_DATA)) if (SPA_UNLIKELY(inio->status != SPA_STATUS_HAVE_DATA))
return SPA_STATUS_NEED_DATA; return inio->status;
if (SPA_UNLIKELY(inio->buffer_id >= inport->n_buffers)) if (SPA_UNLIKELY(inio->buffer_id >= inport->n_buffers))
return inio->status = -EINVAL; return inio->status = -EINVAL;

View file

@ -906,6 +906,9 @@ static inline int process_node(void *data)
} else { } else {
resume_node(this, status); resume_node(this, status);
} }
if (status & SPA_STATUS_DRAINED) {
pw_context_driver_emit_drained(this->context, this);
}
return 0; return 0;
} }

View file

@ -235,6 +235,7 @@ pw_core_resource_errorf(struct pw_resource *resource, uint32_t id, int seq,
#define pw_context_driver_emit_xrun(c,n) pw_context_driver_emit(c, xrun, 0, n) #define pw_context_driver_emit_xrun(c,n) pw_context_driver_emit(c, xrun, 0, n)
#define pw_context_driver_emit_incomplete(c,n) pw_context_driver_emit(c, incomplete, 0, n) #define pw_context_driver_emit_incomplete(c,n) pw_context_driver_emit(c, incomplete, 0, n)
#define pw_context_driver_emit_timeout(c,n) pw_context_driver_emit(c, timeout, 0, n) #define pw_context_driver_emit_timeout(c,n) pw_context_driver_emit(c, timeout, 0, n)
#define pw_context_driver_emit_drained(c,n) pw_context_driver_emit(c, drained, 0, n)
struct pw_context_driver_events { struct pw_context_driver_events {
#define PW_VERSION_CONTEXT_DRIVER_EVENTS 0 #define PW_VERSION_CONTEXT_DRIVER_EVENTS 0
@ -248,6 +249,8 @@ struct pw_context_driver_events {
void (*incomplete) (void *data, struct pw_impl_node *node); void (*incomplete) (void *data, struct pw_impl_node *node);
/** The driver got a sync timeout */ /** The driver got a sync timeout */
void (*timeout) (void *data, struct pw_impl_node *node); void (*timeout) (void *data, struct pw_impl_node *node);
/** a node drained */
void (*drained) (void *data, struct pw_impl_node *node);
}; };
#define pw_registry_resource(r,m,v,...) pw_resource_call(r, struct pw_registry_events,m,v,##__VA_ARGS__) #define pw_registry_resource(r,m,v,...) pw_resource_call(r, struct pw_registry_events,m,v,##__VA_ARGS__)

View file

@ -331,7 +331,6 @@ do_call_drained(struct spa_loop *loop,
struct pw_stream *stream = &impl->this; struct pw_stream *stream = &impl->this;
pw_log_trace(NAME" %p: drained", stream); pw_log_trace(NAME" %p: drained", stream);
pw_stream_emit_drained(stream); pw_stream_emit_drained(stream);
impl->draining = false;
return 0; return 0;
} }
@ -765,8 +764,7 @@ again:
pw_log_trace(NAME" %p: process out status:%d id:%d ticks:%"PRIu64" delay:%"PRIi64, stream, pw_log_trace(NAME" %p: process out status:%d id:%d ticks:%"PRIu64" delay:%"PRIi64, stream,
io->status, io->buffer_id, impl->time.ticks, impl->time.delay); io->status, io->buffer_id, impl->time.ticks, impl->time.delay);
res = 0; if ((res = io->status) != SPA_STATUS_HAVE_DATA) {
if (io->status != SPA_STATUS_HAVE_DATA) {
/* recycle old buffer */ /* recycle old buffer */
if ((b = get_buffer(stream, io->buffer_id)) != NULL) { if ((b = get_buffer(stream, io->buffer_id)) != NULL) {
pw_log_trace(NAME" %p: recycle buffer %d", stream, b->id); pw_log_trace(NAME" %p: recycle buffer %d", stream, b->id);
@ -776,20 +774,17 @@ again:
/* pop new buffer */ /* pop new buffer */
if ((b = pop_queue(impl, &impl->queued)) != NULL) { if ((b = pop_queue(impl, &impl->queued)) != NULL) {
io->buffer_id = b->id; io->buffer_id = b->id;
io->status = SPA_STATUS_HAVE_DATA; res = io->status = SPA_STATUS_HAVE_DATA;
pw_log_trace(NAME" %p: pop %d %p", stream, b->id, io); pw_log_trace(NAME" %p: pop %d %p", stream, b->id, io);
} else if (impl->draining) {
impl->drained = true;
io->buffer_id = SPA_ID_INVALID;
res = io->status = SPA_STATUS_DRAINED;
pw_log_trace(NAME" %p: draining", stream);
} else { } else {
io->buffer_id = SPA_ID_INVALID; io->buffer_id = SPA_ID_INVALID;
io->status = SPA_STATUS_NEED_DATA; res = io->status = SPA_STATUS_NEED_DATA;
pw_log_trace(NAME" %p: no more buffers %p", stream, io); pw_log_trace(NAME" %p: no more buffers %p", stream, io);
if (impl->draining && !impl->drained) {
b = pop_queue(impl, &impl->dequeued);
io->buffer_id = b->id;
io->status = SPA_STATUS_HAVE_DATA;
b->this.buffer->datas[0].chunk->size = 0;
pw_log_trace(NAME" %p: drain buffer %d", stream, b->id);
impl->drained = true;
}
} }
} }
@ -803,7 +798,6 @@ again:
} }
copy_position(impl, impl->queued.outcount); copy_position(impl, impl->queued.outcount);
res = io->status;
pw_log_trace(NAME" %p: res %d", stream, res); pw_log_trace(NAME" %p: res %d", stream, res);
return res; return res;
@ -1044,7 +1038,7 @@ static const struct pw_core_events core_events = {
.error = on_core_error, .error = on_core_error,
}; };
static void context_xrun(void *data, struct pw_impl_node *node) static void context_drained(void *data, struct pw_impl_node *node)
{ {
struct stream *impl = data; struct stream *impl = data;
if (impl->node != node) if (impl->node != node)
@ -1055,7 +1049,7 @@ static void context_xrun(void *data, struct pw_impl_node *node)
static const struct pw_context_driver_events context_events = { static const struct pw_context_driver_events context_events = {
PW_VERSION_CONTEXT_DRIVER_EVENTS, PW_VERSION_CONTEXT_DRIVER_EVENTS,
.xrun = context_xrun, .drained = context_drained,
}; };
static struct stream * static struct stream *