stream: implement drain

Set the drained flag.

If the server calls us and we're out of buffers and the drained
flag is set, emit the drained signal.

Deactivate the node on disconnect.

Make sure we have no more pending process callbacks in the invoke
queue before when we pause.
This commit is contained in:
Wim Taymans 2019-08-16 15:14:23 +02:00
parent 7ad111de47
commit 23fe46b698

View file

@ -154,6 +154,7 @@ struct stream {
unsigned int free_data:1;
unsigned int subscribe:1;
unsigned int alloc_buffers:1;
unsigned int draining:1;
};
static int get_param_index(uint32_t id)
@ -303,6 +304,7 @@ do_call_process(struct spa_loop *loop,
static void call_process(struct stream *impl)
{
pw_log_trace(NAME" %p: call process", impl);
if (SPA_FLAG_CHECK(impl->flags, PW_STREAM_FLAG_RT_PROCESS)) {
do_call_process(NULL, false, 1, NULL, 0, impl);
}
@ -312,6 +314,24 @@ static void call_process(struct stream *impl)
}
}
static int
do_call_drained(struct spa_loop *loop,
bool async, uint32_t seq, const void *data, size_t size, void *user_data)
{
struct stream *impl = user_data;
struct pw_stream *stream = &impl->this;
pw_log_trace(NAME" %p: drained", stream);
pw_stream_emit_drained(stream);
impl->draining = false;
return 0;
}
static void call_drained(struct stream *impl)
{
pw_loop_invoke(impl->core->main_loop,
do_call_drained, 1, NULL, 0, false, impl);
}
static int impl_set_io(void *object, uint32_t id, void *data, size_t size)
{
struct stream *impl = object;
@ -338,7 +358,10 @@ static int impl_send_command(void *object, const struct spa_command *command)
switch (SPA_NODE_COMMAND_ID(command)) {
case SPA_NODE_COMMAND_Pause:
pw_loop_invoke(impl->core->main_loop,
NULL, 0, NULL, 0, false, impl);
if (stream->state == PW_STREAM_STATE_STREAMING) {
pw_log_debug(NAME" %p: pause", stream);
stream_set_state(stream, PW_STREAM_STATE_PAUSED, NULL);
}
@ -785,15 +808,20 @@ again:
io->buffer_id = SPA_ID_INVALID;
io->status = SPA_STATUS_NEED_BUFFER;
pw_log_trace(NAME" %p: no more buffers %p", stream, io);
if (impl->draining) {
call_drained(impl);
goto exit;
}
}
}
if (!SPA_FLAG_CHECK(impl->flags, PW_STREAM_FLAG_DRIVER)) {
if (!impl->draining && !SPA_FLAG_CHECK(impl->flags, PW_STREAM_FLAG_DRIVER)) {
call_process(impl);
if (spa_ringbuffer_get_read_index(&impl->queued.ring, &index) >= MIN_QUEUED &&
io->status == SPA_STATUS_NEED_BUFFER)
goto again;
}
exit:
copy_position(impl, impl->queued.outcount);
res = io->status;
@ -1491,6 +1519,9 @@ int pw_stream_disconnect(struct pw_stream *stream)
pw_log_debug(NAME" %p: disconnect", stream);
impl->disconnecting = true;
if (impl->node)
pw_node_set_active(impl->node, false);
if (stream->proxy)
pw_proxy_destroy(stream->proxy);
@ -1498,6 +1529,7 @@ int pw_stream_disconnect(struct pw_stream *stream)
pw_node_destroy(impl->node);
impl->node = NULL;
}
return 0;
}
@ -1698,12 +1730,20 @@ do_flush(struct spa_loop *loop,
return 0;
}
static int
do_drain(struct spa_loop *loop,
bool async, uint32_t seq, const void *data, size_t size, void *user_data)
{
struct stream *impl = user_data;
impl->draining = true;
return 0;
}
SPA_EXPORT
int pw_stream_flush(struct pw_stream *stream, bool drain)
{
struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this);
pw_loop_invoke(impl->core->data_loop,
do_flush, 1, NULL, 0, true, impl);
drain ? do_drain : do_flush, 1, NULL, 0, true, impl);
return 0;
}