mirror of
https://gitlab.freedesktop.org/pulseaudio/pulseaudio.git
synced 2025-11-06 13:29:56 -05:00
add proper refcounting to pa_asyncmsgq objects, to allow destruction from the dispatched callbacks
git-svn-id: file:///home/lennart/svn/public/pulseaudio/branches/lennart@1635 fefdeb5f-60dc-0310-8127-8f9354f1896f
This commit is contained in:
parent
f7b707b954
commit
c1c59b4675
4 changed files with 44 additions and 19 deletions
|
|
@ -53,6 +53,7 @@ struct asyncmsgq_item {
|
|||
};
|
||||
|
||||
struct pa_asyncmsgq {
|
||||
PA_REFCNT_DECLARE;
|
||||
pa_asyncq *asyncq;
|
||||
pa_mutex *mutex; /* only for the writer side */
|
||||
|
||||
|
|
@ -64,6 +65,7 @@ pa_asyncmsgq *pa_asyncmsgq_new(unsigned size) {
|
|||
|
||||
a = pa_xnew(pa_asyncmsgq, 1);
|
||||
|
||||
PA_REFCNT_INIT(a);
|
||||
pa_assert_se(a->asyncq = pa_asyncq_new(size));
|
||||
pa_assert_se(a->mutex = pa_mutex_new(0));
|
||||
a->current = NULL;
|
||||
|
|
@ -71,7 +73,7 @@ pa_asyncmsgq *pa_asyncmsgq_new(unsigned size) {
|
|||
return a;
|
||||
}
|
||||
|
||||
void pa_asyncmsgq_free(pa_asyncmsgq *a) {
|
||||
static void asyncmsgq_free(pa_asyncmsgq *a) {
|
||||
struct asyncmsgq_item *i;
|
||||
pa_assert(a);
|
||||
|
||||
|
|
@ -97,9 +99,23 @@ void pa_asyncmsgq_free(pa_asyncmsgq *a) {
|
|||
pa_xfree(a);
|
||||
}
|
||||
|
||||
pa_asyncmsgq* pa_asyncmsgq_ref(pa_asyncmsgq *q) {
|
||||
pa_assert(PA_REFCNT_VALUE(q) > 0);
|
||||
|
||||
PA_REFCNT_INC(q);
|
||||
return q;
|
||||
}
|
||||
|
||||
void pa_asyncmsgq_unref(pa_asyncmsgq* q) {
|
||||
pa_assert(PA_REFCNT_VALUE(q) > 0);
|
||||
|
||||
if (PA_REFCNT_DEC(q) <= 0)
|
||||
asyncmsgq_free(q);
|
||||
}
|
||||
|
||||
void pa_asyncmsgq_post(pa_asyncmsgq *a, pa_msgobject *object, int code, const void *userdata, int64_t offset, const pa_memchunk *chunk, pa_free_cb_t free_cb) {
|
||||
struct asyncmsgq_item *i;
|
||||
pa_assert(a);
|
||||
pa_assert(PA_REFCNT_VALUE(a) > 0);
|
||||
|
||||
if (!(i = pa_flist_pop(PA_STATIC_FLIST_GET(asyncmsgq))))
|
||||
i = pa_xnew(struct asyncmsgq_item, 1);
|
||||
|
|
@ -125,7 +141,7 @@ void pa_asyncmsgq_post(pa_asyncmsgq *a, pa_msgobject *object, int code, const vo
|
|||
|
||||
int pa_asyncmsgq_send(pa_asyncmsgq *a, pa_msgobject *object, int code, const void *userdata, int64_t offset, const pa_memchunk *chunk) {
|
||||
struct asyncmsgq_item i;
|
||||
pa_assert(a);
|
||||
pa_assert(PA_REFCNT_VALUE(a) > 0);
|
||||
|
||||
i.code = code;
|
||||
i.object = object;
|
||||
|
|
@ -152,7 +168,7 @@ int pa_asyncmsgq_send(pa_asyncmsgq *a, pa_msgobject *object, int code, const voi
|
|||
}
|
||||
|
||||
int pa_asyncmsgq_get(pa_asyncmsgq *a, pa_msgobject **object, int *code, void **userdata, int64_t *offset, pa_memchunk *chunk, int wait) {
|
||||
pa_assert(a);
|
||||
pa_assert(PA_REFCNT_VALUE(a) > 0);
|
||||
pa_assert(code);
|
||||
pa_assert(!a->current);
|
||||
|
||||
|
|
@ -181,6 +197,7 @@ int pa_asyncmsgq_get(pa_asyncmsgq *a, pa_msgobject **object, int *code, void **u
|
|||
}
|
||||
|
||||
void pa_asyncmsgq_done(pa_asyncmsgq *a, int ret) {
|
||||
pa_assert(PA_REFCNT_VALUE(a) > 0);
|
||||
pa_assert(a);
|
||||
pa_assert(a->current);
|
||||
|
||||
|
|
@ -207,12 +224,14 @@ void pa_asyncmsgq_done(pa_asyncmsgq *a, int ret) {
|
|||
|
||||
int pa_asyncmsgq_wait_for(pa_asyncmsgq *a, int code) {
|
||||
int c;
|
||||
pa_assert(a);
|
||||
pa_assert(PA_REFCNT_VALUE(a) > 0);
|
||||
|
||||
pa_asyncmsgq_ref(a);
|
||||
|
||||
do {
|
||||
pa_msgobject *o;
|
||||
void *data;
|
||||
int64_t offset;
|
||||
int64_t offset;
|
||||
pa_memchunk chunk;
|
||||
int ret;
|
||||
|
||||
|
|
@ -224,23 +243,25 @@ int pa_asyncmsgq_wait_for(pa_asyncmsgq *a, int code) {
|
|||
|
||||
} while (c != code);
|
||||
|
||||
pa_asyncmsgq_unref(a);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int pa_asyncmsgq_get_fd(pa_asyncmsgq *a) {
|
||||
pa_assert(a);
|
||||
pa_assert(PA_REFCNT_VALUE(a) > 0);
|
||||
|
||||
return pa_asyncq_get_fd(a->asyncq);
|
||||
}
|
||||
|
||||
int pa_asyncmsgq_before_poll(pa_asyncmsgq *a) {
|
||||
pa_assert(a);
|
||||
pa_assert(PA_REFCNT_VALUE(a) > 0);
|
||||
|
||||
return pa_asyncq_before_poll(a->asyncq);
|
||||
}
|
||||
|
||||
void pa_asyncmsgq_after_poll(pa_asyncmsgq *a) {
|
||||
pa_assert(a);
|
||||
pa_assert(PA_REFCNT_VALUE(a) > 0);
|
||||
|
||||
pa_asyncq_after_poll(a->asyncq);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -49,13 +49,14 @@
|
|||
* latter waits for completion, synchronously. */
|
||||
|
||||
enum {
|
||||
PA_MESSAGE_SHUTDOWN /* A generic message to inform the handler of this queue to quit */
|
||||
PA_MESSAGE_SHUTDOWN = -1/* A generic message to inform the handler of this queue to quit */
|
||||
};
|
||||
|
||||
typedef struct pa_asyncmsgq pa_asyncmsgq;
|
||||
|
||||
pa_asyncmsgq* pa_asyncmsgq_new(size_t size);
|
||||
void pa_asyncmsgq_free(pa_asyncmsgq* q);
|
||||
pa_asyncmsgq* pa_asyncmsgq_ref(pa_asyncmsgq *q);
|
||||
void pa_asyncmsgq_unref(pa_asyncmsgq* q);
|
||||
|
||||
void pa_asyncmsgq_post(pa_asyncmsgq *q, pa_msgobject *object, int code, const void *userdata, int64_t offset, const pa_memchunk *memchunk, pa_free_cb_t userdata_free_cb);
|
||||
int pa_asyncmsgq_send(pa_asyncmsgq *q, pa_msgobject *object, int code, const void *userdata, int64_t offset, const pa_memchunk *memchunk);
|
||||
|
|
|
|||
|
|
@ -46,11 +46,13 @@ static pa_tls *tls;
|
|||
|
||||
static void asyncmsgq_cb(pa_mainloop_api*api, pa_io_event* e, int fd, pa_io_event_flags_t events, void *userdata) {
|
||||
pa_thread_mq *q = userdata;
|
||||
pa_asyncmsgq *aq;
|
||||
|
||||
pa_assert(pa_asyncmsgq_get_fd(q->outq) == fd);
|
||||
pa_assert(events == PA_IO_EVENT_INPUT);
|
||||
|
||||
pa_asyncmsgq_after_poll(q->outq);
|
||||
pa_asyncmsgq_ref(aq = q->outq);
|
||||
pa_asyncmsgq_after_poll(aq);
|
||||
|
||||
for (;;) {
|
||||
pa_msgobject *object;
|
||||
|
|
@ -60,16 +62,18 @@ static void asyncmsgq_cb(pa_mainloop_api*api, pa_io_event* e, int fd, pa_io_even
|
|||
pa_memchunk chunk;
|
||||
|
||||
/* Check whether there is a message for us to process */
|
||||
while (pa_asyncmsgq_get(q->outq, &object, &code, &data, &offset, &chunk, 0) == 0) {
|
||||
while (pa_asyncmsgq_get(aq, &object, &code, &data, &offset, &chunk, 0) == 0) {
|
||||
int ret;
|
||||
|
||||
ret = pa_asyncmsgq_dispatch(object, code, data, offset, &chunk);
|
||||
pa_asyncmsgq_done(q->outq, ret);
|
||||
pa_asyncmsgq_done(aq, ret);
|
||||
}
|
||||
|
||||
if (pa_asyncmsgq_before_poll(q->outq) == 0)
|
||||
if (pa_asyncmsgq_before_poll(aq) == 0)
|
||||
break;
|
||||
}
|
||||
|
||||
pa_asyncmsgq_unref(aq);
|
||||
}
|
||||
|
||||
void pa_thread_mq_init(pa_thread_mq *q, pa_mainloop_api *mainloop) {
|
||||
|
|
@ -90,9 +94,8 @@ void pa_thread_mq_done(pa_thread_mq *q) {
|
|||
q->mainloop->io_free(q->io_event);
|
||||
q->io_event = NULL;
|
||||
|
||||
pa_asyncmsgq_after_poll(q->outq);
|
||||
pa_asyncmsgq_free(q->inq);
|
||||
pa_asyncmsgq_free(q->outq);
|
||||
pa_asyncmsgq_unref(q->inq);
|
||||
pa_asyncmsgq_unref(q->outq);
|
||||
q->inq = q->outq = NULL;
|
||||
|
||||
q->mainloop = NULL;
|
||||
|
|
|
|||
|
|
@ -104,7 +104,7 @@ int main(int argc, char *argv[]) {
|
|||
|
||||
pa_thread_free(t);
|
||||
|
||||
pa_asyncmsgq_free(q);
|
||||
pa_asyncmsgq_unref(q);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue