From b18dacde9aee94f5e784a2465363f662c9683135 Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Tue, 7 Apr 2020 17:58:43 +0200 Subject: [PATCH] spa: improve draining Make a new DRAINED status. Place the DRAINED status on an input IO when a stream is out of buffers and draining. All nodes that don't have HAVE_DATA on the input io need to copy it to the output io and return the status. This makes sure the DRAINED is forwarded and nodes return DRAINED from _process() DRAINED on the resampler flushes out the last queued samples and then forwards the DRAINED in the next iteration. Emit a new drained signal from the context when a node returns DRAINED. Use this to trigger the drained signal in the stream. --- spa/include/spa/node/io.h | 4 ++++ spa/plugins/audioconvert/audioadapter.c | 17 ++++++---------- spa/plugins/audioconvert/audioconvert.c | 2 +- spa/plugins/audioconvert/channelmix.c | 6 ++---- spa/plugins/audioconvert/fmtconvert.c | 3 ++- spa/plugins/audioconvert/resample.c | 22 +++++++++++++-------- spa/plugins/audioconvert/splitter.c | 2 +- src/pipewire/impl-node.c | 3 +++ src/pipewire/private.h | 3 +++ src/pipewire/stream.c | 26 ++++++++++--------------- 10 files changed, 46 insertions(+), 42 deletions(-) diff --git a/spa/include/spa/node/io.h b/spa/include/spa/node/io.h index bc780f882..eebc62a9e 100644 --- a/spa/include/spa/node/io.h +++ b/spa/include/spa/node/io.h @@ -77,6 +77,9 @@ enum spa_io_type { * If status is SPA_STATUS_STOPPED, some error occured on the * port. * + * If status is SPA_STATUS_DRAINED, data from the io area was + * used to drain. + * * Status can also be a negative errno value to indicate errors. * such as: * -EINVAL: buffer_id is invalid @@ -87,6 +90,7 @@ struct spa_io_buffers { #define SPA_STATUS_NEED_DATA (1<<0) #define SPA_STATUS_HAVE_DATA (1<<1) #define SPA_STATUS_STOPPED (1<<2) +#define SPA_STATUS_DRAINED (1<<3) int32_t status; /**< the status code */ uint32_t buffer_id; /**< a buffer id */ }; diff --git a/spa/plugins/audioconvert/audioadapter.c b/spa/plugins/audioconvert/audioadapter.c index 21f61068a..851cd3895 100644 --- a/spa/plugins/audioconvert/audioadapter.c +++ b/spa/plugins/audioconvert/audioadapter.c @@ -865,7 +865,7 @@ impl_node_port_reuse_buffer(void *object, uint32_t port_id, uint32_t buffer_id) static int impl_node_process(void *object) { struct impl *this = object; - int status; + int status = 0; spa_log_trace_fp(this->log, "%p: process convert:%u master:%d", this, this->use_converter, this->master); @@ -875,22 +875,17 @@ static int impl_node_process(void *object) status = spa_node_process(this->convert); } - status = spa_node_process(this->follower); + if (status >= 0) + status = spa_node_process(this->follower); if (this->direction == SPA_DIRECTION_OUTPUT && !this->master && this->use_converter) { - while (true) { + while (status >= 0) { status = spa_node_process(this->convert); - if (status & SPA_STATUS_HAVE_DATA) + if (status & (SPA_STATUS_HAVE_DATA | SPA_STATUS_DRAINED)) break; - - if (status & SPA_STATUS_NEED_DATA) { + if (status & SPA_STATUS_NEED_DATA) status = spa_node_process(this->follower); - if (!(status & SPA_STATUS_HAVE_DATA)) { - spa_node_call_xrun(&this->callbacks, 0, 0, NULL); - break; - } - } } } spa_log_trace_fp(this->log, "%p: process status:%d", this, status); diff --git a/spa/plugins/audioconvert/audioconvert.c b/spa/plugins/audioconvert/audioconvert.c index c308ab7ee..08a6c03e8 100644 --- a/spa/plugins/audioconvert/audioconvert.c +++ b/spa/plugins/audioconvert/audioconvert.c @@ -1084,7 +1084,7 @@ static int impl_node_process(void *object) if (SPA_UNLIKELY(i == 0)) res |= r & SPA_STATUS_NEED_DATA; if (SPA_UNLIKELY(i == this->n_nodes-1)) - res |= r & SPA_STATUS_HAVE_DATA; + res |= r & (SPA_STATUS_HAVE_DATA | SPA_STATUS_DRAINED); } if (res & SPA_STATUS_HAVE_DATA) break; diff --git a/spa/plugins/audioconvert/channelmix.c b/spa/plugins/audioconvert/channelmix.c index 651928a04..2132af007 100644 --- a/spa/plugins/audioconvert/channelmix.c +++ b/spa/plugins/audioconvert/channelmix.c @@ -864,15 +864,13 @@ static int impl_node_process(void *object) if (SPA_UNLIKELY(outio->status == SPA_STATUS_HAVE_DATA)) return SPA_STATUS_HAVE_DATA; - - if (SPA_UNLIKELY(inio->status != SPA_STATUS_HAVE_DATA)) - return SPA_STATUS_NEED_DATA; - /* recycle */ if (SPA_LIKELY(outio->buffer_id < outport->n_buffers)) { recycle_buffer(this, outio->buffer_id); outio->buffer_id = SPA_ID_INVALID; } + if (SPA_UNLIKELY(inio->status != SPA_STATUS_HAVE_DATA)) + return outio->status = inio->status; if (SPA_UNLIKELY(inio->buffer_id >= inport->n_buffers)) return inio->status = -EINVAL; diff --git a/spa/plugins/audioconvert/fmtconvert.c b/spa/plugins/audioconvert/fmtconvert.c index 4fc3e0c66..5c58b8df7 100644 --- a/spa/plugins/audioconvert/fmtconvert.c +++ b/spa/plugins/audioconvert/fmtconvert.c @@ -844,7 +844,8 @@ static int impl_node_process(void *object) outio->buffer_id = SPA_ID_INVALID; } if (SPA_UNLIKELY(inio->status != SPA_STATUS_HAVE_DATA)) - return SPA_STATUS_NEED_DATA; + return outio->status = inio->status; + if (SPA_UNLIKELY(inio->buffer_id >= inport->n_buffers)) return inio->status = -EINVAL; diff --git a/spa/plugins/audioconvert/resample.c b/spa/plugins/audioconvert/resample.c index 68aed53b6..dd615fde0 100644 --- a/spa/plugins/audioconvert/resample.c +++ b/spa/plugins/audioconvert/resample.c @@ -118,6 +118,7 @@ struct impl { int mode; unsigned int started:1; unsigned int peaks:1; + unsigned int drained:1; struct resample resample; }; @@ -723,6 +724,7 @@ static int impl_node_process(void *object) void **dst_datas; bool flush_out = false; bool flush_in = false; + bool draining = false; spa_return_val_if_fail(this != NULL, -EINVAL); @@ -741,15 +743,18 @@ static int impl_node_process(void *object) if (SPA_UNLIKELY(outio->status == SPA_STATUS_HAVE_DATA)) return SPA_STATUS_HAVE_DATA; - - if (SPA_UNLIKELY(inio->status != SPA_STATUS_HAVE_DATA)) - return SPA_STATUS_NEED_DATA; - /* recycle */ if (SPA_LIKELY(outio->buffer_id < outport->n_buffers)) { recycle_buffer(this, outio->buffer_id); outio->buffer_id = SPA_ID_INVALID; } + if (SPA_UNLIKELY(inio->status != SPA_STATUS_HAVE_DATA)) { + if (inio->status != SPA_STATUS_DRAINED || this->drained) + return outio->status = inio->status; + + inio->buffer_id = 0; + inport->buffers[0].outbuf->datas[0].chunk->size = 0; + } if (SPA_UNLIKELY(inio->buffer_id >= inport->n_buffers)) return inio->status = -EINVAL; @@ -784,7 +789,7 @@ static int impl_node_process(void *object) size = sb->datas[0].maxsize; memset(sb->datas[0].data, 0, size); inport->offset = 0; - flush_in = true; + flush_in = draining = true; } if (this->io_rate_match) { @@ -829,18 +834,19 @@ static int impl_node_process(void *object) if (inport->offset >= size || flush_in) { inio->status = SPA_STATUS_NEED_DATA; inport->offset = 0; - SPA_FLAG_SET(res, SPA_STATUS_NEED_DATA); - spa_log_trace_fp(this->log, NAME " %p: return input buffer", this); + SPA_FLAG_SET(res, inio->status); + spa_log_trace_fp(this->log, NAME " %p: return input buffer of size %d", this, size); } outport->offset += out_len * sizeof(float); if (outport->offset > 0 && (outport->offset >= maxsize || flush_out)) { outio->status = SPA_STATUS_HAVE_DATA; outio->buffer_id = dbuf->id; + spa_log_trace_fp(this->log, NAME " %p: have output buffer of size %d", this, outport->offset); dequeue_buffer(this, dbuf); outport->offset = 0; + this->drained = draining; SPA_FLAG_SET(res, SPA_STATUS_HAVE_DATA); - spa_log_trace_fp(this->log, NAME " %p: have output buffer", this); } if (out_len == 0 && this->peaks) { outio->status = SPA_STATUS_HAVE_DATA; diff --git a/spa/plugins/audioconvert/splitter.c b/spa/plugins/audioconvert/splitter.c index 48dc1c3ca..6877df8f6 100644 --- a/spa/plugins/audioconvert/splitter.c +++ b/spa/plugins/audioconvert/splitter.c @@ -860,7 +860,7 @@ static int impl_node_process(void *object) inio, inio->status, inio->buffer_id); if (SPA_UNLIKELY(inio->status != SPA_STATUS_HAVE_DATA)) - return SPA_STATUS_NEED_DATA; + return inio->status; if (SPA_UNLIKELY(inio->buffer_id >= inport->n_buffers)) return inio->status = -EINVAL; diff --git a/src/pipewire/impl-node.c b/src/pipewire/impl-node.c index 450eb87f1..134738504 100644 --- a/src/pipewire/impl-node.c +++ b/src/pipewire/impl-node.c @@ -906,6 +906,9 @@ static inline int process_node(void *data) } else { resume_node(this, status); } + if (status & SPA_STATUS_DRAINED) { + pw_context_driver_emit_drained(this->context, this); + } return 0; } diff --git a/src/pipewire/private.h b/src/pipewire/private.h index 795614caf..a59698b22 100644 --- a/src/pipewire/private.h +++ b/src/pipewire/private.h @@ -235,6 +235,7 @@ pw_core_resource_errorf(struct pw_resource *resource, uint32_t id, int seq, #define pw_context_driver_emit_xrun(c,n) pw_context_driver_emit(c, xrun, 0, n) #define pw_context_driver_emit_incomplete(c,n) pw_context_driver_emit(c, incomplete, 0, n) #define pw_context_driver_emit_timeout(c,n) pw_context_driver_emit(c, timeout, 0, n) +#define pw_context_driver_emit_drained(c,n) pw_context_driver_emit(c, drained, 0, n) struct pw_context_driver_events { #define PW_VERSION_CONTEXT_DRIVER_EVENTS 0 @@ -248,6 +249,8 @@ struct pw_context_driver_events { void (*incomplete) (void *data, struct pw_impl_node *node); /** The driver got a sync timeout */ void (*timeout) (void *data, struct pw_impl_node *node); + /** a node drained */ + void (*drained) (void *data, struct pw_impl_node *node); }; #define pw_registry_resource(r,m,v,...) pw_resource_call(r, struct pw_registry_events,m,v,##__VA_ARGS__) diff --git a/src/pipewire/stream.c b/src/pipewire/stream.c index d1b41e191..ec8482438 100644 --- a/src/pipewire/stream.c +++ b/src/pipewire/stream.c @@ -331,7 +331,6 @@ do_call_drained(struct spa_loop *loop, struct pw_stream *stream = &impl->this; pw_log_trace(NAME" %p: drained", stream); pw_stream_emit_drained(stream); - impl->draining = false; return 0; } @@ -765,8 +764,7 @@ again: pw_log_trace(NAME" %p: process out status:%d id:%d ticks:%"PRIu64" delay:%"PRIi64, stream, io->status, io->buffer_id, impl->time.ticks, impl->time.delay); - res = 0; - if (io->status != SPA_STATUS_HAVE_DATA) { + if ((res = io->status) != SPA_STATUS_HAVE_DATA) { /* recycle old buffer */ if ((b = get_buffer(stream, io->buffer_id)) != NULL) { pw_log_trace(NAME" %p: recycle buffer %d", stream, b->id); @@ -776,20 +774,17 @@ again: /* pop new buffer */ if ((b = pop_queue(impl, &impl->queued)) != NULL) { io->buffer_id = b->id; - io->status = SPA_STATUS_HAVE_DATA; + res = io->status = SPA_STATUS_HAVE_DATA; pw_log_trace(NAME" %p: pop %d %p", stream, b->id, io); + } else if (impl->draining) { + impl->drained = true; + io->buffer_id = SPA_ID_INVALID; + res = io->status = SPA_STATUS_DRAINED; + pw_log_trace(NAME" %p: draining", stream); } else { io->buffer_id = SPA_ID_INVALID; - io->status = SPA_STATUS_NEED_DATA; + res = io->status = SPA_STATUS_NEED_DATA; pw_log_trace(NAME" %p: no more buffers %p", stream, io); - if (impl->draining && !impl->drained) { - b = pop_queue(impl, &impl->dequeued); - io->buffer_id = b->id; - io->status = SPA_STATUS_HAVE_DATA; - b->this.buffer->datas[0].chunk->size = 0; - pw_log_trace(NAME" %p: drain buffer %d", stream, b->id); - impl->drained = true; - } } } @@ -803,7 +798,6 @@ again: } copy_position(impl, impl->queued.outcount); - res = io->status; pw_log_trace(NAME" %p: res %d", stream, res); return res; @@ -1044,7 +1038,7 @@ static const struct pw_core_events core_events = { .error = on_core_error, }; -static void context_xrun(void *data, struct pw_impl_node *node) +static void context_drained(void *data, struct pw_impl_node *node) { struct stream *impl = data; if (impl->node != node) @@ -1055,7 +1049,7 @@ static void context_xrun(void *data, struct pw_impl_node *node) static const struct pw_context_driver_events context_events = { PW_VERSION_CONTEXT_DRIVER_EVENTS, - .xrun = context_xrun, + .drained = context_drained, }; static struct stream *