rework sink input/source output state machine

git-svn-id: file:///home/lennart/svn/public/pulseaudio/branches/lennart@1478 fefdeb5f-60dc-0310-8127-8f9354f1896f
This commit is contained in:
Lennart Poettering 2007-06-14 17:12:40 +00:00
parent 260dd1e886
commit 5e72ac353e
4 changed files with 98 additions and 61 deletions

View file

@ -166,7 +166,7 @@ pa_sink_input* pa_sink_input_new(
i->parent.process_msg = pa_sink_input_process_msg; i->parent.process_msg = pa_sink_input_process_msg;
i->core = core; i->core = core;
pa_atomic_store(&i->state, PA_SINK_INPUT_DRAINED); i->state = PA_SINK_INPUT_RUNNING;
i->flags = flags; i->flags = flags;
i->name = pa_xstrdup(data->name); i->name = pa_xstrdup(data->name);
i->driver = pa_xstrdup(data->driver); i->driver = pa_xstrdup(data->driver);
@ -181,7 +181,6 @@ pa_sink_input* pa_sink_input_new(
i->volume = data->volume; i->volume = data->volume;
i->muted = data->muted; i->muted = data->muted;
i->process_msg = NULL;
i->peek = NULL; i->peek = NULL;
i->drop = NULL; i->drop = NULL;
i->kill = NULL; i->kill = NULL;
@ -189,6 +188,9 @@ pa_sink_input* pa_sink_input_new(
i->underrun = NULL; i->underrun = NULL;
i->userdata = NULL; i->userdata = NULL;
i->thread_info.state = i->state;
pa_atomic_store(&i->thread_info.drained, 1);
i->thread_info.sample_spec = i->sample_spec;
i->thread_info.silence_memblock = NULL; i->thread_info.silence_memblock = NULL;
/* i->thread_info.move_silence = 0; */ /* i->thread_info.move_silence = 0; */
pa_memchunk_reset(&i->thread_info.resampled_chunk); pa_memchunk_reset(&i->thread_info.resampled_chunk);
@ -210,28 +212,41 @@ pa_sink_input* pa_sink_input_new(
return i; return i;
} }
static int sink_input_set_state(pa_sink_input *i, pa_sink_input_state_t state) {
pa_assert(i);
if (state == PA_SINK_INPUT_DRAINED)
state = PA_SINK_INPUT_RUNNING;
if (i->state == state)
return 0;
if (pa_asyncmsgq_send(i->sink->asyncmsgq, PA_MSGOBJECT(i), PA_SINK_INPUT_MESSAGE_SET_STATE, PA_UINT_TO_PTR(state), NULL) < 0)
return -1;
i->state = state;
return 0;
}
void pa_sink_input_disconnect(pa_sink_input *i) { void pa_sink_input_disconnect(pa_sink_input *i) {
pa_assert(i); pa_assert(i);
pa_return_if_fail(pa_sink_input_get_state(i) != PA_SINK_INPUT_DISCONNECTED); pa_return_if_fail(i->state != PA_SINK_INPUT_DISCONNECTED);
pa_asyncmsgq_send(i->sink->asyncmsgq, PA_MSGOBJECT(i->sink), PA_SINK_MESSAGE_REMOVE_INPUT, i, NULL); pa_asyncmsgq_send(i->sink->asyncmsgq, PA_MSGOBJECT(i->sink), PA_SINK_MESSAGE_REMOVE_INPUT, i, NULL);
pa_idxset_remove_by_data(i->sink->core->sink_inputs, i, NULL); pa_idxset_remove_by_data(i->sink->core->sink_inputs, i, NULL);
pa_idxset_remove_by_data(i->sink->inputs, i, NULL); pa_idxset_remove_by_data(i->sink->inputs, i, NULL);
pa_subscription_post(i->sink->core, PA_SUBSCRIPTION_EVENT_SINK_INPUT|PA_SUBSCRIPTION_EVENT_REMOVE, i->index); pa_subscription_post(i->sink->core, PA_SUBSCRIPTION_EVENT_SINK_INPUT|PA_SUBSCRIPTION_EVENT_REMOVE, i->index);
sink_input_set_state(i, PA_SINK_INPUT_DISCONNECTED);
pa_sink_update_status(i->sink); pa_sink_update_status(i->sink);
i->sink = NULL; i->sink = NULL;
i->process_msg = NULL;
i->peek = NULL; i->peek = NULL;
i->drop = NULL; i->drop = NULL;
i->kill = NULL; i->kill = NULL;
i->get_latency = NULL; i->get_latency = NULL;
i->underrun = NULL; i->underrun = NULL;
pa_atomic_store(&i->state, PA_SINK_INPUT_DISCONNECTED);
} }
static void sink_input_free(pa_object *o) { static void sink_input_free(pa_object *o) {
@ -240,7 +255,8 @@ static void sink_input_free(pa_object *o) {
pa_assert(i); pa_assert(i);
pa_assert(pa_sink_input_refcnt(i) == 0); pa_assert(pa_sink_input_refcnt(i) == 0);
pa_sink_input_disconnect(i); if (i->state != PA_SINK_INPUT_DISCONNECTED)
pa_sink_input_disconnect(i);
pa_log_info("Freeing output %u \"%s\"", i->index, i->name); pa_log_info("Freeing output %u \"%s\"", i->index, i->name);
@ -295,21 +311,15 @@ int pa_sink_input_peek(pa_sink_input *i, pa_memchunk *chunk, pa_cvolume *volume)
int ret = -1; int ret = -1;
int do_volume_adj_here; int do_volume_adj_here;
int volume_is_norm; int volume_is_norm;
pa_sink_input_state_t state;
pa_sink_input_assert_ref(i); pa_sink_input_assert_ref(i);
pa_assert(chunk); pa_assert(chunk);
pa_assert(volume); pa_assert(volume);
state = pa_sink_input_get_state(i); if (!i->peek || !i->drop || i->thread_info.state == PA_SINK_INPUT_DISCONNECTED || i->thread_info.state == PA_SINK_INPUT_CORKED)
if (state == PA_SINK_INPUT_DISCONNECTED)
return -1;
if (!i->peek || !i->drop || state == PA_SINK_INPUT_CORKED)
goto finish; goto finish;
pa_assert(state == PA_SINK_INPUT_RUNNING || state == PA_SINK_INPUT_DRAINED); pa_assert(i->thread_info.state == PA_SINK_INPUT_RUNNING || i->thread_info.state == PA_SINK_INPUT_DRAINED);
/* if (i->thread_info.move_silence > 0) { */ /* if (i->thread_info.move_silence > 0) { */
/* size_t l; */ /* size_t l; */
@ -359,7 +369,7 @@ int pa_sink_input_peek(pa_sink_input *i, pa_memchunk *chunk, pa_cvolume *volume)
/* It might be necessary to adjust the volume here */ /* It might be necessary to adjust the volume here */
if (do_volume_adj_here && !volume_is_norm) { if (do_volume_adj_here && !volume_is_norm) {
pa_memchunk_make_writable(&tchunk, 0); pa_memchunk_make_writable(&tchunk, 0);
pa_volume_memchunk(&tchunk, &i->sample_spec, &i->thread_info.volume); pa_volume_memchunk(&tchunk, &i->thread_info.sample_spec, &i->thread_info.volume);
} }
pa_resampler_run(i->thread_info.resampler, &tchunk, &i->thread_info.resampled_chunk); pa_resampler_run(i->thread_info.resampler, &tchunk, &i->thread_info.resampled_chunk);
@ -376,13 +386,13 @@ int pa_sink_input_peek(pa_sink_input *i, pa_memchunk *chunk, pa_cvolume *volume)
finish: finish:
if (ret < 0 && state == PA_SINK_INPUT_RUNNING && i->underrun) if (ret < 0 && !pa_atomic_load(&i->thread_info.drained) && i->underrun)
i->underrun(i); i->underrun(i);
if (ret >= 0) if (ret >= 0)
pa_atomic_cmpxchg(&i->state, state, PA_SINK_INPUT_RUNNING); pa_atomic_store(&i->thread_info.drained, 0);
else if (ret < 0 && state == PA_SINK_INPUT_RUNNING) else if (ret < 0)
pa_atomic_cmpxchg(&i->state, state, PA_SINK_INPUT_DRAINED); pa_atomic_store(&i->thread_info.drained, 1);
if (ret >= 0) { if (ret >= 0) {
/* Let's see if we had to apply the volume adjustment /* Let's see if we had to apply the volume adjustment
@ -487,17 +497,9 @@ int pa_sink_input_get_mute(pa_sink_input *i) {
} }
void pa_sink_input_cork(pa_sink_input *i, int b) { void pa_sink_input_cork(pa_sink_input *i, int b) {
pa_sink_input_state_t state;
pa_sink_input_assert_ref(i); pa_sink_input_assert_ref(i);
state = pa_sink_input_get_state(i); sink_input_set_state(i, b ? PA_SINK_INPUT_CORKED : PA_SINK_INPUT_RUNNING);
pa_assert(state != PA_SINK_INPUT_DISCONNECTED);
if (b && state != PA_SINK_INPUT_CORKED)
pa_atomic_store(&i->state, PA_SINK_INPUT_CORKED);
else if (!b && state == PA_SINK_INPUT_CORKED)
pa_atomic_cmpxchg(&i->state, state, PA_SINK_INPUT_DRAINED);
} }
int pa_sink_input_set_rate(pa_sink_input *i, uint32_t rate) { int pa_sink_input_set_rate(pa_sink_input *i, uint32_t rate) {
@ -730,7 +732,26 @@ int pa_sink_input_process_msg(pa_msgobject *o, int code, void *userdata, pa_memc
return 0; return 0;
} }
case PA_SINK_INPUT_MESSAGE_SET_STATE: {
if ((PA_PTR_TO_UINT(userdata) == PA_SINK_INPUT_DRAINED || PA_PTR_TO_UINT(userdata) == PA_SINK_INPUT_RUNNING) &&
(i->thread_info.state != PA_SINK_INPUT_DRAINED) && (i->thread_info.state != PA_SINK_INPUT_RUNNING))
pa_atomic_store(&i->thread_info.drained, 1);
i->thread_info.state = PA_PTR_TO_UINT(userdata);
return 0;
}
} }
return -1; return -1;
} }
pa_sink_input_state_t pa_sink_input_get_state(pa_sink_input *i) {
pa_sink_input_assert_ref(i);
if (i->state == PA_SINK_INPUT_RUNNING || i->state == PA_SINK_INPUT_DRAINED)
return pa_atomic_load(&i->thread_info.drained) ? PA_SINK_INPUT_DRAINED : PA_SINK_INPUT_RUNNING;
return i->state;
}

View file

@ -39,8 +39,8 @@ typedef struct pa_sink_input pa_sink_input;
#include <pulsecore/core.h> #include <pulsecore/core.h>
typedef enum pa_sink_input_state { typedef enum pa_sink_input_state {
PA_SINK_INPUT_RUNNING, /*< The stream is alive and kicking */
PA_SINK_INPUT_DRAINED, /*< The stream stopped playing because there was no data to play */ PA_SINK_INPUT_DRAINED, /*< The stream stopped playing because there was no data to play */
PA_SINK_INPUT_RUNNING, /*< The stream is alive and kicking */
PA_SINK_INPUT_CORKED, /*< The stream was corked on user request */ PA_SINK_INPUT_CORKED, /*< The stream was corked on user request */
PA_SINK_INPUT_DISCONNECTED /*< The stream is dead */ PA_SINK_INPUT_DISCONNECTED /*< The stream is dead */
} pa_sink_input_state_t; } pa_sink_input_state_t;
@ -55,7 +55,11 @@ struct pa_sink_input {
uint32_t index; uint32_t index;
pa_core *core; pa_core *core;
pa_atomic_t state;
/* Please note that this state should only be read with
* pa_sink_input_get_state(). That function will transparently
* merge the thread_info.drained value in. */
pa_sink_input_state_t state;
pa_sink_input_flags_t flags; pa_sink_input_flags_t flags;
char *name, *driver; /* may be NULL */ char *name, *driver; /* may be NULL */
@ -70,7 +74,6 @@ struct pa_sink_input {
pa_cvolume volume; pa_cvolume volume;
int muted; int muted;
int (*process_msg)(pa_sink_input *i, int code, void *userdata);
int (*peek) (pa_sink_input *i, pa_memchunk *chunk); int (*peek) (pa_sink_input *i, pa_memchunk *chunk);
void (*drop) (pa_sink_input *i, const pa_memchunk *chunk, size_t length); void (*drop) (pa_sink_input *i, const pa_memchunk *chunk, size_t length);
void (*kill) (pa_sink_input *i); /* may be NULL */ void (*kill) (pa_sink_input *i); /* may be NULL */
@ -80,6 +83,9 @@ struct pa_sink_input {
pa_resample_method_t resample_method; pa_resample_method_t resample_method;
struct { struct {
pa_sink_input_state_t state;
pa_atomic_t drained;
pa_sample_spec sample_spec; pa_sample_spec sample_spec;
pa_memchunk resampled_chunk; pa_memchunk resampled_chunk;
@ -106,6 +112,7 @@ enum {
PA_SINK_INPUT_MESSAGE_SET_MUTE, PA_SINK_INPUT_MESSAGE_SET_MUTE,
PA_SINK_INPUT_MESSAGE_GET_LATENCY, PA_SINK_INPUT_MESSAGE_GET_LATENCY,
PA_SINK_INPUT_MESSAGE_SET_RATE, PA_SINK_INPUT_MESSAGE_SET_RATE,
PA_SINK_INPUT_MESSAGE_SET_STATE,
PA_SINK_INPUT_MESSAGE_MAX PA_SINK_INPUT_MESSAGE_MAX
}; };
@ -166,7 +173,7 @@ pa_resample_method_t pa_sink_input_get_resample_method(pa_sink_input *i);
int pa_sink_input_move_to(pa_sink_input *i, pa_sink *dest, int immediately); int pa_sink_input_move_to(pa_sink_input *i, pa_sink *dest, int immediately);
#define pa_sink_input_get_state(i) ((pa_sink_input_state_t) pa_atomic_load(&i->state)) pa_sink_input_state_t pa_sink_input_get_state(pa_sink_input *i);
/* To be used exclusively by the sink driver thread */ /* To be used exclusively by the sink driver thread */

View file

@ -135,7 +135,7 @@ pa_source_output* pa_source_output_new(
o->parent.process_msg = pa_source_output_process_msg; o->parent.process_msg = pa_source_output_process_msg;
o->core = core; o->core = core;
pa_atomic_store(&o->state, PA_SOURCE_OUTPUT_RUNNING); o->state = PA_SOURCE_OUTPUT_RUNNING;
o->flags = flags; o->flags = flags;
o->name = pa_xstrdup(data->name); o->name = pa_xstrdup(data->name);
o->driver = pa_xstrdup(data->driver); o->driver = pa_xstrdup(data->driver);
@ -147,12 +147,13 @@ pa_source_output* pa_source_output_new(
o->sample_spec = data->sample_spec; o->sample_spec = data->sample_spec;
o->channel_map = data->channel_map; o->channel_map = data->channel_map;
o->process_msg = NULL;
o->push = NULL; o->push = NULL;
o->kill = NULL; o->kill = NULL;
o->get_latency = NULL; o->get_latency = NULL;
o->userdata = NULL; o->userdata = NULL;
o->thread_info.state = o->state;
o->thread_info.sample_spec = o->sample_spec;
o->thread_info.resampler = resampler; o->thread_info.resampler = resampler;
pa_assert_se(pa_idxset_put(core->source_outputs, o, &o->index) == 0); pa_assert_se(pa_idxset_put(core->source_outputs, o, &o->index) == 0);
@ -169,11 +170,22 @@ pa_source_output* pa_source_output_new(
return o; return o;
} }
static int source_output_set_state(pa_source_output *o, pa_source_output_state_t state) {
pa_assert(o);
if (o->state == state)
return 0;
if (pa_asyncmsgq_send(o->source->asyncmsgq, PA_MSGOBJECT(o), PA_SOURCE_OUTPUT_MESSAGE_SET_STATE, PA_UINT_TO_PTR(state), NULL) < 0)
return -1;
o->state = state;
return 0;
}
void pa_source_output_disconnect(pa_source_output*o) { void pa_source_output_disconnect(pa_source_output*o) {
pa_assert(o); pa_assert(o);
pa_return_if_fail(pa_source_output_get_state(o) != PA_SOURCE_OUTPUT_DISCONNECTED); pa_return_if_fail(o->state != PA_SOURCE_OUTPUT_DISCONNECTED);
pa_assert(o->source);
pa_assert(o->source->core);
pa_asyncmsgq_send(o->source->asyncmsgq, PA_MSGOBJECT(o->source), PA_SOURCE_MESSAGE_REMOVE_OUTPUT, o, NULL); pa_asyncmsgq_send(o->source->asyncmsgq, PA_MSGOBJECT(o->source), PA_SOURCE_MESSAGE_REMOVE_OUTPUT, o, NULL);
@ -182,15 +194,13 @@ void pa_source_output_disconnect(pa_source_output*o) {
pa_subscription_post(o->source->core, PA_SUBSCRIPTION_EVENT_SOURCE_OUTPUT|PA_SUBSCRIPTION_EVENT_REMOVE, o->index); pa_subscription_post(o->source->core, PA_SUBSCRIPTION_EVENT_SOURCE_OUTPUT|PA_SUBSCRIPTION_EVENT_REMOVE, o->index);
source_output_set_state(o, PA_SOURCE_OUTPUT_DISCONNECTED);
pa_source_update_status(o->source); pa_source_update_status(o->source);
o->source = NULL; o->source = NULL;
o->process_msg = NULL;
o->push = NULL; o->push = NULL;
o->kill = NULL; o->kill = NULL;
o->get_latency = NULL; o->get_latency = NULL;
pa_atomic_store(&o->state, PA_SOURCE_OUTPUT_DISCONNECTED);
} }
static void source_output_free(pa_object* mo) { static void source_output_free(pa_object* mo) {
@ -198,7 +208,8 @@ static void source_output_free(pa_object* mo) {
pa_assert(pa_source_output_refcnt(o) == 0); pa_assert(pa_source_output_refcnt(o) == 0);
pa_source_output_disconnect(o); if (o->state != PA_SOURCE_OUTPUT_DISCONNECTED)
pa_source_output_disconnect(o);
pa_log_info("Freeing output %u \"%s\"", o->index, o->name); pa_log_info("Freeing output %u \"%s\"", o->index, o->name);
@ -242,18 +253,15 @@ pa_usec_t pa_source_output_get_latency(pa_source_output *o) {
void pa_source_output_push(pa_source_output *o, const pa_memchunk *chunk) { void pa_source_output_push(pa_source_output *o, const pa_memchunk *chunk) {
pa_memchunk rchunk; pa_memchunk rchunk;
pa_source_output_state_t state;
pa_source_output_assert_ref(o); pa_source_output_assert_ref(o);
pa_assert(chunk); pa_assert(chunk);
pa_assert(chunk->length); pa_assert(chunk->length);
state = pa_source_output_get_state(o); if (!o->push || o->state == PA_SOURCE_OUTPUT_DISCONNECTED || o->state == PA_SOURCE_OUTPUT_CORKED)
if (!o->push || state == PA_SOURCE_OUTPUT_DISCONNECTED || state == PA_SOURCE_OUTPUT_CORKED)
return; return;
pa_assert(state = PA_SOURCE_OUTPUT_RUNNING); pa_assert(o->state = PA_SOURCE_OUTPUT_RUNNING);
if (!o->thread_info.resampler) { if (!o->thread_info.resampler) {
o->push(o, chunk); o->push(o, chunk);
@ -270,17 +278,9 @@ void pa_source_output_push(pa_source_output *o, const pa_memchunk *chunk) {
} }
void pa_source_output_cork(pa_source_output *o, int b) { void pa_source_output_cork(pa_source_output *o, int b) {
pa_source_output_state_t state;
pa_source_output_assert_ref(o); pa_source_output_assert_ref(o);
state = pa_source_output_get_state(o); source_output_set_state(o, b ? PA_SOURCE_OUTPUT_CORKED : PA_SOURCE_OUTPUT_RUNNING);
pa_assert(state != PA_SOURCE_OUTPUT_DISCONNECTED);
if (b && state != PA_SOURCE_OUTPUT_CORKED)
pa_atomic_store(&o->state, PA_SOURCE_OUTPUT_CORKED);
else if (!b && state == PA_SOURCE_OUTPUT_CORKED)
pa_atomic_cmpxchg(&o->state, state, PA_SOURCE_OUTPUT_RUNNING);
} }
int pa_source_output_set_rate(pa_source_output *o, uint32_t rate) { int pa_source_output_set_rate(pa_source_output *o, uint32_t rate) {
@ -393,6 +393,13 @@ int pa_source_output_process_msg(pa_msgobject *mo, int code, void *userdata, pa_
return 0; return 0;
} }
case PA_SOURCE_OUTPUT_MESSAGE_SET_STATE: {
o->thread_info.state = PA_PTR_TO_UINT(userdata);
return 0;
}
} }
return -1; return -1;

View file

@ -51,7 +51,7 @@ struct pa_source_output {
uint32_t index; uint32_t index;
pa_core *core; pa_core *core;
pa_atomic_t state; pa_source_output_state_t state;
pa_source_output_flags_t flags; pa_source_output_flags_t flags;
char *name, *driver; /* may be NULL */ char *name, *driver; /* may be NULL */
@ -63,7 +63,6 @@ struct pa_source_output {
pa_sample_spec sample_spec; pa_sample_spec sample_spec;
pa_channel_map channel_map; pa_channel_map channel_map;
int (*process_msg)(pa_sink_input *i, int code, void *userdata);
void (*push)(pa_source_output *o, const pa_memchunk *chunk); void (*push)(pa_source_output *o, const pa_memchunk *chunk);
void (*kill)(pa_source_output* o); /* may be NULL */ void (*kill)(pa_source_output* o); /* may be NULL */
pa_usec_t (*get_latency) (pa_source_output *o); /* may be NULL */ pa_usec_t (*get_latency) (pa_source_output *o); /* may be NULL */
@ -71,6 +70,8 @@ struct pa_source_output {
pa_resample_method_t resample_method; pa_resample_method_t resample_method;
struct { struct {
pa_source_output_state_t state;
pa_sample_spec sample_spec; pa_sample_spec sample_spec;
pa_resampler* resampler; /* may be NULL */ pa_resampler* resampler; /* may be NULL */
@ -85,6 +86,7 @@ PA_DECLARE_CLASS(pa_source_output);
enum { enum {
PA_SOURCE_OUTPUT_MESSAGE_GET_LATENCY, PA_SOURCE_OUTPUT_MESSAGE_GET_LATENCY,
PA_SOURCE_OUTPUT_MESSAGE_SET_RATE, PA_SOURCE_OUTPUT_MESSAGE_SET_RATE,
PA_SOURCE_OUTPUT_MESSAGE_SET_STATE,
PA_SOURCE_OUTPUT_MESSAGE_MAX PA_SOURCE_OUTPUT_MESSAGE_MAX
}; };
@ -135,7 +137,7 @@ pa_resample_method_t pa_source_output_get_resample_method(pa_source_output *o);
int pa_source_output_move_to(pa_source_output *o, pa_source *dest); int pa_source_output_move_to(pa_source_output *o, pa_source *dest);
#define pa_source_output_get_state(o) ((pa_source_output_state_t) pa_atomic_load(&o->state)) #define pa_source_output_get_state(o) ((o)->state)
/* To be used exclusively by the source driver thread */ /* To be used exclusively by the source driver thread */