replace sink/source SET_STATE handlers with callbacks

There are no behaviour changes, the code from almost all the SET_STATE
handlers is moved with minimal changes to the newly introduced
set_state_in_io_thread() callback. The only exception is module-tunnel,
which has to call pa_sink_render() after pa_sink.thread_info.state has
been updated. The set_state_in_io_thread() callback is called before
updating that variable, so moving the SET_STATE handler code to the
callback isn't possible.

The purpose of this change is to make it easier to get state change
handling right in modules. Hooking to the SET_STATE messages in modules
required care in calling pa_sink/source_process_msg() at the right time
(or not calling it at all, as was the case on resume failures), and
there were a few bugs (fixed before this patch). Now the core takes care
of ordering things correctly.

Another motivation for this change is that there was some talk about
adding a suspend_cause variable to pa_sink/source.thread_info. The
variable would be updated in the core SET_STATE handler, but that would
not work with the old design, because in case of resume failures modules
didn't call the core message handler.
This commit is contained in:
Tanu Kaskinen 2018-03-13 19:40:38 +02:00
parent 73b8a57078
commit b2537a8f38
25 changed files with 874 additions and 658 deletions

View file

@ -643,8 +643,6 @@ fail:
/* Called from IO context */
static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
struct userdata *u = PA_SINK(o)->userdata;
bool do_trigger = false, quick = true;
pa_sink_state_t new_state;
switch (code) {
@ -662,68 +660,73 @@ static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offse
return 0;
}
}
case PA_SINK_MESSAGE_SET_STATE:
new_state = PA_PTR_TO_UINT(data);
return pa_sink_process_msg(o, code, data, offset, chunk);
}
switch (new_state) {
/* Called from the IO thread. */
static int sink_set_state_in_io_thread_cb(pa_sink *s, pa_sink_state_t new_state) {
struct userdata *u;
bool do_trigger = false;
bool quick = true;
case PA_SINK_SUSPENDED:
pa_assert(PA_SINK_IS_OPENED(u->sink->thread_info.state));
pa_assert(s);
pa_assert_se(u = s->userdata);
if (!u->source || u->source_suspended)
suspend(u);
switch (new_state) {
do_trigger = true;
case PA_SINK_SUSPENDED:
pa_assert(PA_SINK_IS_OPENED(u->sink->thread_info.state));
u->sink_suspended = true;
break;
if (!u->source || u->source_suspended)
suspend(u);
case PA_SINK_IDLE:
case PA_SINK_RUNNING:
do_trigger = true;
if (u->sink->thread_info.state == PA_SINK_INIT) {
do_trigger = true;
quick = u->source && PA_SOURCE_IS_OPENED(u->source->thread_info.state);
}
u->sink_suspended = true;
break;
if (u->sink->thread_info.state == PA_SINK_SUSPENDED) {
case PA_SINK_IDLE:
case PA_SINK_RUNNING:
if (!u->source || u->source_suspended) {
if (unsuspend(u) < 0)
return -1;
quick = false;
}
if (u->sink->thread_info.state == PA_SINK_INIT) {
do_trigger = true;
quick = u->source && PA_SOURCE_IS_OPENED(u->source->thread_info.state);
}
do_trigger = true;
if (u->sink->thread_info.state == PA_SINK_SUSPENDED) {
u->out_mmap_current = 0;
u->out_mmap_saved_nfrags = 0;
if (!u->source || u->source_suspended) {
if (unsuspend(u) < 0)
return -1;
quick = false;
}
u->sink_suspended = false;
}
do_trigger = true;
break;
u->out_mmap_current = 0;
u->out_mmap_saved_nfrags = 0;
case PA_SINK_INVALID_STATE:
case PA_SINK_UNLINKED:
case PA_SINK_INIT:
;
u->sink_suspended = false;
}
break;
case PA_SINK_INVALID_STATE:
case PA_SINK_UNLINKED:
case PA_SINK_INIT:
;
}
if (do_trigger)
trigger(u, new_state, u->source ? u->source->thread_info.state : PA_SOURCE_INVALID_STATE, quick);
return pa_sink_process_msg(o, code, data, offset, chunk);
return 0;
}
static int source_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
struct userdata *u = PA_SOURCE(o)->userdata;
bool do_trigger = false, quick = true;
pa_source_state_t new_state;
switch (code) {
@ -740,61 +743,68 @@ static int source_process_msg(pa_msgobject *o, int code, void *data, int64_t off
*((int64_t*) data) = (int64_t)r;
return 0;
}
}
case PA_SOURCE_MESSAGE_SET_STATE:
new_state = PA_PTR_TO_UINT(data);
return pa_source_process_msg(o, code, data, offset, chunk);
}
switch (new_state) {
/* Called from the IO thread. */
static int source_set_state_in_io_thread_cb(pa_source *s, pa_source_state_t new_state) {
struct userdata *u;
bool do_trigger = false;
bool quick = true;
case PA_SOURCE_SUSPENDED:
pa_assert(PA_SOURCE_IS_OPENED(u->source->thread_info.state));
pa_assert(s);
pa_assert_se(u = s->userdata);
if (!u->sink || u->sink_suspended)
suspend(u);
switch (new_state) {
do_trigger = true;
case PA_SOURCE_SUSPENDED:
pa_assert(PA_SOURCE_IS_OPENED(u->source->thread_info.state));
u->source_suspended = true;
break;
if (!u->sink || u->sink_suspended)
suspend(u);
case PA_SOURCE_IDLE:
case PA_SOURCE_RUNNING:
do_trigger = true;
if (u->source->thread_info.state == PA_SOURCE_INIT) {
do_trigger = true;
quick = u->sink && PA_SINK_IS_OPENED(u->sink->thread_info.state);
}
u->source_suspended = true;
break;
if (u->source->thread_info.state == PA_SOURCE_SUSPENDED) {
case PA_SOURCE_IDLE:
case PA_SOURCE_RUNNING:
if (!u->sink || u->sink_suspended) {
if (unsuspend(u) < 0)
return -1;
quick = false;
}
if (u->source->thread_info.state == PA_SOURCE_INIT) {
do_trigger = true;
quick = u->sink && PA_SINK_IS_OPENED(u->sink->thread_info.state);
}
do_trigger = true;
if (u->source->thread_info.state == PA_SOURCE_SUSPENDED) {
u->in_mmap_current = 0;
u->in_mmap_saved_nfrags = 0;
if (!u->sink || u->sink_suspended) {
if (unsuspend(u) < 0)
return -1;
quick = false;
}
u->source_suspended = false;
}
break;
do_trigger = true;
case PA_SOURCE_UNLINKED:
case PA_SOURCE_INIT:
case PA_SOURCE_INVALID_STATE:
;
u->in_mmap_current = 0;
u->in_mmap_saved_nfrags = 0;
u->source_suspended = false;
}
break;
case PA_SOURCE_UNLINKED:
case PA_SOURCE_INIT:
case PA_SOURCE_INVALID_STATE:
;
}
if (do_trigger)
trigger(u, u->sink ? u->sink->thread_info.state : PA_SINK_INVALID_STATE, new_state, quick);
return pa_source_process_msg(o, code, data, offset, chunk);
return 0;
}
static void sink_get_volume(pa_sink *s) {
@ -1334,6 +1344,7 @@ int pa__init(pa_module*m) {
}
u->source->parent.process_msg = source_process_msg;
u->source->set_state_in_io_thread = source_set_state_in_io_thread_cb;
u->source->userdata = u;
pa_source_set_asyncmsgq(u->source, u->thread_mq.inq);
@ -1403,6 +1414,7 @@ int pa__init(pa_module*m) {
}
u->sink->parent.process_msg = sink_process_msg;
u->sink->set_state_in_io_thread = sink_set_state_in_io_thread_cb;
u->sink->userdata = u;
pa_sink_set_asyncmsgq(u->sink, u->thread_mq.inq);