memblock, pstream: Allow send/receive of remote writable memblocks

The shared ringbuffer memblock must be writable by both sides.
This makes it possible to send such a memblock over a pstream without
the "both sides writable" information getting lost.

Signed-off-by: David Henningsson <david.henningsson@canonical.com>
This commit is contained in:
David Henningsson 2014-04-25 15:30:41 +02:00
parent 613177919f
commit 710c4b39af
3 changed files with 62 additions and 16 deletions

View file

@ -97,6 +97,7 @@ struct pa_memimport_segment {
pa_shm memory;
pa_memtrap *trap;
unsigned n_blocks;
bool writable;
};
/* A collection of multiple segments */
@ -146,6 +147,7 @@ struct pa_mempool {
pa_shm memory;
size_t block_size;
unsigned n_blocks;
bool is_remote_writable;
pa_atomic_t n_init;
@ -302,6 +304,19 @@ static struct mempool_slot* mempool_slot_by_ptr(pa_mempool *p, void *ptr) {
return (struct mempool_slot*) ((uint8_t*) p->memory.ptr + (idx * p->block_size));
}
/* No lock necessary */
bool pa_mempool_is_remote_writable(pa_mempool *p) {
pa_assert(p);
return p->is_remote_writable;
}
/* No lock necessary */
void pa_mempool_set_is_remote_writable(pa_mempool *p, bool writable) {
pa_assert(p);
pa_assert(!writable || pa_mempool_is_shared(p));
p->is_remote_writable = writable;
}
/* No lock necessary */
pa_memblock *pa_memblock_new_pool(pa_mempool *p, size_t length) {
pa_memblock *b = NULL;
@ -415,6 +430,14 @@ pa_memblock *pa_memblock_new_user(pa_mempool *p, void *d, size_t length, pa_free
return b;
}
/* No lock necessary */
bool pa_memblock_is_ours(pa_memblock *b) {
pa_assert(b);
pa_assert(PA_REFCNT_VALUE(b) > 0);
return b->type != PA_MEMBLOCK_IMPORTED;
}
/* No lock necessary */
bool pa_memblock_is_read_only(pa_memblock *b) {
pa_assert(b);
@ -905,7 +928,7 @@ pa_memimport* pa_memimport_new(pa_mempool *p, pa_memimport_release_cb_t cb, void
static void memexport_revoke_blocks(pa_memexport *e, pa_memimport *i);
/* Should be called locked */
static pa_memimport_segment* segment_attach(pa_memimport *i, uint32_t shm_id) {
static pa_memimport_segment* segment_attach(pa_memimport *i, uint32_t shm_id, bool writable) {
pa_memimport_segment* seg;
if (pa_hashmap_size(i->segments) >= PA_MEMIMPORT_SEGMENTS_MAX)
@ -913,11 +936,12 @@ static pa_memimport_segment* segment_attach(pa_memimport *i, uint32_t shm_id) {
seg = pa_xnew0(pa_memimport_segment, 1);
if (pa_shm_attach(&seg->memory, shm_id, false) < 0) {
if (pa_shm_attach(&seg->memory, shm_id, writable) < 0) {
pa_xfree(seg);
return NULL;
}
seg->writable = writable;
seg->import = i;
seg->trap = pa_memtrap_add(seg->memory.ptr, seg->memory.size);
@ -973,7 +997,8 @@ void pa_memimport_free(pa_memimport *i) {
}
/* Self-locked */
pa_memblock* pa_memimport_get(pa_memimport *i, uint32_t block_id, uint32_t shm_id, size_t offset, size_t size) {
pa_memblock* pa_memimport_get(pa_memimport *i, uint32_t block_id, uint32_t shm_id,
size_t offset, size_t size, bool writable) {
pa_memblock *b = NULL;
pa_memimport_segment *seg;
@ -990,9 +1015,14 @@ pa_memblock* pa_memimport_get(pa_memimport *i, uint32_t block_id, uint32_t shm_i
goto finish;
if (!(seg = pa_hashmap_get(i->segments, PA_UINT32_TO_PTR(shm_id))))
if (!(seg = segment_attach(i, shm_id)))
if (!(seg = segment_attach(i, shm_id, writable)))
goto finish;
if (writable != seg->writable) {
pa_log("Cannot open segment - writable status changed!");
goto finish;
}
if (offset+size > seg->memory.size)
goto finish;
@ -1002,7 +1032,7 @@ pa_memblock* pa_memimport_get(pa_memimport *i, uint32_t block_id, uint32_t shm_i
PA_REFCNT_INIT(b);
b->pool = i->pool;
b->type = PA_MEMBLOCK_IMPORTED;
b->read_only = true;
b->read_only = !writable;
b->is_silence = false;
pa_atomic_ptr_store(&b->data, (uint8_t*) seg->memory.ptr + offset);
b->length = size;

View file

@ -104,6 +104,7 @@ function is not multiple caller safe, i.e. needs to be locked
manually if called from more than one thread at the same time. */
void pa_memblock_unref_fixed(pa_memblock*b);
bool pa_memblock_is_ours(pa_memblock *b);
bool pa_memblock_is_read_only(pa_memblock *b);
bool pa_memblock_is_silence(pa_memblock *b);
bool pa_memblock_ref_is_one(pa_memblock *b);
@ -125,12 +126,15 @@ const pa_mempool_stat* pa_mempool_get_stat(pa_mempool *p);
void pa_mempool_vacuum(pa_mempool *p);
int pa_mempool_get_shm_id(pa_mempool *p, uint32_t *id);
bool pa_mempool_is_shared(pa_mempool *p);
bool pa_mempool_is_remote_writable(pa_mempool *p);
void pa_mempool_set_is_remote_writable(pa_mempool *p, bool writable);
size_t pa_mempool_block_size_max(pa_mempool *p);
/* For receiving blocks from other nodes */
pa_memimport* pa_memimport_new(pa_mempool *p, pa_memimport_release_cb_t cb, void *userdata);
void pa_memimport_free(pa_memimport *i);
pa_memblock* pa_memimport_get(pa_memimport *i, uint32_t block_id, uint32_t shm_id, size_t offset, size_t size);
pa_memblock* pa_memimport_get(pa_memimport *i, uint32_t block_id, uint32_t shm_id,
size_t offset, size_t size, bool writable);
int pa_memimport_process_revoke(pa_memimport *i, uint32_t block_id);
/* For sending blocks to other nodes */

View file

@ -50,6 +50,7 @@
#define PA_FLAG_SHMREVOKE 0xC0000000LU
#define PA_FLAG_SHMMASK 0xFF000000LU
#define PA_FLAG_SEEKMASK 0x000000FFLU
#define PA_FLAG_SHMWRITABLE 0x00800000LU
/* The sequence descriptor header consists of 5 32bit integers: */
enum {
@ -504,10 +505,15 @@ static void prepare_next_write_item(pa_pstream *p) {
size_t offset, length;
uint32_t *shm_info = (uint32_t *) &p->write.minibuf[PA_PSTREAM_DESCRIPTOR_SIZE];
size_t shm_size = sizeof(uint32_t) * PA_PSTREAM_SHM_MAX;
pa_mempool *current_pool = pa_memblock_get_pool(p->write.current->chunk.memblock);
pa_memexport *current_export;
pa_assert(p->export);
if (p->mempool == current_pool)
pa_assert_se(current_export = p->export);
else
pa_assert_se(current_export = pa_memexport_new(current_pool, memexport_revoke_cb, p));
if (pa_memexport_put(p->export,
if (pa_memexport_put(current_export,
p->write.current->chunk.memblock,
&block_id,
&shm_id,
@ -515,6 +521,8 @@ static void prepare_next_write_item(pa_pstream *p) {
&length) >= 0) {
flags |= PA_FLAG_SHMDATA;
if (pa_mempool_is_remote_writable(current_pool))
flags |= PA_FLAG_SHMWRITABLE;
send_payload = false;
shm_info[PA_PSTREAM_SHM_BLOCKID] = htonl(block_id);
@ -527,6 +535,9 @@ static void prepare_next_write_item(pa_pstream *p) {
}
/* else */
/* pa_log_warn("Failed to export memory block."); */
if (current_export != p->export)
pa_memexport_free(current_export);
}
if (send_payload) {
@ -824,8 +835,8 @@ static int do_read(pa_pstream *p) {
pa_packet_unref(p->read.packet);
} else {
pa_memblock *b;
pa_assert((ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]) & PA_FLAG_SHMMASK) == PA_FLAG_SHMDATA);
uint32_t flags = ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS]);
pa_assert((flags & PA_FLAG_SHMMASK) == PA_FLAG_SHMDATA);
pa_assert(p->import);
@ -833,7 +844,8 @@ static int do_read(pa_pstream *p) {
ntohl(p->read.shm_info[PA_PSTREAM_SHM_BLOCKID]),
ntohl(p->read.shm_info[PA_PSTREAM_SHM_SHMID]),
ntohl(p->read.shm_info[PA_PSTREAM_SHM_INDEX]),
ntohl(p->read.shm_info[PA_PSTREAM_SHM_LENGTH])))) {
ntohl(p->read.shm_info[PA_PSTREAM_SHM_LENGTH]),
!!(flags & PA_FLAG_SHMWRITABLE)))) {
if (pa_log_ratelimit(PA_LOG_DEBUG))
pa_log_debug("Failed to import memory block.");