mirror of
https://gitlab.freedesktop.org/pulseaudio/pulseaudio.git
synced 2025-11-11 13:30:02 -05:00
simplify rt loops a bit by moving more code into pa_rtpoll. It is now possible to attach "work" functions to a pa_rtpoll_item, which will be called in each loop iteration. This allows us to hide the message processing in the RT loops and to drop the seperate sink_input->process hooks. Basically, only the driver-specific code remains in the RT loops.
git-svn-id: file:///home/lennart/svn/public/pulseaudio/branches/lennart@1822 fefdeb5f-60dc-0310-8127-8f9354f1896f
This commit is contained in:
parent
f0b9dce32e
commit
3396b65f15
19 changed files with 182 additions and 272 deletions
|
|
@ -630,24 +630,13 @@ static void thread_func(void *userdata) {
|
|||
}
|
||||
}
|
||||
|
||||
/* Now give the sink inputs some to time to process their data */
|
||||
if ((ret = pa_sink_process_inputs(u->sink)) < 0)
|
||||
goto fail;
|
||||
if (ret > 0)
|
||||
continue;
|
||||
|
||||
/* Check whether there is a message for us to process */
|
||||
if ((ret = pa_thread_mq_process(&u->thread_mq) < 0))
|
||||
goto finish;
|
||||
if (ret > 0)
|
||||
continue;
|
||||
|
||||
/* Hmm, nothing to do. Let's sleep */
|
||||
if (pa_rtpoll_run(u->rtpoll, 1) < 0) {
|
||||
pa_log("poll() failed: %s", pa_cstrerror(errno));
|
||||
if ((ret = pa_rtpoll_run(u->rtpoll, 1)) < 0)
|
||||
goto fail;
|
||||
}
|
||||
|
||||
if (ret == 0)
|
||||
goto finish;
|
||||
|
||||
/* Tell ALSA about this and process its response */
|
||||
if (PA_SINK_OPENED(u->sink->thread_info.state)) {
|
||||
struct pollfd *pollfd;
|
||||
|
|
@ -676,8 +665,8 @@ static void thread_func(void *userdata) {
|
|||
}
|
||||
|
||||
fail:
|
||||
/* We have to continue processing messages until we receive the
|
||||
* SHUTDOWN message */
|
||||
/* If this was no regular exit from the loop we have to continue
|
||||
* processing messages until we received PA_MESSAGE_SHUTDOWN */
|
||||
pa_asyncmsgq_post(u->thread_mq.outq, PA_MSGOBJECT(u->core), PA_CORE_MESSAGE_UNLOAD_MODULE, u->module, 0, NULL, NULL);
|
||||
pa_asyncmsgq_wait_for(u->thread_mq.inq, PA_MESSAGE_SHUTDOWN);
|
||||
|
||||
|
|
|
|||
|
|
@ -612,24 +612,13 @@ static void thread_func(void *userdata) {
|
|||
}
|
||||
}
|
||||
|
||||
/* Now give the source outputs some to time to process their data */
|
||||
if ((ret = pa_source_process_outputs(u->source)) < 0)
|
||||
goto fail;
|
||||
if (ret > 0)
|
||||
continue;
|
||||
|
||||
/* Check whether there is a message for us to process */
|
||||
if ((ret = pa_thread_mq_process(&u->thread_mq) < 0))
|
||||
goto finish;
|
||||
if (ret > 0)
|
||||
continue;
|
||||
|
||||
/* Hmm, nothing to do. Let's sleep */
|
||||
if (pa_rtpoll_run(u->rtpoll, 1) < 0) {
|
||||
pa_log("poll() failed: %s", pa_cstrerror(errno));
|
||||
if ((ret = pa_rtpoll_run(u->rtpoll, 1)) < 0)
|
||||
goto fail;
|
||||
}
|
||||
|
||||
if (ret == 0)
|
||||
goto finish;
|
||||
|
||||
/* Tell ALSA about this and process its response */
|
||||
if (PA_SOURCE_OPENED(u->source->thread_info.state)) {
|
||||
struct pollfd *pollfd;
|
||||
|
|
@ -658,8 +647,8 @@ static void thread_func(void *userdata) {
|
|||
}
|
||||
|
||||
fail:
|
||||
/* We have to continue processing messages until we receive the
|
||||
* SHUTDOWN message */
|
||||
/* If this was no regular exit from the loop we have to continue
|
||||
* processing messages until we received PA_MESSAGE_SHUTDOWN */
|
||||
pa_asyncmsgq_post(u->thread_mq.outq, PA_MSGOBJECT(u->core), PA_CORE_MESSAGE_UNLOAD_MODULE, u->module, 0, NULL, NULL);
|
||||
pa_asyncmsgq_wait_for(u->thread_mq.inq, PA_MESSAGE_SHUTDOWN);
|
||||
|
||||
|
|
|
|||
|
|
@ -139,6 +139,10 @@ enum {
|
|||
SINK_MESSAGE_REMOVE_OUTPUT
|
||||
};
|
||||
|
||||
enum {
|
||||
SINK_INPUT_MESSAGE_POST = PA_SINK_INPUT_MESSAGE_MAX
|
||||
};
|
||||
|
||||
static void output_free(struct output *o);
|
||||
static int output_create_sink_input(struct userdata *u, struct output *o);
|
||||
static int update_master(struct userdata *u, struct output *o);
|
||||
|
|
@ -255,28 +259,17 @@ static void thread_func(void *userdata) {
|
|||
} else
|
||||
pa_rtpoll_set_timer_disabled(u->rtpoll);
|
||||
|
||||
/* Now give the sink inputs some to time to process their data */
|
||||
if ((ret = pa_sink_process_inputs(u->sink)) < 0)
|
||||
goto fail;
|
||||
if (ret > 0)
|
||||
continue;
|
||||
|
||||
/* Check whether there is a message for us to process */
|
||||
if ((ret = pa_thread_mq_process(&u->thread_mq) < 0))
|
||||
goto finish;
|
||||
if (ret > 0)
|
||||
continue;
|
||||
|
||||
/* Hmm, nothing to do. Let's sleep */
|
||||
if (pa_rtpoll_run(u->rtpoll, 1) < 0) {
|
||||
pa_log("poll() failed: %s", pa_cstrerror(errno));
|
||||
if ((ret = pa_rtpoll_run(u->rtpoll, 1)) < 0)
|
||||
goto fail;
|
||||
}
|
||||
|
||||
if (ret == 0)
|
||||
goto finish;
|
||||
}
|
||||
|
||||
fail:
|
||||
/* We have to continue processing messages until we receive the
|
||||
* SHUTDOWN message */
|
||||
/* If this was no regular exit from the loop we have to continue
|
||||
* processing messages until we received PA_MESSAGE_SHUTDOWN */
|
||||
pa_asyncmsgq_post(u->thread_mq.outq, PA_MSGOBJECT(u->core), PA_CORE_MESSAGE_UNLOAD_MODULE, u->module, 0, NULL, NULL);
|
||||
pa_asyncmsgq_wait_for(u->thread_mq.inq, PA_MESSAGE_SHUTDOWN);
|
||||
|
||||
|
|
@ -294,10 +287,8 @@ static void request_memblock(struct output *o) {
|
|||
/* If another thread already prepared some data we received
|
||||
* the data over the asyncmsgq, hence let's first process
|
||||
* it. */
|
||||
while (pa_asyncmsgq_get(o->asyncmsgq, NULL, NULL, NULL, NULL, &chunk, 0) == 0) {
|
||||
pa_memblockq_push_align(o->memblockq, &chunk);
|
||||
pa_asyncmsgq_done(o->asyncmsgq, 0);
|
||||
}
|
||||
while (pa_asyncmsgq_process_one(o->asyncmsgq) > 0)
|
||||
;
|
||||
|
||||
/* Check whether we're now readable */
|
||||
if (pa_memblockq_is_readable(o->memblockq))
|
||||
|
|
@ -309,10 +300,8 @@ static void request_memblock(struct output *o) {
|
|||
if (PA_SINK_OPENED(o->userdata->sink->thread_info.state)) {
|
||||
|
||||
/* Maybe there's some data now? */
|
||||
while (pa_asyncmsgq_get(o->asyncmsgq, NULL, NULL, NULL, NULL, &chunk, 0) == 0) {
|
||||
pa_memblockq_push_align(o->memblockq, &chunk);
|
||||
pa_asyncmsgq_done(o->asyncmsgq, 0);
|
||||
}
|
||||
while (pa_asyncmsgq_process_one(o->asyncmsgq) > 0)
|
||||
;
|
||||
|
||||
/* Ok, now let's prepare some data if we really have to */
|
||||
while (!pa_memblockq_is_readable(o->memblockq)) {
|
||||
|
|
@ -324,7 +313,7 @@ static void request_memblock(struct output *o) {
|
|||
/* OK, let's send this data to the other threads */
|
||||
for (j = o->userdata->thread_info.outputs; j; j = j->next)
|
||||
if (j != o && j->sink_input)
|
||||
pa_asyncmsgq_post(j->asyncmsgq, NULL, 0, NULL, 0, &chunk, NULL);
|
||||
pa_asyncmsgq_post(j->asyncmsgq, PA_MSGOBJECT(j->sink_input), SINK_INPUT_MESSAGE_POST, NULL, 0, &chunk, NULL);
|
||||
|
||||
/* And push it into our own queue */
|
||||
pa_memblockq_push_align(o->memblockq, &chunk);
|
||||
|
|
@ -361,37 +350,6 @@ static void sink_input_drop_cb(pa_sink_input *i, size_t length) {
|
|||
pa_memblockq_drop(o->memblockq, length);
|
||||
}
|
||||
|
||||
/* Called from I/O thread context */
|
||||
static int sink_input_process_cb(pa_sink_input *i) {
|
||||
struct output *o;
|
||||
pa_memchunk chunk;
|
||||
int r = 0;
|
||||
|
||||
pa_sink_input_assert_ref(i);
|
||||
o = i->userdata;
|
||||
pa_assert(o);
|
||||
|
||||
/* Move all data in the asyncmsgq into our memblockq */
|
||||
|
||||
while (pa_asyncmsgq_get(o->asyncmsgq, NULL, NULL, NULL, NULL, &chunk, 0) == 0) {
|
||||
if (PA_SINK_OPENED(i->sink->thread_info.state))
|
||||
pa_memblockq_push_align(o->memblockq, &chunk);
|
||||
pa_asyncmsgq_done(o->asyncmsgq, 0);
|
||||
}
|
||||
|
||||
/* If the sink is suspended, flush our queue */
|
||||
if (!PA_SINK_OPENED(i->sink->thread_info.state))
|
||||
pa_memblockq_flush(o->memblockq);
|
||||
|
||||
if (o == o->userdata->thread_info.master) {
|
||||
pa_mutex_lock(o->userdata->mutex);
|
||||
r = pa_sink_process_inputs(o->userdata->sink);
|
||||
pa_mutex_unlock(o->userdata->mutex);
|
||||
}
|
||||
|
||||
return r;
|
||||
}
|
||||
|
||||
/* Called from I/O thread context */
|
||||
static void sink_input_attach_cb(pa_sink_input *i) {
|
||||
struct output *o;
|
||||
|
|
@ -401,7 +359,10 @@ static void sink_input_attach_cb(pa_sink_input *i) {
|
|||
pa_assert(o);
|
||||
|
||||
pa_assert(!o->rtpoll_item);
|
||||
o->rtpoll_item = pa_rtpoll_item_new_asyncmsgq(i->sink->rtpoll, PA_RTPOLL_NORMAL, o->asyncmsgq);
|
||||
o->rtpoll_item = pa_rtpoll_item_new_asyncmsgq(
|
||||
i->sink->rtpoll,
|
||||
PA_RTPOLL_NORMAL, /* This one has a lower priority than the normal message handling */
|
||||
o->asyncmsgq);
|
||||
}
|
||||
|
||||
/* Called from I/O thread context */
|
||||
|
|
@ -448,6 +409,15 @@ static int sink_input_process_msg(pa_msgobject *obj, int code, void *data, int64
|
|||
break;
|
||||
}
|
||||
|
||||
case SINK_INPUT_MESSAGE_POST: {
|
||||
|
||||
if (PA_SINK_OPENED(o->sink_input->sink->thread_info.state))
|
||||
pa_memblockq_push_align(o->memblockq, chunk);
|
||||
else
|
||||
pa_memblockq_flush(o->memblockq);
|
||||
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
return pa_sink_input_process_msg(obj, code, data, offset, chunk);
|
||||
|
|
@ -784,7 +754,6 @@ static int output_create_sink_input(struct userdata *u, struct output *o) {
|
|||
o->sink_input->parent.process_msg = sink_input_process_msg;
|
||||
o->sink_input->peek = sink_input_peek_cb;
|
||||
o->sink_input->drop = sink_input_drop_cb;
|
||||
o->sink_input->process = sink_input_process_cb;
|
||||
o->sink_input->attach = sink_input_attach_cb;
|
||||
o->sink_input->detach = sink_input_detach_cb;
|
||||
o->sink_input->kill = sink_input_kill_cb;
|
||||
|
|
|
|||
|
|
@ -145,28 +145,17 @@ static void thread_func(void *userdata) {
|
|||
} else
|
||||
pa_rtpoll_set_timer_disabled(u->rtpoll);
|
||||
|
||||
/* Now give the sink inputs some to time to process their data */
|
||||
if ((ret = pa_sink_process_inputs(u->sink)) < 0)
|
||||
goto fail;
|
||||
if (ret > 0)
|
||||
continue;
|
||||
|
||||
/* Check whether there is a message for us to process */
|
||||
if ((ret = pa_thread_mq_process(&u->thread_mq) < 0))
|
||||
goto finish;
|
||||
if (ret > 0)
|
||||
continue;
|
||||
|
||||
/* Hmm, nothing to do. Let's sleep */
|
||||
if (pa_rtpoll_run(u->rtpoll, 1) < 0) {
|
||||
pa_log("poll() failed: %s", pa_cstrerror(errno));
|
||||
if ((ret = pa_rtpoll_run(u->rtpoll, 1)) < 0)
|
||||
goto fail;
|
||||
}
|
||||
|
||||
if (ret == 0)
|
||||
goto finish;
|
||||
}
|
||||
|
||||
fail:
|
||||
/* We have to continue processing messages until we receive the
|
||||
* SHUTDOWN message */
|
||||
/* If this was no regular exit from the loop we have to continue
|
||||
* processing messages until we received PA_MESSAGE_SHUTDOWN */
|
||||
pa_asyncmsgq_post(u->thread_mq.outq, PA_MSGOBJECT(u->core), PA_CORE_MESSAGE_UNLOAD_MODULE, u->module, 0, NULL, NULL);
|
||||
pa_asyncmsgq_wait_for(u->thread_mq.inq, PA_MESSAGE_SHUTDOWN);
|
||||
|
||||
|
|
|
|||
|
|
@ -999,28 +999,6 @@ static void thread_func(void *userdata) {
|
|||
|
||||
/* pa_log("loop2"); */
|
||||
|
||||
/* Now give the sink inputs some to time to process their data */
|
||||
if (u->sink) {
|
||||
if ((ret = pa_sink_process_inputs(u->sink)) < 0)
|
||||
goto fail;
|
||||
if (ret > 0)
|
||||
continue;
|
||||
}
|
||||
|
||||
/* Now give the source outputs some to time to process their data */
|
||||
if (u->source) {
|
||||
if ((ret = pa_source_process_outputs(u->source)) < 0)
|
||||
goto fail;
|
||||
if (ret > 0)
|
||||
continue;
|
||||
}
|
||||
|
||||
/* Check whether there is a message for us to process */
|
||||
if ((ret = pa_thread_mq_process(&u->thread_mq) < 0))
|
||||
goto finish;
|
||||
if (ret > 0)
|
||||
continue;
|
||||
|
||||
if (u->fd >= 0) {
|
||||
struct pollfd *pollfd;
|
||||
|
||||
|
|
@ -1031,11 +1009,12 @@ static void thread_func(void *userdata) {
|
|||
}
|
||||
|
||||
/* Hmm, nothing to do. Let's sleep */
|
||||
if (pa_rtpoll_run(u->rtpoll, 1) < 0) {
|
||||
pa_log("poll() failed: %s", pa_cstrerror(errno));
|
||||
if ((ret = pa_rtpoll_run(u->rtpoll, 1)) < 0)
|
||||
goto fail;
|
||||
}
|
||||
|
||||
if (ret == 0)
|
||||
goto finish;
|
||||
|
||||
if (u->fd >= 0) {
|
||||
struct pollfd *pollfd;
|
||||
|
||||
|
|
@ -1052,8 +1031,8 @@ static void thread_func(void *userdata) {
|
|||
}
|
||||
|
||||
fail:
|
||||
/* We have to continue processing messages until we receive the
|
||||
* SHUTDOWN message */
|
||||
/* If this was no regular exit from the loop we have to continue
|
||||
* processing messages until we received PA_MESSAGE_SHUTDOWN */
|
||||
pa_asyncmsgq_post(u->thread_mq.outq, PA_MSGOBJECT(u->core), PA_CORE_MESSAGE_UNLOAD_MODULE, u->module, 0, NULL, NULL);
|
||||
pa_asyncmsgq_wait_for(u->thread_mq.inq, PA_MESSAGE_SHUTDOWN);
|
||||
|
||||
|
|
|
|||
|
|
@ -126,8 +126,8 @@ static void thread_func(void *userdata) {
|
|||
pa_rtpoll_install(u->rtpoll);
|
||||
|
||||
for (;;) {
|
||||
int ret;
|
||||
struct pollfd *pollfd;
|
||||
int ret;
|
||||
|
||||
pollfd = pa_rtpoll_item_get_pollfd(u->rtpoll_item, NULL);
|
||||
|
||||
|
|
@ -170,36 +170,26 @@ static void thread_func(void *userdata) {
|
|||
}
|
||||
}
|
||||
|
||||
/* Now give the sink inputs some to time to process their data */
|
||||
if ((ret = pa_sink_process_inputs(u->sink)) < 0)
|
||||
goto fail;
|
||||
if (ret > 0)
|
||||
continue;
|
||||
|
||||
/* Check whether there is a message for us to process */
|
||||
if ((ret = pa_thread_mq_process(&u->thread_mq) < 0))
|
||||
goto finish;
|
||||
if (ret > 0)
|
||||
continue;
|
||||
|
||||
/* Hmm, nothing to do. Let's sleep */
|
||||
pollfd->events = u->sink->thread_info.state == PA_SINK_RUNNING ? POLLOUT : 0;
|
||||
|
||||
if (pa_rtpoll_run(u->rtpoll, 1) < 0) {
|
||||
pa_log("poll() failed: %s", pa_cstrerror(errno));
|
||||
if ((ret = pa_rtpoll_run(u->rtpoll, 1)) < 0)
|
||||
goto fail;
|
||||
}
|
||||
|
||||
if (ret == 0)
|
||||
goto finish;
|
||||
|
||||
pollfd = pa_rtpoll_item_get_pollfd(u->rtpoll_item, NULL);
|
||||
|
||||
if (pollfd->revents & ~POLLOUT) {
|
||||
pa_log("FIFO shutdown.");
|
||||
goto fail;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fail:
|
||||
/* We have to continue processing messages until we receive the
|
||||
* SHUTDOWN message */
|
||||
/* If this was no regular exit from the loop we have to continue
|
||||
* processing messages until we received PA_MESSAGE_SHUTDOWN */
|
||||
pa_asyncmsgq_post(u->thread_mq.outq, PA_MSGOBJECT(u->core), PA_CORE_MESSAGE_UNLOAD_MODULE, u->module, 0, NULL, NULL);
|
||||
pa_asyncmsgq_wait_for(u->thread_mq.inq, PA_MESSAGE_SHUTDOWN);
|
||||
|
||||
|
|
|
|||
|
|
@ -149,26 +149,15 @@ static void thread_func(void *userdata) {
|
|||
}
|
||||
}
|
||||
|
||||
/* Now give the source outputs some to time to process their data */
|
||||
if ((ret = pa_source_process_outputs(u->source)) < 0)
|
||||
goto fail;
|
||||
if (ret > 0)
|
||||
continue;
|
||||
|
||||
/* Check whether there is a message for us to process */
|
||||
if ((ret = pa_thread_mq_process(&u->thread_mq) < 0))
|
||||
goto finish;
|
||||
if (ret > 0)
|
||||
continue;
|
||||
|
||||
/* Hmm, nothing to do. Let's sleep */
|
||||
pollfd->events = u->source->thread_info.state == PA_SOURCE_RUNNING ? POLLIN : 0;
|
||||
|
||||
if (pa_rtpoll_run(u->rtpoll, 1) < 0) {
|
||||
pa_log("poll() failed: %s", pa_cstrerror(errno));
|
||||
if ((ret = pa_rtpoll_run(u->rtpoll, 1)) < 0)
|
||||
goto fail;
|
||||
}
|
||||
|
||||
if (ret == 0)
|
||||
goto finish;
|
||||
|
||||
pollfd = pa_rtpoll_item_get_pollfd(u->rtpoll_item, NULL);
|
||||
if (pollfd->revents & ~POLLIN) {
|
||||
pa_log("FIFO shutdown.");
|
||||
|
|
@ -177,8 +166,8 @@ static void thread_func(void *userdata) {
|
|||
}
|
||||
|
||||
fail:
|
||||
/* We have to continue processing messages until we receive the
|
||||
* SHUTDOWN message */
|
||||
/* If this was no regular exit from the loop we have to continue
|
||||
* processing messages until we received PA_MESSAGE_SHUTDOWN */
|
||||
pa_asyncmsgq_post(u->thread_mq.outq, PA_MSGOBJECT(u->core), PA_CORE_MESSAGE_UNLOAD_MODULE, u->module, 0, NULL, NULL);
|
||||
pa_asyncmsgq_wait_for(u->thread_mq.inq, PA_MESSAGE_SHUTDOWN);
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue