stream: handle driver sources and sinks

This commit is contained in:
Wim Taymans 2018-04-25 15:58:38 +02:00
parent 9376ba6098
commit b3c842a7ec

View file

@ -386,7 +386,7 @@ static int impl_port_set_io(struct spa_node *node, enum spa_direction direction,
struct pw_type *t = impl->t; struct pw_type *t = impl->t;
int res = 0; int res = 0;
if (id == t->io.Buffers) { if (id == t->io.Buffers && size >= sizeof(struct spa_io_buffers)) {
pw_log_debug("stream %p: set io %d %p %zd", impl, id, data, size); pw_log_debug("stream %p: set io %d %p %zd", impl, id, data, size);
if (impl->use_converter) { if (impl->use_converter) {
@ -489,15 +489,15 @@ static int port_set_format(struct spa_node *node,
if (spa_pod_is_object_type(format, t->spa_format)) { if (spa_pod_is_object_type(format, t->spa_format)) {
impl->format = pw_spa_pod_copy(format); impl->format = pw_spa_pod_copy(format);
((struct spa_pod_object*)impl->format)->body.id = t->param.idFormat; ((struct spa_pod_object*)impl->format)->body.id = t->param.idFormat;
if ((res = configure_converter(impl)) < 0) {
pw_stream_finish_format(stream, res, NULL, 0);
return res;
}
} }
else else
impl->format = NULL; impl->format = NULL;
if ((res = configure_converter(impl)) < 0) {
pw_stream_finish_format(stream, res, NULL, 0);
return res;
}
count = spa_hook_list_call(&stream->listener_list, count = spa_hook_list_call(&stream->listener_list,
struct pw_stream_events, struct pw_stream_events,
format_changed, impl->format); format_changed, impl->format);
@ -766,14 +766,16 @@ static int impl_node_process_output(struct spa_node *node)
if (!SPA_FLAG_CHECK(res, SPA_STATUS_HAVE_BUFFER)) if (!SPA_FLAG_CHECK(res, SPA_STATUS_HAVE_BUFFER))
goto again; goto again;
} else { } else {
call_process(impl); if (!SPA_FLAG_CHECK(impl->flags, PW_STREAM_FLAG_DRIVER)) {
if (spa_ringbuffer_get_read_index(&impl->queued.ring, &index) > 0) call_process(impl);
goto again; if (spa_ringbuffer_get_read_index(&impl->queued.ring, &index) > 0)
goto again;
}
res = io->status;
} }
pw_log_trace("stream %p: res %d", stream, res); pw_log_trace("stream %p: res %d", stream, res);
return res; return res;
} }
@ -1203,20 +1205,6 @@ int pw_stream_get_time(struct pw_stream *stream, struct pw_time *time)
return 0; return 0;
} }
struct pw_buffer *pw_stream_dequeue_buffer(struct pw_stream *stream)
{
struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this);
struct buffer *b;
if ((b = pop_queue(impl, &impl->dequeued)) == NULL) {
pw_log_trace("stream %p: no more buffers", stream);
return NULL;
}
pw_log_trace("stream %p: dequeue buffer %d", stream, b->id);
return &b->this;
}
static int static int
do_process(struct spa_loop *loop, do_process(struct spa_loop *loop,
bool async, uint32_t seq, const void *data, size_t size, void *user_data) bool async, uint32_t seq, const void *data, size_t size, void *user_data)
@ -1226,6 +1214,31 @@ do_process(struct spa_loop *loop,
return 0; return 0;
} }
static inline int call_trigger(struct stream *impl)
{
int res = 0;
if (SPA_FLAG_CHECK(impl->flags, PW_STREAM_FLAG_DRIVER)) {
res = pw_loop_invoke(impl->core->data_loop,
do_process, 1, NULL, 0, false, impl);
}
return res;
}
struct pw_buffer *pw_stream_dequeue_buffer(struct pw_stream *stream)
{
struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this);
struct buffer *b;
if ((b = pop_queue(impl, &impl->dequeued)) == NULL) {
pw_log_trace("stream %p: no more buffers", stream);
call_trigger(impl);
return NULL;
}
pw_log_trace("stream %p: dequeue buffer %d", stream, b->id);
return &b->this;
}
int pw_stream_queue_buffer(struct pw_stream *stream, struct pw_buffer *buffer) int pw_stream_queue_buffer(struct pw_stream *stream, struct pw_buffer *buffer)
{ {
struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this); struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this);
@ -1241,9 +1254,5 @@ int pw_stream_queue_buffer(struct pw_stream *stream, struct pw_buffer *buffer)
if ((res = push_queue(impl, &impl->queued, b)) < 0) if ((res = push_queue(impl, &impl->queued, b)) < 0)
return res; return res;
if (SPA_FLAG_CHECK(impl->flags, PW_STREAM_FLAG_DRIVER)) { return call_trigger(impl);
pw_loop_invoke(impl->core->data_loop,
do_process, 1, NULL, 0, false, impl);
}
return 0;
} }