loopback: Flush asyncmsgq from the right context

u->asyncmsg is accessed from two IO threads. teardown() shouldn't
flush the queue from the main thread while both IO threads are still
potentially using the queue. This patch fixes that error by flushing
the queue from the sink input thread when the sink input is being
unlinked.

Flushing the queue in teardown() caused this assertion in
pa_asyncmsgq_get() to crash sometimes: pa_assert(!a->current)
This commit is contained in:
Tanu Kaskinen 2013-03-14 22:07:14 +02:00
parent ee0a5d5014
commit 259be540e3

View file

@ -138,29 +138,35 @@ static void teardown(struct userdata *u) {
pa_assert(u); pa_assert(u);
pa_assert_ctl_context(); pa_assert_ctl_context();
if (u->asyncmsgq)
pa_asyncmsgq_flush(u->asyncmsgq, 0);
u->adjust_time = 0; u->adjust_time = 0;
enable_adjust_timer(u, false); enable_adjust_timer(u, false);
if (u->sink_input) /* Handling the asyncmsgq between the source output and the sink input
pa_sink_input_unlink(u->sink_input); * requires some care. When the source output is unlinked, nothing needs
* to be done for the asyncmsgq, because the source output is the sending
if (u->source_output) * end. But when the sink input is unlinked, we should ensure that the
pa_source_output_unlink(u->source_output); * asyncmsgq is emptied, because the messages in the queue hold references
* to the sink input. Also, we need to ensure that new messages won't be
if (u->sink_input) { * written to the queue after we have emptied it.
u->sink_input->parent.process_msg = pa_sink_input_process_msg; *
pa_sink_input_unref(u->sink_input); * Emptying the queue can be done in the state_changed() callback of the
u->sink_input = NULL; * sink input, when the new state is "unlinked".
} *
* Preventing new messages from being written to the queue can be achieved
* by unlinking the source output before unlinking the sink input. There
* are no other writers for that queue, so this is sufficient. */
if (u->source_output) { if (u->source_output) {
u->source_output->parent.process_msg = pa_source_output_process_msg; pa_source_output_unlink(u->source_output);
pa_source_output_unref(u->source_output); pa_source_output_unref(u->source_output);
u->source_output = NULL; u->source_output = NULL;
} }
if (u->sink_input) {
pa_sink_input_unlink(u->sink_input);
pa_sink_input_unref(u->sink_input);
u->sink_input = NULL;
}
} }
/* Called from main context */ /* Called from main context */
@ -646,6 +652,17 @@ static void sink_input_kill_cb(pa_sink_input *i) {
pa_module_unload_request(u->module, TRUE); pa_module_unload_request(u->module, TRUE);
} }
/* Called from the output thread context */
static void sink_input_state_change_cb(pa_sink_input *i, pa_sink_input_state_t state) {
struct userdata *u;
pa_sink_input_assert_ref(i);
pa_assert_se(u = i->userdata);
if (state == PA_SINK_INPUT_UNLINKED)
pa_asyncmsgq_flush(u->asyncmsgq, false);
}
/* Called from main thread */ /* Called from main thread */
static void sink_input_moving_cb(pa_sink_input *i, pa_sink *dest) { static void sink_input_moving_cb(pa_sink_input *i, pa_sink *dest) {
struct userdata *u; struct userdata *u;
@ -857,6 +874,7 @@ int pa__init(pa_module *m) {
u->sink_input->pop = sink_input_pop_cb; u->sink_input->pop = sink_input_pop_cb;
u->sink_input->process_rewind = sink_input_process_rewind_cb; u->sink_input->process_rewind = sink_input_process_rewind_cb;
u->sink_input->kill = sink_input_kill_cb; u->sink_input->kill = sink_input_kill_cb;
u->sink_input->state_change = sink_input_state_change_cb;
u->sink_input->attach = sink_input_attach_cb; u->sink_input->attach = sink_input_attach_cb;
u->sink_input->detach = sink_input_detach_cb; u->sink_input->detach = sink_input_detach_cb;
u->sink_input->update_max_rewind = sink_input_update_max_rewind_cb; u->sink_input->update_max_rewind = sink_input_update_max_rewind_cb;