render new data always in the master sink's thread, fixing missing locking

git-svn-id: file:///home/lennart/svn/public/pulseaudio/branches/lennart@1871 fefdeb5f-60dc-0310-8127-8f9354f1896f
This commit is contained in:
Lennart Poettering 2007-09-19 22:21:55 +00:00
parent a8a9ee499d
commit 75647bc38f

View file

@ -87,8 +87,9 @@ struct output {
pa_sink *sink; pa_sink *sink;
pa_sink_input *sink_input; pa_sink_input *sink_input;
pa_asyncmsgq *asyncmsgq; pa_asyncmsgq *inq, /* Message queue from the master to this sink input */
pa_rtpoll_item *rtpoll_item; *outq; /* Message queue from this sink input to the master */
pa_rtpoll_item *inq_rtpoll_item, *outq_rtpoll_item;
pa_memblockq *memblockq; pa_memblockq *memblockq;
@ -106,8 +107,6 @@ struct userdata {
pa_thread_mq thread_mq; pa_thread_mq thread_mq;
pa_rtpoll *rtpoll; pa_rtpoll *rtpoll;
pa_mutex *mutex;
struct output *master; struct output *master;
pa_time_event *time_event; pa_time_event *time_event;
@ -134,7 +133,8 @@ struct userdata {
enum { enum {
SINK_MESSAGE_ADD_OUTPUT = PA_SINK_MESSAGE_MAX, SINK_MESSAGE_ADD_OUTPUT = PA_SINK_MESSAGE_MAX,
SINK_MESSAGE_REMOVE_OUTPUT SINK_MESSAGE_REMOVE_OUTPUT,
SINK_MESSAGE_NEED
}; };
enum { enum {
@ -275,9 +275,51 @@ finish:
pa_log_debug("Thread shutting down"); pa_log_debug("Thread shutting down");
} }
static void request_memblock(struct output *o, size_t length) { static void render_memblock(struct userdata *u, struct output *o, size_t length) {
pa_memchunk chunk; pa_assert(u);
pa_assert(o);
if (!PA_SINK_OPENED(u->sink->thread_info.state))
return;
/* We are run by the master output (u->master), possibly on behalf
* of another output (o). The other output is waiting for us,
* hence it is safe to access its mainblockq directly. */
/* Maybe there's some data in the requesting output's queue
* now? */
while (pa_asyncmsgq_process_one(o->inq) > 0)
;
/* Ok, now let's prepare some data if we really have to */
while (!pa_memblockq_is_readable(o->memblockq)) {
struct output *j;
pa_memchunk chunk;
/* Render data! */
pa_sink_render(u->sink, length, &chunk);
/* OK, let's send this data to the other threads */
for (j = o->userdata->thread_info.outputs; j; j = j->next)
/* Send to other outputs, which are not the requesting
* one, and not the master */
if (j != o && j != u->master && j->sink_input)
pa_asyncmsgq_post(j->inq, PA_MSGOBJECT(j->sink_input), SINK_INPUT_MESSAGE_POST, NULL, 0, &chunk, NULL);
/* Now push it into the master queue */
pa_memblockq_push_align(u->master->memblockq, &chunk);
/* And into the requesting output's queue */
if (o != u->master)
pa_memblockq_push_align(o->memblockq, &chunk);
pa_memblock_unref(chunk.memblock);
}
}
static void request_memblock(struct output *o, size_t length) {
pa_assert(o); pa_assert(o);
pa_sink_input_assert_ref(o->sink_input); pa_sink_input_assert_ref(o->sink_input);
pa_sink_assert_ref(o->userdata->sink); pa_sink_assert_ref(o->userdata->sink);
@ -285,7 +327,7 @@ static void request_memblock(struct output *o, size_t length) {
/* If another thread already prepared some data we received /* If another thread already prepared some data we received
* the data over the asyncmsgq, hence let's first process * the data over the asyncmsgq, hence let's first process
* it. */ * it. */
while (pa_asyncmsgq_process_one(o->asyncmsgq) > 0) while (pa_asyncmsgq_process_one(o->inq) > 0)
; ;
/* Check whether we're now readable */ /* Check whether we're now readable */
@ -293,33 +335,16 @@ static void request_memblock(struct output *o, size_t length) {
return; return;
/* OK, we need to prepare new data */ /* OK, we need to prepare new data */
pa_mutex_lock(o->userdata->mutex);
if (PA_SINK_OPENED(o->userdata->sink->thread_info.state)) { if (o == o->userdata->master)
/* OK, we're the master, so let's render some data */
/* Maybe there's some data now? */ render_memblock(o->userdata, o, length);
while (pa_asyncmsgq_process_one(o->asyncmsgq) > 0)
; else
/* We're not the master, we need to ask the master to do the
/* Ok, now let's prepare some data if we really have to */ * rendering for us */
while (!pa_memblockq_is_readable(o->memblockq)) {
struct output *j; pa_asyncmsgq_send(o->outq, PA_MSGOBJECT(o->userdata->sink), SINK_MESSAGE_NEED, o, length, NULL);
/* Do it! */
pa_sink_render(o->userdata->sink, length, &chunk);
/* OK, let's send this data to the other threads */
for (j = o->userdata->thread_info.outputs; j; j = j->next)
if (j != o && j->sink_input)
pa_asyncmsgq_post(j->asyncmsgq, PA_MSGOBJECT(j->sink_input), SINK_INPUT_MESSAGE_POST, NULL, 0, &chunk, NULL);
/* And push it into our own queue */
pa_memblockq_push_align(o->memblockq, &chunk);
pa_memblock_unref(chunk.memblock);
}
}
pa_mutex_unlock(o->userdata->mutex);
} }
/* Called from I/O thread context */ /* Called from I/O thread context */
@ -327,8 +352,7 @@ static int sink_input_peek_cb(pa_sink_input *i, size_t length, pa_memchunk *chun
struct output *o; struct output *o;
pa_sink_input_assert_ref(i); pa_sink_input_assert_ref(i);
o = i->userdata; pa_assert_se(o = i->userdata);
pa_assert(o);
/* If necessary, get some new data */ /* If necessary, get some new data */
request_memblock(o, length); request_memblock(o, length);
@ -342,8 +366,7 @@ static void sink_input_drop_cb(pa_sink_input *i, size_t length) {
pa_sink_input_assert_ref(i); pa_sink_input_assert_ref(i);
pa_assert(length > 0); pa_assert(length > 0);
o = i->userdata; pa_assert_se(o = i->userdata);
pa_assert(o);
pa_memblockq_drop(o->memblockq, length); pa_memblockq_drop(o->memblockq, length);
} }
@ -353,23 +376,42 @@ static void sink_input_attach_cb(pa_sink_input *i) {
struct output *o; struct output *o;
pa_sink_input_assert_ref(i); pa_sink_input_assert_ref(i);
o = i->userdata; pa_assert_se(o = i->userdata);
pa_assert(o);
pa_assert(!o->inq_rtpoll_item);
if (o->userdata->master == o) { if (o->userdata->master == o) {
struct output *k;
pa_assert(!o->outq_rtpoll_item);
/* Set up the queues from the outputs to the master */
for (k = o->userdata->thread_info.outputs; k; k = k->next) {
pa_assert(!k->outq_rtpoll_item);
if (o == k)
continue;
k->outq_rtpoll_item = pa_rtpoll_item_new_asyncmsgq(
i->sink->rtpoll,
PA_RTPOLL_EARLY+1, /* This one has a slightly lower priority than the normal message handling */
k->outq);
}
/* Calling these two functions here is safe, because both /* Calling these two functions here is safe, because both
* threads that might access this sink input are known to be * threads that might access this sink are known to be
* waiting for us. */ * waiting for us. */
pa_sink_set_asyncmsgq(o->userdata->sink, i->sink->asyncmsgq); pa_sink_set_asyncmsgq(o->userdata->sink, i->sink->asyncmsgq);
pa_sink_set_rtpoll(o->userdata->sink, i->sink->rtpoll); pa_sink_set_rtpoll(o->userdata->sink, i->sink->rtpoll);
pa_sink_attach_within_thread(o->userdata->sink); pa_sink_attach_within_thread(o->userdata->sink);
} }
pa_assert(!o->rtpoll_item); /* Set up the queues from the inputs to the master */
o->rtpoll_item = pa_rtpoll_item_new_asyncmsgq( o->inq_rtpoll_item = pa_rtpoll_item_new_asyncmsgq(
i->sink->rtpoll, i->sink->rtpoll,
PA_RTPOLL_NORMAL, /* This one has a lower priority than the normal message handling */ PA_RTPOLL_NORMAL, /* This one has a lower priority than the normal message handling */
o->asyncmsgq); o->inq);
} }
/* Called from I/O thread context */ /* Called from I/O thread context */
@ -377,15 +419,27 @@ static void sink_input_detach_cb(pa_sink_input *i) {
struct output *o; struct output *o;
pa_sink_input_assert_ref(i); pa_sink_input_assert_ref(i);
o = i->userdata; pa_assert_se(o = i->userdata);
pa_assert(o);
pa_assert(o->rtpoll_item); pa_assert(o->inq_rtpoll_item);
pa_rtpoll_item_free(o->rtpoll_item); pa_rtpoll_item_free(o->inq_rtpoll_item);
o->rtpoll_item = NULL; o->inq_rtpoll_item = NULL;
if (o->userdata->master == o) if (o->userdata->master == o) {
struct output *k;
pa_sink_detach_within_thread(o->userdata->sink); pa_sink_detach_within_thread(o->userdata->sink);
for (k = o->userdata->thread_info.outputs; k; k = k->next) {
if (o == k)
continue;
pa_assert(k->outq_rtpoll_item);
pa_rtpoll_item_free(k->outq_rtpoll_item);
k->outq_rtpoll_item = NULL;
}
}
} }
/* Called from main context */ /* Called from main context */
@ -433,6 +487,7 @@ static int sink_input_process_msg(pa_msgobject *obj, int code, void *data, int64
return pa_sink_input_process_msg(obj, code, data, offset, chunk); return pa_sink_input_process_msg(obj, code, data, offset, chunk);
} }
/* Called from main context */
static int suspend(struct userdata *u) { static int suspend(struct userdata *u) {
struct output *o; struct output *o;
uint32_t idx; uint32_t idx;
@ -458,6 +513,7 @@ static int suspend(struct userdata *u) {
return 0; return 0;
} }
/* Called from main context */
static int unsuspend(struct userdata *u) { static int unsuspend(struct userdata *u) {
struct output *o; struct output *o;
uint32_t idx; uint32_t idx;
@ -485,12 +541,12 @@ static int unsuspend(struct userdata *u) {
return 0; return 0;
} }
/* Called from main context */
static int sink_set_state(pa_sink *sink, pa_sink_state_t state) { static int sink_set_state(pa_sink *sink, pa_sink_state_t state) {
struct userdata *u; struct userdata *u;
pa_sink_assert_ref(sink); pa_sink_assert_ref(sink);
u = sink->userdata; pa_assert_se(u = sink->userdata);
pa_assert(u);
/* Please note that in contrast to the ALSA modules we call /* Please note that in contrast to the ALSA modules we call
* suspend/unsuspend from main context here! */ * suspend/unsuspend from main context here! */
@ -575,6 +631,10 @@ static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offse
case SINK_MESSAGE_REMOVE_OUTPUT: case SINK_MESSAGE_REMOVE_OUTPUT:
PA_LLIST_REMOVE(struct output, u->thread_info.outputs, (struct output*) data); PA_LLIST_REMOVE(struct output, u->thread_info.outputs, (struct output*) data);
break; break;
case SINK_MESSAGE_NEED:
render_memblock(u, data, (size_t) offset);
break;
} }
return pa_sink_process_msg(o, code, data, offset, chunk); return pa_sink_process_msg(o, code, data, offset, chunk);
@ -765,8 +825,10 @@ static struct output *output_new(struct userdata *u, pa_sink *sink) {
o = pa_xnew(struct output, 1); o = pa_xnew(struct output, 1);
o->userdata = u; o->userdata = u;
o->asyncmsgq = pa_asyncmsgq_new(0); o->inq = pa_asyncmsgq_new(0);
o->rtpoll_item = NULL; o->outq = pa_asyncmsgq_new(0);
o->inq_rtpoll_item = NULL;
o->outq_rtpoll_item = NULL;
o->sink = sink; o->sink = sink;
o->sink_input = NULL; o->sink_input = NULL;
o->memblockq = pa_memblockq_new( o->memblockq = pa_memblockq_new(
@ -809,9 +871,12 @@ fail:
if (o->memblockq) if (o->memblockq)
pa_memblockq_free(o->memblockq); pa_memblockq_free(o->memblockq);
if (o->asyncmsgq) if (o->inq)
pa_asyncmsgq_unref(o->asyncmsgq); pa_asyncmsgq_unref(o->inq);
if (o->outq)
pa_asyncmsgq_unref(o->outq);
pa_xfree(o); pa_xfree(o);
} }
@ -947,7 +1012,6 @@ int pa__init(pa_module*m) {
u->thread_info.master = u->master = NULL; u->thread_info.master = u->master = NULL;
u->time_event = NULL; u->time_event = NULL;
u->adjust_time = DEFAULT_ADJUST_TIME; u->adjust_time = DEFAULT_ADJUST_TIME;
u->mutex = pa_mutex_new(FALSE, TRUE);
pa_thread_mq_init(&u->thread_mq, m->core->mainloop); pa_thread_mq_init(&u->thread_mq, m->core->mainloop);
u->rtpoll = NULL; u->rtpoll = NULL;
u->thread = NULL; u->thread = NULL;
@ -1134,14 +1198,20 @@ static void output_free(struct output *o) {
pa_sink_input_unref(o->sink_input); pa_sink_input_unref(o->sink_input);
} }
if (o->rtpoll_item) if (o->inq_rtpoll_item)
pa_rtpoll_item_free(o->rtpoll_item); pa_rtpoll_item_free(o->inq_rtpoll_item);
if (o->outq_rtpoll_item)
pa_rtpoll_item_free(o->outq_rtpoll_item);
if (o->inq)
pa_asyncmsgq_unref(o->inq);
if (o->outq)
pa_asyncmsgq_unref(o->outq);
if (o->memblockq) if (o->memblockq)
pa_memblockq_free(o->memblockq); pa_memblockq_free(o->memblockq);
if (o->asyncmsgq)
pa_asyncmsgq_unref(o->asyncmsgq);
pa_xfree(o); pa_xfree(o);
} }
@ -1190,8 +1260,6 @@ void pa__done(pa_module*m) {
if (u->time_event) if (u->time_event)
u->core->mainloop->time_free(u->time_event); u->core->mainloop->time_free(u->time_event);
pa_mutex_free(u->mutex);
pa_xfree(u); pa_xfree(u);
} }