From b3c842a7eca7b6cbe85f42afb27e7e384d206ada Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Wed, 25 Apr 2018 15:58:38 +0200 Subject: [PATCH] stream: handle driver sources and sinks --- src/pipewire/stream.c | 67 ++++++++++++++++++++++++------------------- 1 file changed, 38 insertions(+), 29 deletions(-) diff --git a/src/pipewire/stream.c b/src/pipewire/stream.c index 9dead4a6c..d87bc42cd 100644 --- a/src/pipewire/stream.c +++ b/src/pipewire/stream.c @@ -386,7 +386,7 @@ static int impl_port_set_io(struct spa_node *node, enum spa_direction direction, struct pw_type *t = impl->t; int res = 0; - if (id == t->io.Buffers) { + if (id == t->io.Buffers && size >= sizeof(struct spa_io_buffers)) { pw_log_debug("stream %p: set io %d %p %zd", impl, id, data, size); if (impl->use_converter) { @@ -489,15 +489,15 @@ static int port_set_format(struct spa_node *node, if (spa_pod_is_object_type(format, t->spa_format)) { impl->format = pw_spa_pod_copy(format); ((struct spa_pod_object*)impl->format)->body.id = t->param.idFormat; + + if ((res = configure_converter(impl)) < 0) { + pw_stream_finish_format(stream, res, NULL, 0); + return res; + } } else impl->format = NULL; - if ((res = configure_converter(impl)) < 0) { - pw_stream_finish_format(stream, res, NULL, 0); - return res; - } - count = spa_hook_list_call(&stream->listener_list, struct pw_stream_events, format_changed, impl->format); @@ -766,14 +766,16 @@ static int impl_node_process_output(struct spa_node *node) if (!SPA_FLAG_CHECK(res, SPA_STATUS_HAVE_BUFFER)) goto again; } else { - call_process(impl); - if (spa_ringbuffer_get_read_index(&impl->queued.ring, &index) > 0) - goto again; + if (!SPA_FLAG_CHECK(impl->flags, PW_STREAM_FLAG_DRIVER)) { + call_process(impl); + if (spa_ringbuffer_get_read_index(&impl->queued.ring, &index) > 0) + goto again; + } + res = io->status; } pw_log_trace("stream %p: res %d", stream, res); - return res; } @@ -1203,20 +1205,6 @@ int pw_stream_get_time(struct pw_stream *stream, struct pw_time *time) return 0; } -struct pw_buffer *pw_stream_dequeue_buffer(struct pw_stream *stream) -{ - struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this); - struct buffer *b; - - if ((b = pop_queue(impl, &impl->dequeued)) == NULL) { - pw_log_trace("stream %p: no more buffers", stream); - return NULL; - } - pw_log_trace("stream %p: dequeue buffer %d", stream, b->id); - - return &b->this; -} - static int do_process(struct spa_loop *loop, bool async, uint32_t seq, const void *data, size_t size, void *user_data) @@ -1226,6 +1214,31 @@ do_process(struct spa_loop *loop, return 0; } +static inline int call_trigger(struct stream *impl) +{ + int res = 0; + if (SPA_FLAG_CHECK(impl->flags, PW_STREAM_FLAG_DRIVER)) { + res = pw_loop_invoke(impl->core->data_loop, + do_process, 1, NULL, 0, false, impl); + } + return res; +} + +struct pw_buffer *pw_stream_dequeue_buffer(struct pw_stream *stream) +{ + struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this); + struct buffer *b; + + if ((b = pop_queue(impl, &impl->dequeued)) == NULL) { + pw_log_trace("stream %p: no more buffers", stream); + call_trigger(impl); + return NULL; + } + pw_log_trace("stream %p: dequeue buffer %d", stream, b->id); + + return &b->this; +} + int pw_stream_queue_buffer(struct pw_stream *stream, struct pw_buffer *buffer) { struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this); @@ -1241,9 +1254,5 @@ int pw_stream_queue_buffer(struct pw_stream *stream, struct pw_buffer *buffer) if ((res = push_queue(impl, &impl->queued, b)) < 0) return res; - if (SPA_FLAG_CHECK(impl->flags, PW_STREAM_FLAG_DRIVER)) { - pw_loop_invoke(impl->core->data_loop, - do_process, 1, NULL, 0, false, impl); - } - return 0; + return call_trigger(impl); }