pulsecore: Reference count mempools

In future commits, server-wide SHMs will be replaced with per-client
ones that are dynamically created and freed as client connections
open and close.

Meanwhile, the current PA design does not guarantee that per-client
mempool blocks are referenced only by client-specific objects.

Thus, reference-count the pools and have each memblock allocated from
the pool itself, or merely attached to it, increment the pool's
refcount upon allocation. This way, a per-client mempool is freed only
when no component in the system holds any reference to its blocks.
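
As a minimal sketch of the intended lifetime semantics (illustrative
only, not part of the patch; the pool parameters below are arbitrary):

    /* Owner creates the pool; its refcount starts at 1 */
    pa_mempool *pool = pa_mempool_new(true, 0);

    /* Every block allocated from the pool takes its own reference */
    pa_memblock *block = pa_memblock_new(pool, 1024);

    /* The owner drops its reference; the pool survives because
     * the block still holds one */
    pa_mempool_unref(pool);

    /* Unreffing the last block drops the last pool reference,
     * and only then is the pool itself freed */
    pa_memblock_unref(block);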

DiscussionLink: https://goo.gl/qesVMV
Suggested-by: Tanu Kaskinen <tanuk@iki.fi>
Suggested-by: David Henningsson <david.henningsson@canonical.com>
Signed-off-by: Ahmed S. Darwish <darwish.07@gmail.com>
Ahmed S. Darwish 2016-03-13 00:51:12 +02:00 committed by David Henningsson
parent 1f5e72264e
commit 9bda6e344a
17 changed files with 84 additions and 20 deletions

View file

@@ -218,8 +218,8 @@ static void core_free(pa_object *o) {
     pa_silence_cache_done(&c->silence_cache);
 
     if (c->rw_mempool)
-        pa_mempool_free(c->rw_mempool);
-    pa_mempool_free(c->mempool);
+        pa_mempool_unref(c->rw_mempool);
+    pa_mempool_unref(c->mempool);
 
     for (j = 0; j < PA_CORE_HOOK_MAX; j++)
         pa_hook_done(&c->hooks[j]);

View file

@@ -110,6 +110,7 @@ static void process_block(pa_lfe_filter_t *f, pa_memchunk *buf, bool store_result) {
 }
 
 pa_memchunk * pa_lfe_filter_process(pa_lfe_filter_t *f, pa_memchunk *buf) {
+    pa_mempool *pool;
     struct saved_state *s, *s2;
     void *data;
@@ -129,10 +130,12 @@ pa_memchunk * pa_lfe_filter_process(pa_lfe_filter_t *f, pa_memchunk *buf) {
         /* TODO: This actually memcpys the entire chunk into a new allocation, because we need to retain the original
            in case of rewinding. Investigate whether this can be avoided. */
         data = pa_memblock_acquire_chunk(buf);
-        s->chunk.memblock = pa_memblock_new_malloced(pa_memblock_get_pool(buf->memblock), pa_xmemdup(data, buf->length), buf->length);
+        pool = pa_memblock_get_pool(buf->memblock);
+        s->chunk.memblock = pa_memblock_new_malloced(pool, pa_xmemdup(data, buf->length), buf->length);
         s->chunk.length = buf->length;
         s->chunk.index = 0;
         pa_memblock_release(buf->memblock);
+        pa_mempool_unref(pool), pool = NULL;
 
         s->index = f->index;
         memcpy(s->lr4, f->lr4, sizeof(struct lr4) * f->cm.channels);

View file

@@ -142,6 +142,23 @@ struct pa_memexport {
 };
 
 struct pa_mempool {
+    /* Reference count the mempool
+     *
+     * Any block allocation from the pool itself, or even just imported from
+     * another process through SHM and attached to it (PA_MEMBLOCK_IMPORTED),
+     * shall increase the refcount.
+     *
+     * This is done for per-client mempools: global references to blocks in
+     * the pool, or just to attached ones, can still be lingering around when
+     * the client connection dies and all per-client objects are to be freed.
+     * That is, current PulseAudio design does not guarantee that the client
+     * mempool blocks are referenced only by client-specific objects.
+     *
+     * For further details, please check:
+     * https://lists.freedesktop.org/archives/pulseaudio-discuss/2016-February/025587.html
+     */
+    PA_REFCNT_DECLARE;
+
     pa_semaphore *semaphore;
     pa_mutex *mutex;
@@ -237,6 +254,7 @@ static pa_memblock *memblock_new_appended(pa_mempool *p, size_t length) {
     b = pa_xmalloc(PA_ALIGN(sizeof(pa_memblock)) + length);
     PA_REFCNT_INIT(b);
     b->pool = p;
+    pa_mempool_ref(b->pool);
     b->type = PA_MEMBLOCK_APPENDED;
     b->read_only = b->is_silence = false;
     pa_atomic_ptr_store(&b->data, (uint8_t*) b + PA_ALIGN(sizeof(pa_memblock)));
@@ -367,6 +385,7 @@ pa_memblock *pa_memblock_new_pool(pa_mempool *p, size_t length) {
     PA_REFCNT_INIT(b);
     b->pool = p;
+    pa_mempool_ref(b->pool);
     b->read_only = b->is_silence = false;
     b->length = length;
     pa_atomic_store(&b->n_acquired, 0);
@@ -390,6 +409,7 @@ pa_memblock *pa_memblock_new_fixed(pa_mempool *p, void *d, size_t length, bool read_only) {
     PA_REFCNT_INIT(b);
     b->pool = p;
+    pa_mempool_ref(b->pool);
     b->type = PA_MEMBLOCK_FIXED;
     b->read_only = read_only;
     b->is_silence = false;
@@ -423,6 +443,7 @@ pa_memblock *pa_memblock_new_user(
     PA_REFCNT_INIT(b);
     b->pool = p;
+    pa_mempool_ref(b->pool);
     b->type = PA_MEMBLOCK_USER;
     b->read_only = read_only;
     b->is_silence = false;
@@ -518,10 +539,13 @@ size_t pa_memblock_get_length(pa_memblock *b) {
     return b->length;
 }
 
+/* Note! Always unref the returned pool after use */
 pa_mempool* pa_memblock_get_pool(pa_memblock *b) {
     pa_assert(b);
     pa_assert(PA_REFCNT_VALUE(b) > 0);
+    pa_assert(b->pool);
 
+    pa_mempool_ref(b->pool);
     return b->pool;
 }
@@ -535,10 +559,13 @@ pa_memblock* pa_memblock_ref(pa_memblock*b) {
 }
 
 static void memblock_free(pa_memblock *b) {
-    pa_assert(b);
+    pa_mempool *pool;
 
+    pa_assert(b);
+    pa_assert(b->pool);
     pa_assert(pa_atomic_load(&b->n_acquired) == 0);
 
+    pool = b->pool;
     stat_remove(b);
 
     switch (b->type) {
@@ -620,6 +647,8 @@ static void memblock_free(pa_memblock *b) {
         default:
             pa_assert_not_reached();
     }
+
+    pa_mempool_unref(pool);
 }
 
 /* No lock necessary */
@@ -749,6 +778,7 @@ pa_mempool* pa_mempool_new(bool shared, size_t size) {
     char t1[PA_BYTES_SNPRINT_MAX], t2[PA_BYTES_SNPRINT_MAX];
 
     p = pa_xnew0(pa_mempool, 1);
+    PA_REFCNT_INIT(p);
 
     p->block_size = PA_PAGE_ALIGN(PA_MEMPOOL_SLOT_SIZE);
     if (p->block_size < PA_PAGE_SIZE)
@@ -788,7 +818,7 @@ pa_mempool* pa_mempool_new(bool shared, size_t size) {
     return p;
 }
 
-void pa_mempool_free(pa_mempool *p) {
+static void mempool_free(pa_mempool *p) {
     pa_assert(p);
 
     pa_mutex_lock(p->mutex);
@@ -911,6 +941,22 @@ bool pa_mempool_is_shared(pa_mempool *p) {
     return p->memory.shared;
 }
 
+pa_mempool* pa_mempool_ref(pa_mempool *p) {
+    pa_assert(p);
+    pa_assert(PA_REFCNT_VALUE(p) > 0);
+
+    PA_REFCNT_INC(p);
+    return p;
+}
+
+void pa_mempool_unref(pa_mempool *p) {
+    pa_assert(p);
+    pa_assert(PA_REFCNT_VALUE(p) > 0);
+
+    if (PA_REFCNT_DEC(p) <= 0)
+        mempool_free(p);
+}
+
 /* For receiving blocks from other nodes */
 pa_memimport* pa_memimport_new(pa_mempool *p, pa_memimport_release_cb_t cb, void *userdata) {
     pa_memimport *i;
@@ -921,6 +967,7 @@ pa_memimport* pa_memimport_new(pa_mempool *p, pa_memimport_release_cb_t cb, void *userdata) {
     i = pa_xnew(pa_memimport, 1);
     i->mutex = pa_mutex_new(true, true);
     i->pool = p;
+    pa_mempool_ref(i->pool);
     i->segments = pa_hashmap_new(NULL, NULL);
     i->blocks = pa_hashmap_new(NULL, NULL);
     i->release_cb = cb;
@@ -996,6 +1043,7 @@ void pa_memimport_free(pa_memimport *i) {
     pa_mutex_unlock(i->pool->mutex);
 
+    pa_mempool_unref(i->pool);
     pa_hashmap_free(i->blocks);
     pa_hashmap_free(i->segments);
@@ -1039,6 +1087,7 @@ pa_memblock* pa_memimport_get(pa_memimport *i, uint32_t block_id, uint32_t shm_id, size_t offset, size_t size, bool writable) {
     PA_REFCNT_INIT(b);
     b->pool = i->pool;
+    pa_mempool_ref(b->pool);
     b->type = PA_MEMBLOCK_IMPORTED;
     b->read_only = !writable;
     b->is_silence = false;
@@ -1096,6 +1145,7 @@ pa_memexport* pa_memexport_new(pa_mempool *p, pa_memexport_revoke_cb_t cb, void *userdata) {
     e = pa_xnew(pa_memexport, 1);
     e->mutex = pa_mutex_new(true, true);
     e->pool = p;
+    pa_mempool_ref(e->pool);
     PA_LLIST_HEAD_INIT(struct memexport_slot, e->free_slots);
     PA_LLIST_HEAD_INIT(struct memexport_slot, e->used_slots);
     e->n_init = 0;
@@ -1123,6 +1173,7 @@ void pa_memexport_free(pa_memexport *e) {
     PA_LLIST_REMOVE(pa_memexport, e->pool->exports, e);
     pa_mutex_unlock(e->pool->mutex);
 
+    pa_mempool_unref(e->pool);
     pa_mutex_free(e->mutex);
     pa_xfree(e);
 }

View file

@@ -116,13 +116,16 @@ void *pa_memblock_acquire_chunk(const pa_memchunk *c);
 void pa_memblock_release(pa_memblock *b);
 
 size_t pa_memblock_get_length(pa_memblock *b);
 
+/* Note! Always unref the returned pool after use */
 pa_mempool * pa_memblock_get_pool(pa_memblock *b);
+
 pa_memblock *pa_memblock_will_need(pa_memblock *b);
 
 /* The memory block manager */
 pa_mempool* pa_mempool_new(bool shared, size_t size);
-void pa_mempool_free(pa_mempool *p);
+void pa_mempool_unref(pa_mempool *p);
+pa_mempool* pa_mempool_ref(pa_mempool *p);
 const pa_mempool_stat* pa_mempool_get_stat(pa_mempool *p);
 void pa_mempool_vacuum(pa_mempool *p);
 int pa_mempool_get_shm_id(pa_mempool *p, uint32_t *id);
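
A short sketch of how an object that stores a pool pointer is expected
to use the new ref/unref pair, mirroring the pa_memimport_new() and
pa_memimport_free() changes above (my_owner and its functions are
hypothetical, not part of the patch):

    typedef struct my_owner {
        pa_mempool *pool;
    } my_owner;

    static my_owner* my_owner_new(pa_mempool *p) {
        my_owner *o = pa_xnew(my_owner, 1);
        /* Keep the pool alive for as long as this object exists */
        o->pool = pa_mempool_ref(p);
        return o;
    }

    static void my_owner_free(my_owner *o) {
        /* Frees the pool only if we were its last reference holder */
        pa_mempool_unref(o->pool);
        pa_xfree(o);
    }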

View file

@@ -530,6 +530,7 @@ int pa_memblockq_peek(pa_memblockq* bq, pa_memchunk *chunk) {
 }
 
 int pa_memblockq_peek_fixed_size(pa_memblockq *bq, size_t block_size, pa_memchunk *chunk) {
+    pa_mempool *pool;
     pa_memchunk tchunk, rchunk;
     int64_t ri;
     struct list_item *item;
@@ -548,9 +549,11 @@ int pa_memblockq_peek_fixed_size(pa_memblockq *bq, size_t block_size, pa_memchunk *chunk) {
         return 0;
     }
 
-    rchunk.memblock = pa_memblock_new(pa_memblock_get_pool(tchunk.memblock), block_size);
+    pool = pa_memblock_get_pool(tchunk.memblock);
+    rchunk.memblock = pa_memblock_new(pool, block_size);
     rchunk.index = 0;
     rchunk.length = tchunk.length;
+    pa_mempool_unref(pool), pool = NULL;
 
     pa_memchunk_memcpy(&rchunk, &tchunk);
     pa_memblock_unref(tchunk.memblock);

View file

@@ -32,6 +32,7 @@
 #include "memchunk.h"
 
 pa_memchunk* pa_memchunk_make_writable(pa_memchunk *c, size_t min) {
+    pa_mempool *pool;
     pa_memblock *n;
     size_t l;
     void *tdata, *sdata;
@@ -46,7 +47,9 @@ pa_memchunk* pa_memchunk_make_writable(pa_memchunk *c, size_t min) {
 
     l = PA_MAX(c->length, min);
 
-    n = pa_memblock_new(pa_memblock_get_pool(c->memblock), l);
+    pool = pa_memblock_get_pool(c->memblock);
+    n = pa_memblock_new(pool, l);
+    pa_mempool_unref(pool), pool = NULL;
 
     sdata = pa_memblock_acquire(c->memblock);
     tdata = pa_memblock_acquire(n);

View file

@@ -573,6 +573,7 @@ static void prepare_next_write_item(pa_pstream *p) {
             if (current_export != p->export)
                 pa_memexport_free(current_export);
+            pa_mempool_unref(current_pool);
         }
 
         if (send_payload) {