Revert r1404 and keep it on a development branch until it is fully tested.

git-svn-id: file:///home/lennart/svn/public/pulseaudio/trunk@1409 fefdeb5f-60dc-0310-8127-8f9354f1896f
Pierre Ossman 2006-11-06 13:06:01 +00:00
parent 6ca819354c
commit 8dc6214276
36 changed files with 499 additions and 990 deletions
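
The change being reverted shows up as one recurring pattern in the hunks below: r1404 had hidden the memblock payload behind pa_memblock_acquire()/pa_memblock_release() (plus pa_memblock_get_length() and pa_memblock_get_pool() accessors and pa_atomic_* statistics counters, a pa_flist-based free-slot list and pa_anotify signalling), while the restored pre-r1404 code reads the block's data and length fields directly, keeps the libatomic_ops AO_* primitives for the pool statistics, and goes back to the linked-list/defer-event code. The sketch below contrasts the two access styles using names taken from the diff; the helper functions and the stripped-down error handling are illustrative only, and the includes assume the pulsecore headers from this tree.

/* Illustrative sketch only -- not part of this commit. */
#include <pulsecore/iochannel.h>
#include <pulsecore/memblock.h>
#include <pulsecore/memchunk.h>

/* r1404 style (removed by this revert): pin the block with
 * acquire/release around every access to its payload. */
static ssize_t write_chunk_acquire(pa_iochannel *io, const pa_memchunk *chunk) {
    void *p = pa_memblock_acquire(chunk->memblock);
    ssize_t r = pa_iochannel_write(io, (uint8_t*) p + chunk->index, chunk->length);
    pa_memblock_release(chunk->memblock);
    return r;
}

/* Pre-r1404 style (restored by this revert): data and length are
 * plain pa_memblock struct members and are dereferenced directly. */
static ssize_t write_chunk_direct(pa_iochannel *io, const pa_memchunk *chunk) {
    return pa_iochannel_write(io,
        (uint8_t*) chunk->memblock->data + chunk->index,
        chunk->length);
}

The same substitution recurs throughout the sink/source modules and the protocol code in the hunks that follow.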


@ -268,7 +268,8 @@ thread_test_CFLAGS = $(AM_CFLAGS)
thread_test_LDADD = $(AM_LDADD) libpulsecore.la
thread_test_LDFLAGS = $(AM_LDFLAGS) $(BINLDFLAGS)
flist_test_SOURCES = tests/flist-test.c
flist_test_SOURCES = tests/flist-test.c \
pulsecore/flist.c pulsecore/flist.h
flist_test_CFLAGS = $(AM_CFLAGS)
flist_test_LDADD = $(AM_LDADD) libpulsecore.la
flist_test_LDFLAGS = $(AM_LDFLAGS) $(BINLDFLAGS)
@ -447,8 +448,6 @@ libpulse_la_SOURCES += \
pulsecore/core-error.c pulsecore/core-error.h \
pulsecore/winsock.h pulsecore/creds.h \
pulsecore/shm.c pulsecore/shm.h \
pulsecore/flist.c pulsecore/flist.h \
pulsecore/anotify.c pulsecore/anotify.h \
$(PA_THREAD_OBJS)
if OS_IS_WIN32
@ -629,8 +628,6 @@ libpulsecore_la_SOURCES += \
pulsecore/core-error.c pulsecore/core-error.h \
pulsecore/hook-list.c pulsecore/hook-list.h \
pulsecore/shm.c pulsecore/shm.h \
pulsecore/flist.c pulsecore/flist.h \
pulsecore/anotify.c pulsecore/anotify.h \
$(PA_THREAD_OBJS)
if OS_IS_WIN32


@ -144,7 +144,6 @@ static void do_write(struct userdata *u) {
update_usage(u);
for (;;) {
void *p;
pa_memchunk *memchunk = NULL;
snd_pcm_sframes_t frames;
@ -157,15 +156,9 @@ static void do_write(struct userdata *u) {
memchunk = &u->memchunk;
}
assert(memchunk->memblock);
assert(memchunk->length);
assert((memchunk->length % u->frame_size) == 0);
assert(memchunk->memblock && memchunk->memblock->data && memchunk->length && memchunk->memblock->length && (memchunk->length % u->frame_size) == 0);
p = pa_memblock_acquire(memchunk->memblock);
if ((frames = snd_pcm_writei(u->pcm_handle, (uint8_t*) p + memchunk->index, memchunk->length / u->frame_size)) < 0) {
pa_memblock_release(memchunk->memblock);
if ((frames = snd_pcm_writei(u->pcm_handle, (uint8_t*) memchunk->memblock->data + memchunk->index, memchunk->length / u->frame_size)) < 0) {
if (frames == -EAGAIN)
return;
@ -183,9 +176,6 @@ static void do_write(struct userdata *u) {
return;
}
pa_memblock_release(memchunk->memblock);
if (memchunk == &u->memchunk) {
size_t l = frames * u->frame_size;
memchunk->index += l;


@ -149,7 +149,6 @@ static void do_read(struct userdata *u) {
pa_memchunk post_memchunk;
snd_pcm_sframes_t frames;
size_t l;
void *p;
if (!u->memchunk.memblock) {
u->memchunk.memblock = pa_memblock_new(u->source->core->mempool, u->memchunk.length = u->fragment_size);
@ -158,13 +157,11 @@ static void do_read(struct userdata *u) {
assert(u->memchunk.memblock);
assert(u->memchunk.length);
assert(u->memchunk.memblock->data);
assert(u->memchunk.memblock->length);
assert(u->memchunk.length % u->frame_size == 0);
p = pa_memblock_acquire(u->memchunk.memblock);
if ((frames = snd_pcm_readi(u->pcm_handle, (uint8_t*) p + u->memchunk.index, u->memchunk.length / u->frame_size)) < 0) {
pa_memblock_release(u->memchunk.memblock);
if ((frames = snd_pcm_readi(u->pcm_handle, (uint8_t*) u->memchunk.memblock->data + u->memchunk.index, u->memchunk.length / u->frame_size)) < 0) {
if (frames == -EAGAIN)
return;
@ -181,7 +178,6 @@ static void do_read(struct userdata *u) {
pa_module_unload_request(u->module);
return;
}
pa_memblock_release(u->memchunk.memblock);
l = frames * u->frame_size;


@ -142,25 +142,18 @@ static int do_write(struct userdata *u) {
u->write_index = u->write_length = 0;
}
} else if (u->state == STATE_RUNNING) {
void *p;
pa_module_set_used(u->module, pa_sink_used_by(u->sink));
if (!u->memchunk.length)
if (pa_sink_render(u->sink, 8192, &u->memchunk) < 0)
return 0;
assert(u->memchunk.memblock);
assert(u->memchunk.length);
p = pa_memblock_acquire(u->memchunk.memblock);
assert(u->memchunk.memblock && u->memchunk.length);
if ((r = pa_iochannel_write(u->io, (uint8_t*) p + u->memchunk.index, u->memchunk.length)) < 0) {
pa_memblock_release(u->memchunk.memblock);
if ((r = pa_iochannel_write(u->io, (uint8_t*) u->memchunk.memblock->data + u->memchunk.index, u->memchunk.length)) < 0) {
pa_log("write() failed: %s", pa_cstrerror(errno));
return -1;
}
pa_memblock_release(u->memchunk.memblock);
u->memchunk.index += r;
u->memchunk.length -= r;


@ -135,25 +135,22 @@ static void io_event_cb(pa_mainloop_api *m, pa_io_event *e, int fd, pa_io_event_
unsigned fs;
jack_nframes_t frame_idx;
pa_memchunk chunk;
void *p;
fs = pa_frame_size(&u->sink->sample_spec);
pa_sink_render_full(u->sink, u->frames_requested * fs, &chunk);
p = pa_memblock_acquire(chunk.memblock);
for (frame_idx = 0; frame_idx < u->frames_requested; frame_idx ++) {
unsigned c;
for (c = 0; c < u->channels; c++) {
float *s = ((float*) ((uint8_t*) p + chunk.index)) + (frame_idx * u->channels) + c;
float *s = ((float*) ((uint8_t*) chunk.memblock->data + chunk.index)) + (frame_idx * u->channels) + c;
float *d = ((float*) u->buffer[c]) + frame_idx;
*d = *s;
}
}
pa_memblock_release(chunk.memblock);
pa_memblock_unref(chunk.memblock);
u->frames_requested = 0;


@ -134,28 +134,23 @@ static void io_event_cb(pa_mainloop_api *m, pa_io_event *e, int fd, pa_io_event_
unsigned fs;
jack_nframes_t frame_idx;
pa_memchunk chunk;
void *p;
fs = pa_frame_size(&u->source->sample_spec);
chunk.memblock = pa_memblock_new(u->core->mempool, chunk.length = u->frames_posted * fs);
chunk.index = 0;
p = pa_memblock_acquire(chunk.memblock);
for (frame_idx = 0; frame_idx < u->frames_posted; frame_idx ++) {
unsigned c;
for (c = 0; c < u->channels; c++) {
float *s = ((float*) u->buffer[c]) + frame_idx;
float *d = ((float*) ((uint8_t*) p + chunk.index)) + (frame_idx * u->channels) + c;
float *d = ((float*) ((uint8_t*) chunk.memblock->data + chunk.index)) + (frame_idx * u->channels) + c;
*d = *s;
}
}
pa_memblock_release(chunk.memblock);
pa_source_post(u->source, &chunk);
pa_memblock_unref(chunk.memblock);


@ -170,7 +170,7 @@ static void out_fill_memblocks(struct userdata *u, unsigned n) {
u->out_fragment_size,
1);
assert(chunk.memblock);
chunk.length = pa_memblock_get_length(chunk.memblock);
chunk.length = chunk.memblock->length;
chunk.index = 0;
pa_sink_render_into_full(u->sink, &chunk);
@ -214,7 +214,7 @@ static void in_post_memblocks(struct userdata *u, unsigned n) {
if (!u->in_memblocks[u->in_current]) {
chunk.memblock = u->in_memblocks[u->in_current] = pa_memblock_new_fixed(u->core->mempool, (uint8_t*) u->in_mmap+u->in_fragment_size*u->in_current, u->in_fragment_size, 1);
chunk.length = pa_memblock_get_length(chunk.memblock);
chunk.length = chunk.memblock->length;
chunk.index = 0;
pa_source_post(u->source, &chunk);


@ -155,7 +155,6 @@ static void do_write(struct userdata *u) {
}
do {
void *p;
memchunk = &u->memchunk;
if (!memchunk->length)
@ -163,18 +162,16 @@ static void do_write(struct userdata *u) {
memchunk = &u->silence;
assert(memchunk->memblock);
assert(memchunk->memblock->data);
assert(memchunk->length);
p = pa_memblock_acquire(memchunk->memblock);
if ((r = pa_iochannel_write(u->io, (uint8_t*) p + memchunk->index, memchunk->length)) < 0) {
pa_memblock_release(memchunk->memblock);
if ((r = pa_iochannel_write(u->io, (uint8_t*) memchunk->memblock->data + memchunk->index, memchunk->length)) < 0) {
pa_log("write() failed: %s", pa_cstrerror(errno));
clear_up(u);
pa_module_unload_request(u->module);
break;
}
pa_memblock_release(memchunk->memblock);
if (memchunk == &u->silence)
assert(r % u->sample_size == 0);
@ -220,13 +217,9 @@ static void do_read(struct userdata *u) {
}
do {
void *p;
memchunk.memblock = pa_memblock_new(u->core->mempool, l);
p = pa_memblock_acquire(memchunk.memblock);
if ((r = pa_iochannel_read(u->io, p, pa_memblock_get_length(memchunk.memblock))) < 0) {
pa_memblock_release(memchunk.memblock);
assert(memchunk.memblock);
if ((r = pa_iochannel_read(u->io, memchunk.memblock->data, memchunk.memblock->length)) < 0) {
pa_memblock_unref(memchunk.memblock);
if (errno != EAGAIN) {
pa_log("read() failed: %s", pa_cstrerror(errno));
@ -235,10 +228,9 @@ static void do_read(struct userdata *u) {
}
break;
}
pa_memblock_release(memchunk.memblock);
assert(r <= (ssize_t) pa_memblock_get_length(memchunk.memblock));
memchunk.length = r;
assert(r <= (ssize_t) memchunk.memblock->length);
memchunk.length = memchunk.memblock->length = r;
memchunk.index = 0;
pa_source_post(u->source, &memchunk);


@ -84,8 +84,6 @@ static const char* const valid_modargs[] = {
static void do_write(struct userdata *u) {
ssize_t r;
void *p;
assert(u);
u->core->mainloop->defer_enable(u->defer_event, 0);
@ -99,17 +97,12 @@ static void do_write(struct userdata *u) {
if (pa_sink_render(u->sink, PIPE_BUF, &u->memchunk) < 0)
return;
assert(u->memchunk.memblock);
assert(u->memchunk.length);
p = pa_memblock_acquire(u->memchunk.memblock);
assert(u->memchunk.memblock && u->memchunk.length);
if ((r = pa_iochannel_write(u->io, (uint8_t*) p + u->memchunk.index, u->memchunk.length)) < 0) {
pa_memblock_release(u->memchunk.memblock);
if ((r = pa_iochannel_write(u->io, (uint8_t*) u->memchunk.memblock->data + u->memchunk.index, u->memchunk.length)) < 0) {
pa_log("write(): %s", pa_cstrerror(errno));
return;
}
pa_memblock_release(u->memchunk.memblock);
u->memchunk.index += r;
u->memchunk.length -= r;


@ -82,9 +82,7 @@ static const char* const valid_modargs[] = {
static void do_read(struct userdata *u) {
ssize_t r;
void *p;
pa_memchunk chunk;
assert(u);
if (!pa_iochannel_is_readable(u->io))
@ -97,22 +95,17 @@ static void do_read(struct userdata *u) {
u->chunk.index = chunk.length = 0;
}
assert(u->chunk.memblock);
assert(pa_memblock_get_length(u->chunk.memblock) > u->chunk.index);
p = pa_memblock_acquire(u->chunk.memblock);
if ((r = pa_iochannel_read(u->io, (uint8_t*) p + u->chunk.index, pa_memblock_get_length(u->chunk.memblock) - u->chunk.index)) <= 0) {
pa_memblock_release(u->chunk.memblock);
assert(u->chunk.memblock && u->chunk.memblock->length > u->chunk.index);
if ((r = pa_iochannel_read(u->io, (uint8_t*) u->chunk.memblock->data + u->chunk.index, u->chunk.memblock->length - u->chunk.index)) <= 0) {
pa_log("read(): %s", pa_cstrerror(errno));
return;
}
pa_memblock_release(u->chunk.memblock);
u->chunk.length = r;
pa_source_post(u->source, &u->chunk);
u->chunk.index += r;
if (u->chunk.index >= pa_memblock_get_length(u->chunk.memblock)) {
if (u->chunk.index >= u->chunk.memblock->length) {
u->chunk.index = u->chunk.length = 0;
pa_memblock_unref(u->chunk.memblock);
u->chunk.memblock = NULL;


@ -63,7 +63,7 @@ static int sink_input_peek(pa_sink_input *i, pa_memchunk *chunk) {
chunk->memblock = pa_memblock_ref(u->memblock);
chunk->index = u->peek_index;
chunk->length = pa_memblock_get_length(u->memblock) - u->peek_index;
chunk->length = u->memblock->length - u->peek_index;
return 0;
}
@ -72,12 +72,11 @@ static void sink_input_drop(pa_sink_input *i, const pa_memchunk *chunk, size_t l
assert(i && chunk && length && i->userdata);
u = i->userdata;
assert(chunk->memblock == u->memblock);
assert(length <= pa_memblock_get_length(u->memblock)-u->peek_index);
assert(chunk->memblock == u->memblock && length <= u->memblock->length-u->peek_index);
u->peek_index += length;
if (u->peek_index >= pa_memblock_get_length(u->memblock))
if (u->peek_index >= u->memblock->length)
u->peek_index = 0;
}
@ -110,7 +109,6 @@ int pa__init(pa_core *c, pa_module*m) {
pa_sample_spec ss;
uint32_t frequency;
char t[256];
void *p;
pa_sink_input_new_data data;
if (!(ma = pa_modargs_new(m->argument, valid_modargs))) {
@ -142,10 +140,8 @@ int pa__init(pa_core *c, pa_module*m) {
}
u->memblock = pa_memblock_new(c->mempool, pa_bytes_per_second(&ss));
p = pa_memblock_acquire(u->memblock);
calc_sine(p, pa_memblock_get_length(u->memblock), frequency);
pa_memblock_release(u->memblock);
calc_sine(u->memblock->data, u->memblock->length, frequency);
snprintf(t, sizeof(t), "Sine Generator at %u Hz", frequency);
pa_sink_input_new_data_init(&data);


@ -79,7 +79,7 @@ int pa_rtp_send(pa_rtp_context *c, size_t size, pa_memblockq *q) {
size_t k = n + chunk.length > size ? size - n : chunk.length;
if (chunk.memblock) {
iov[iov_idx].iov_base = (void*)((uint8_t*) pa_memblock_acquire(chunk.memblock) + chunk.index);
iov[iov_idx].iov_base = (void*)((uint8_t*) chunk.memblock->data + chunk.index);
iov[iov_idx].iov_len = k;
mb[iov_idx] = chunk.memblock;
iov_idx ++;
@ -114,10 +114,8 @@ int pa_rtp_send(pa_rtp_context *c, size_t size, pa_memblockq *q) {
k = sendmsg(c->fd, &m, MSG_DONTWAIT);
for (i = 1; i < iov_idx; i++) {
pa_memblock_release(mb[i]);
for (i = 1; i < iov_idx; i++)
pa_memblock_unref(mb[i]);
}
c->sequence++;
} else
@ -174,7 +172,7 @@ int pa_rtp_recv(pa_rtp_context *c, pa_memchunk *chunk, pa_mempool *pool) {
chunk->memblock = pa_memblock_new(pool, size);
iov.iov_base = pa_memblock_acquire(chunk->memblock);
iov.iov_base = chunk->memblock->data;
iov.iov_len = size;
m.msg_name = NULL;
@ -195,9 +193,9 @@ int pa_rtp_recv(pa_rtp_context *c, pa_memchunk *chunk, pa_mempool *pool) {
goto fail;
}
memcpy(&header, iov.iov_base, sizeof(uint32_t));
memcpy(&c->timestamp, (uint8_t*) iov.iov_base + 4, sizeof(uint32_t));
memcpy(&c->ssrc, (uint8_t*) iov.iov_base + 8, sizeof(uint32_t));
memcpy(&header, chunk->memblock->data, sizeof(uint32_t));
memcpy(&c->timestamp, (uint8_t*) chunk->memblock->data + 4, sizeof(uint32_t));
memcpy(&c->ssrc, (uint8_t*) chunk->memblock->data + 8, sizeof(uint32_t));
header = ntohl(header);
c->timestamp = ntohl(c->timestamp);
@ -238,10 +236,8 @@ int pa_rtp_recv(pa_rtp_context *c, pa_memchunk *chunk, pa_mempool *pool) {
return 0;
fail:
if (chunk->memblock) {
pa_memblock_release(chunk->memblock);
if (chunk->memblock)
pa_memblock_unref(chunk->memblock);
}
return -1;
}


@ -113,7 +113,6 @@ struct pa_stream {
uint32_t requested_bytes;
pa_memchunk peek_memchunk;
void *peek_data;
pa_memblockq *record_memblockq;
int corked;


@ -88,7 +88,6 @@ pa_stream *pa_stream_new(pa_context *c, const char *name, const pa_sample_spec *
s->peek_memchunk.index = 0;
s->peek_memchunk.length = 0;
s->peek_memchunk.memblock = NULL;
s->peek_data = NULL;
s->record_memblockq = NULL;
@ -123,11 +122,8 @@ static void stream_free(pa_stream *s) {
s->mainloop->time_free(s->auto_timing_update_event);
}
if (s->peek_memchunk.memblock) {
if (s->peek_data)
pa_memblock_release(s->peek_memchunk.memblock);
if (s->peek_memchunk.memblock)
pa_memblock_unref(s->peek_memchunk.memblock);
}
if (s->record_memblockq)
pa_memblockq_free(s->record_memblockq);
@ -609,11 +605,8 @@ int pa_stream_write(
if (free_cb)
chunk.memblock = pa_memblock_new_user(s->context->mempool, (void*) data, length, free_cb, 1);
else {
void *tdata;
chunk.memblock = pa_memblock_new(s->context->mempool, length);
tdata = pa_memblock_acquire(chunk.memblock);
memcpy(tdata, data, length);
pa_memblock_release(chunk.memblock);
memcpy(chunk.memblock->data, data, length);
}
chunk.index = 0;
@ -679,12 +672,9 @@ int pa_stream_peek(pa_stream *s, const void **data, size_t *length) {
*length = 0;
return 0;
}
s->peek_data = pa_memblock_acquire(s->peek_memchunk.memblock);
}
assert(s->peek_data);
*data = (uint8_t*) s->peek_data + s->peek_memchunk.index;
*data = (const char*) s->peek_memchunk.memblock->data + s->peek_memchunk.index;
*length = s->peek_memchunk.length;
return 0;
}
@ -702,9 +692,7 @@ int pa_stream_drop(pa_stream *s) {
/* Fix the simulated local read index */
if (s->timing_info_valid && !s->timing_info.read_index_corrupt)
s->timing_info.read_index += s->peek_memchunk.length;
assert(s->peek_data);
pa_memblock_release(s->peek_memchunk.memblock);
pa_memblock_unref(s->peek_memchunk.memblock);
s->peek_memchunk.length = 0;
s->peek_memchunk.index = 0;


@ -259,20 +259,20 @@ static int pa_cli_command_stat(pa_core *c, pa_tokenizer *t, pa_strbuf *buf, PA_G
stat = pa_mempool_get_stat(c->mempool);
pa_strbuf_printf(buf, "Memory blocks currently allocated: %u, size: %s.\n",
(unsigned) pa_atomic_load(&stat->n_allocated),
pa_bytes_snprint(s, sizeof(s), (size_t) pa_atomic_load(&stat->allocated_size)));
(unsigned) AO_load_acquire_read((AO_t*) &stat->n_allocated),
pa_bytes_snprint(s, sizeof(s), (size_t) AO_load_acquire_read((AO_t*) &stat->allocated_size)));
pa_strbuf_printf(buf, "Memory blocks allocated during the whole lifetime: %u, size: %s.\n",
(unsigned) pa_atomic_load(&stat->n_accumulated),
pa_bytes_snprint(s, sizeof(s), (size_t) pa_atomic_load(&stat->accumulated_size)));
(unsigned) AO_load_acquire_read((AO_t*) &stat->n_accumulated),
pa_bytes_snprint(s, sizeof(s), (size_t) AO_load_acquire_read((AO_t*) &stat->accumulated_size)));
pa_strbuf_printf(buf, "Memory blocks imported from other processes: %u, size: %s.\n",
(unsigned) pa_atomic_load(&stat->n_imported),
pa_bytes_snprint(s, sizeof(s), (size_t) pa_atomic_load(&stat->imported_size)));
(unsigned) AO_load_acquire_read((AO_t*) &stat->n_imported),
pa_bytes_snprint(s, sizeof(s), (size_t) AO_load_acquire_read((AO_t*) &stat->imported_size)));
pa_strbuf_printf(buf, "Memory blocks exported to other processes: %u, size: %s.\n",
(unsigned) pa_atomic_load(&stat->n_exported),
pa_bytes_snprint(s, sizeof(s), (size_t) pa_atomic_load(&stat->exported_size)));
(unsigned) AO_load_acquire_read((AO_t*) &stat->n_exported),
pa_bytes_snprint(s, sizeof(s), (size_t) AO_load_acquire_read((AO_t*) &stat->exported_size)));
pa_strbuf_printf(buf, "Total sample cache size: %s.\n",
pa_bytes_snprint(s, sizeof(s), pa_scache_total_size(c)));
@ -289,8 +289,8 @@ static int pa_cli_command_stat(pa_core *c, pa_tokenizer *t, pa_strbuf *buf, PA_G
pa_strbuf_printf(buf,
"Memory blocks of type %s: %u allocated/%u accumulated.\n",
type_table[k],
(unsigned) pa_atomic_load(&stat->n_allocated_by_type[k]),
(unsigned) pa_atomic_load(&stat->n_accumulated_by_type[k]));
(unsigned) AO_load_acquire_read(&stat->n_allocated_by_type[k]),
(unsigned) AO_load_acquire_read(&stat->n_accumulated_by_type[k]));
return 0;
}


@ -89,7 +89,6 @@ void pa_mcalign_push(pa_mcalign *m, const pa_memchunk *c) {
} else {
size_t l;
void *lo_data, *m_data;
/* We have to copy */
assert(m->leftover.length < m->base);
@ -101,15 +100,10 @@ void pa_mcalign_push(pa_mcalign *m, const pa_memchunk *c) {
/* Can we use the current block? */
pa_memchunk_make_writable(&m->leftover, m->base);
lo_data = pa_memblock_acquire(m->leftover.memblock);
m_data = pa_memblock_acquire(c->memblock);
memcpy((uint8_t*) lo_data + m->leftover.index + m->leftover.length, (uint8_t*) m_data + c->index, l);
pa_memblock_release(m->leftover.memblock);
pa_memblock_release(c->memblock);
memcpy((uint8_t*) m->leftover.memblock->data + m->leftover.index + m->leftover.length, (uint8_t*) c->memblock->data + c->index, l);
m->leftover.length += l;
assert(m->leftover.length <= m->base);
assert(m->leftover.length <= pa_memblock_get_length(m->leftover.memblock));
assert(m->leftover.length <= m->base && m->leftover.length <= m->leftover.memblock->length);
if (c->length > l) {
/* Save the remainder of the memory block */


@ -30,13 +30,10 @@
#include <unistd.h>
#include <pulse/xmalloc.h>
#include <pulse/def.h>
#include <pulsecore/shm.h>
#include <pulsecore/log.h>
#include <pulsecore/hashmap.h>
#include <pulsecore/mutex.h>
#include <pulsecore/flist.h>
#include "memblock.h"
@ -48,32 +45,6 @@
#define PA_MEMIMPORT_SLOTS_MAX 128
#define PA_MEMIMPORT_SEGMENTS_MAX 16
struct pa_memblock {
PA_REFCNT_DECLARE; /* the reference counter */
pa_mempool *pool;
pa_memblock_type_t type;
int read_only; /* boolean */
pa_atomic_ptr_t data;
size_t length;
pa_atomic_int_t n_acquired;
pa_atomic_int_t please_signal;
union {
struct {
/* If type == PA_MEMBLOCK_USER this points to a function for freeing this memory block */
pa_free_cb_t free_cb;
} user;
struct {
uint32_t id;
pa_memimport_segment *segment;
} imported;
} per_type;
};
struct pa_memimport_segment {
pa_memimport *import;
pa_shm memory;
@ -81,8 +52,6 @@ struct pa_memimport_segment {
};
struct pa_memimport {
pa_mutex *mutex;
pa_mempool *pool;
pa_hashmap *segments;
pa_hashmap *blocks;
@ -101,11 +70,9 @@ struct memexport_slot {
};
struct pa_memexport {
pa_mutex *mutex;
pa_mempool *pool;
struct memexport_slot slots[PA_MEMEXPORT_SLOTS_MAX];
PA_LLIST_HEAD(struct memexport_slot, free_slots);
PA_LLIST_HEAD(struct memexport_slot, used_slots);
unsigned n_init;
@ -125,71 +92,63 @@ struct mempool_slot {
};
struct pa_mempool {
pa_mutex *mutex;
pa_cond *cond;
pa_shm memory;
size_t block_size;
unsigned n_blocks;
pa_atomic_int_t n_init;
unsigned n_blocks, n_init;
PA_LLIST_HEAD(pa_memimport, imports);
PA_LLIST_HEAD(pa_memexport, exports);
/* A list of free slots that may be reused */
pa_flist *free_slots;
PA_LLIST_HEAD(struct mempool_slot, free_slots);
pa_mempool_stat stat;
};
static void segment_detach(pa_memimport_segment *seg);
/* No lock necessary */
static void stat_add(pa_memblock*b) {
assert(b);
assert(b->pool);
pa_atomic_inc(&b->pool->stat.n_allocated);
pa_atomic_add(&b->pool->stat.allocated_size, (int) b->length);
AO_fetch_and_add1_release_write(&b->pool->stat.n_allocated);
AO_fetch_and_add_release_write(&b->pool->stat.allocated_size, (AO_t) b->length);
pa_atomic_inc(&b->pool->stat.n_accumulated);
pa_atomic_add(&b->pool->stat.accumulated_size, (int) b->length);
AO_fetch_and_add1_release_write(&b->pool->stat.n_accumulated);
AO_fetch_and_add_release_write(&b->pool->stat.accumulated_size, (AO_t) b->length);
if (b->type == PA_MEMBLOCK_IMPORTED) {
pa_atomic_inc(&b->pool->stat.n_imported);
pa_atomic_add(&b->pool->stat.imported_size, (int) b->length);
AO_fetch_and_add1_release_write(&b->pool->stat.n_imported);
AO_fetch_and_add_release_write(&b->pool->stat.imported_size, (AO_t) b->length);
}
pa_atomic_inc(&b->pool->stat.n_allocated_by_type[b->type]);
pa_atomic_inc(&b->pool->stat.n_accumulated_by_type[b->type]);
AO_fetch_and_add1_release_write(&b->pool->stat.n_allocated_by_type[b->type]);
AO_fetch_and_add1_release_write(&b->pool->stat.n_accumulated_by_type[b->type]);
}
/* No lock necessary */
static void stat_remove(pa_memblock *b) {
assert(b);
assert(b->pool);
assert(pa_atomic_load(&b->pool->stat.n_allocated) > 0);
assert(pa_atomic_load(&b->pool->stat.allocated_size) >= (int) b->length);
assert(AO_load_acquire_read(&b->pool->stat.n_allocated) > 0);
assert(AO_load_acquire_read(&b->pool->stat.allocated_size) >= (AO_t) b->length);
pa_atomic_dec(&b->pool->stat.n_allocated);
pa_atomic_add(&b->pool->stat.allocated_size, - (int) b->length);
AO_fetch_and_sub1_release_write(&b->pool->stat.n_allocated);
AO_fetch_and_add_release_write(&b->pool->stat.allocated_size, (AO_t) (-b->length));
if (b->type == PA_MEMBLOCK_IMPORTED) {
assert(pa_atomic_load(&b->pool->stat.n_imported) > 0);
assert(pa_atomic_load(&b->pool->stat.imported_size) >= (int) b->length);
assert(AO_load_acquire_read(&b->pool->stat.n_imported) > 0);
assert(AO_load_acquire_read(&b->pool->stat.imported_size) >= (AO_t) b->length);
pa_atomic_dec(&b->pool->stat.n_imported);
pa_atomic_add(&b->pool->stat.imported_size, - (int) b->length);
AO_fetch_and_sub1_release_write(&b->pool->stat.n_imported);
AO_fetch_and_add_release_write(&b->pool->stat.imported_size, (AO_t) (-b->length));
}
pa_atomic_dec(&b->pool->stat.n_allocated_by_type[b->type]);
AO_fetch_and_sub1_release_write(&b->pool->stat.n_allocated_by_type[b->type]);
}
static pa_memblock *memblock_new_appended(pa_mempool *p, size_t length);
/* No lock necessary */
pa_memblock *pa_memblock_new(pa_mempool *p, size_t length) {
pa_memblock *b;
@ -202,7 +161,6 @@ pa_memblock *pa_memblock_new(pa_mempool *p, size_t length) {
return b;
}
/* No lock necessary */
static pa_memblock *memblock_new_appended(pa_mempool *p, size_t length) {
pa_memblock *b;
@ -210,61 +168,49 @@ static pa_memblock *memblock_new_appended(pa_mempool *p, size_t length) {
assert(length > 0);
b = pa_xmalloc(sizeof(pa_memblock) + length);
PA_REFCNT_INIT(b);
b->pool = p;
b->type = PA_MEMBLOCK_APPENDED;
b->read_only = 0;
pa_atomic_ptr_store(&b->data, (uint8_t*)b + sizeof(pa_memblock));
PA_REFCNT_INIT(b);
b->length = length;
pa_atomic_store(&b->n_acquired, 0);
pa_atomic_store(&b->please_signal, 0);
b->data = (uint8_t*) b + sizeof(pa_memblock);
b->pool = p;
stat_add(b);
return b;
}
/* No lock necessary */
static struct mempool_slot* mempool_allocate_slot(pa_mempool *p) {
struct mempool_slot *slot;
assert(p);
if (!(slot = pa_flist_pop(p->free_slots))) {
int idx;
/* The free list was empty, we have to allocate a new entry */
if ((unsigned) (idx = pa_atomic_inc(&p->n_init)) >= p->n_blocks)
pa_atomic_dec(&p->n_init);
else
slot = (struct mempool_slot*) ((uint8_t*) p->memory.ptr + (p->block_size * idx));
if (!slot) {
pa_log_debug("Pool full");
pa_atomic_inc(&p->stat.n_pool_full);
}
if (p->free_slots) {
slot = p->free_slots;
PA_LLIST_REMOVE(struct mempool_slot, p->free_slots, slot);
} else if (p->n_init < p->n_blocks)
slot = (struct mempool_slot*) ((uint8_t*) p->memory.ptr + (p->block_size * p->n_init++));
else {
pa_log_debug("Pool full");
AO_fetch_and_add1_release_write(&p->stat.n_pool_full);
return NULL;
}
return slot;
}
/* No lock necessary */
static void* mempool_slot_data(struct mempool_slot *slot) {
assert(slot);
return (uint8_t*) slot + sizeof(struct mempool_slot);
}
/* No lock necessary */
static unsigned mempool_slot_idx(pa_mempool *p, void *ptr) {
assert(p);
assert((uint8_t*) ptr >= (uint8_t*) p->memory.ptr);
assert((uint8_t*) ptr < (uint8_t*) p->memory.ptr + p->memory.size);
return ((uint8_t*) ptr - (uint8_t*) p->memory.ptr) / p->block_size;
}
/* No lock necessary */
static struct mempool_slot* mempool_slot_by_ptr(pa_mempool *p, void *ptr) {
unsigned idx;
@ -274,7 +220,6 @@ static struct mempool_slot* mempool_slot_by_ptr(pa_mempool *p, void *ptr) {
return (struct mempool_slot*) ((uint8_t*) p->memory.ptr + (idx * p->block_size));
}
/* No lock necessary */
pa_memblock *pa_memblock_new_pool(pa_mempool *p, size_t length) {
pa_memblock *b = NULL;
struct mempool_slot *slot;
@ -289,7 +234,7 @@ pa_memblock *pa_memblock_new_pool(pa_mempool *p, size_t length) {
b = mempool_slot_data(slot);
b->type = PA_MEMBLOCK_POOL;
pa_atomic_ptr_store(&b->data, (uint8_t*) b + sizeof(pa_memblock));
b->data = (uint8_t*) b + sizeof(pa_memblock);
} else if (p->block_size - sizeof(struct mempool_slot) >= length) {
@ -298,26 +243,22 @@ pa_memblock *pa_memblock_new_pool(pa_mempool *p, size_t length) {
b = pa_xnew(pa_memblock, 1);
b->type = PA_MEMBLOCK_POOL_EXTERNAL;
pa_atomic_ptr_store(&b->data, mempool_slot_data(slot));
b->data = mempool_slot_data(slot);
} else {
pa_log_debug("Memory block too large for pool: %u > %u", length, p->block_size - sizeof(struct mempool_slot));
pa_atomic_inc(&p->stat.n_too_large_for_pool);
AO_fetch_and_add1_release_write(&p->stat.n_too_large_for_pool);
return NULL;
}
b->length = length;
b->read_only = 0;
PA_REFCNT_INIT(b);
b->pool = p;
b->read_only = 0;
b->length = length;
pa_atomic_store(&b->n_acquired, 0);
pa_atomic_store(&b->please_signal, 0);
stat_add(b);
return b;
}
/* No lock necessary */
pa_memblock *pa_memblock_new_fixed(pa_mempool *p, void *d, size_t length, int read_only) {
pa_memblock *b;
@ -326,20 +267,17 @@ pa_memblock *pa_memblock_new_fixed(pa_mempool *p, void *d, size_t length, int re
assert(length > 0);
b = pa_xnew(pa_memblock, 1);
PA_REFCNT_INIT(b);
b->pool = p;
b->type = PA_MEMBLOCK_FIXED;
b->read_only = read_only;
pa_atomic_ptr_store(&b->data, d);
PA_REFCNT_INIT(b);
b->length = length;
pa_atomic_store(&b->n_acquired, 0);
pa_atomic_store(&b->please_signal, 0);
b->data = d;
b->pool = p;
stat_add(b);
return b;
}
/* No lock necessary */
pa_memblock *pa_memblock_new_user(pa_mempool *p, void *d, size_t length, void (*free_cb)(void *p), int read_only) {
pa_memblock *b;
@ -349,72 +287,18 @@ pa_memblock *pa_memblock_new_user(pa_mempool *p, void *d, size_t length, void (*
assert(free_cb);
b = pa_xnew(pa_memblock, 1);
PA_REFCNT_INIT(b);
b->pool = p;
b->type = PA_MEMBLOCK_USER;
b->read_only = read_only;
pa_atomic_ptr_store(&b->data, d);
PA_REFCNT_INIT(b);
b->length = length;
pa_atomic_store(&b->n_acquired, 0);
pa_atomic_store(&b->please_signal, 0);
b->data = d;
b->per_type.user.free_cb = free_cb;
b->pool = p;
stat_add(b);
return b;
}
/* No lock necessary */
int pa_memblock_is_read_only(pa_memblock *b) {
assert(b);
assert(PA_REFCNT_VALUE(b) > 0);
return b->read_only && PA_REFCNT_VALUE(b) == 1;
}
/* No lock necessary */
void* pa_memblock_acquire(pa_memblock *b) {
assert(b);
assert(PA_REFCNT_VALUE(b) > 0);
pa_atomic_inc(&b->n_acquired);
return pa_atomic_ptr_load(&b->data);
}
/* No lock necessary, in corner cases locks by its own */
void pa_memblock_release(pa_memblock *b) {
int r;
assert(b);
assert(PA_REFCNT_VALUE(b) > 0);
r = pa_atomic_dec(&b->n_acquired);
assert(r >= 1);
if (r == 1 && pa_atomic_load(&b->please_signal)) {
pa_mempool *p = b->pool;
/* Signal a waiting thread that this memblock is no longer used */
pa_mutex_lock(p->mutex);
pa_cond_signal(p->cond, 1);
pa_mutex_unlock(p->mutex);
}
}
size_t pa_memblock_get_length(pa_memblock *b) {
assert(b);
assert(PA_REFCNT_VALUE(b) > 0);
return b->length;
}
pa_mempool* pa_memblock_get_pool(pa_memblock *b) {
assert(b);
assert(PA_REFCNT_VALUE(b) > 0);
return b->pool;
}
/* No lock necessary */
pa_memblock* pa_memblock_ref(pa_memblock*b) {
assert(b);
assert(PA_REFCNT_VALUE(b) > 0);
@ -423,17 +307,19 @@ pa_memblock* pa_memblock_ref(pa_memblock*b) {
return b;
}
static void memblock_free(pa_memblock *b) {
void pa_memblock_unref(pa_memblock*b) {
assert(b);
assert(pa_atomic_load(&b->n_acquired) == 0);
assert(PA_REFCNT_VALUE(b) > 0);
if (PA_REFCNT_DEC(b) > 0)
return;
stat_remove(b);
switch (b->type) {
case PA_MEMBLOCK_USER :
assert(b->per_type.user.free_cb);
b->per_type.user.free_cb(pa_atomic_ptr_load(&b->data));
b->per_type.user.free_cb(b->data);
/* Fall through */
@ -444,23 +330,17 @@ static void memblock_free(pa_memblock *b) {
case PA_MEMBLOCK_IMPORTED : {
pa_memimport_segment *segment;
pa_memimport *import;
/* FIXME! This should be implemented lock-free */
segment = b->per_type.imported.segment;
assert(segment);
import = segment->import;
assert(import);
assert(segment->import);
pa_mutex_lock(import->mutex);
pa_hashmap_remove(import->blocks, PA_UINT32_TO_PTR(b->per_type.imported.id));
pa_hashmap_remove(segment->import->blocks, PA_UINT32_TO_PTR(b->per_type.imported.id));
segment->import->release_cb(segment->import, b->per_type.imported.id, segment->import->userdata);
if (-- segment->n_blocks <= 0)
segment_detach(segment);
pa_mutex_unlock(import->mutex);
import->release_cb(import, b->per_type.imported.id, import->userdata);
pa_xfree(b);
break;
}
@ -468,20 +348,13 @@ static void memblock_free(pa_memblock *b) {
case PA_MEMBLOCK_POOL_EXTERNAL:
case PA_MEMBLOCK_POOL: {
struct mempool_slot *slot;
int call_free;
slot = mempool_slot_by_ptr(b->pool, pa_atomic_ptr_load(&b->data));
slot = mempool_slot_by_ptr(b->pool, b->data);
assert(slot);
call_free = b->type == PA_MEMBLOCK_POOL_EXTERNAL;
/* The free list dimensions should easily allow all slots
* to fit in, hence try harder if pushing this slot into
* the free list fails */
while (pa_flist_push(b->pool->free_slots, slot) < 0)
;
if (call_free)
PA_LLIST_PREPEND(struct mempool_slot, b->pool->free_slots, slot);
if (b->type == PA_MEMBLOCK_POOL_EXTERNAL)
pa_xfree(b);
break;
@ -493,42 +366,10 @@ static void memblock_free(pa_memblock *b) {
}
}
/* No lock necessary */
void pa_memblock_unref(pa_memblock*b) {
assert(b);
assert(PA_REFCNT_VALUE(b) > 0);
if (PA_REFCNT_DEC(b) > 0)
return;
memblock_free(b);
}
/* Self locked */
static void memblock_wait(pa_memblock *b) {
assert(b);
if (pa_atomic_load(&b->n_acquired) > 0) {
/* We need to wait until all threads gave up access to the
* memory block before we can go on. Unfortunately this means
* that we have to lock and wait here. Sniff! */
pa_atomic_inc(&b->please_signal);
pa_mutex_lock(b->pool->mutex);
while (pa_atomic_load(&b->n_acquired) > 0)
pa_cond_wait(b->pool->cond, b->pool->mutex);
pa_mutex_unlock(b->pool->mutex);
pa_atomic_dec(&b->please_signal);
}
}
/* No lock necessary. This function is not multiple caller safe! */
static void memblock_make_local(pa_memblock *b) {
assert(b);
pa_atomic_dec(&b->pool->stat.n_allocated_by_type[b->type]);
AO_fetch_and_sub1_release_write(&b->pool->stat.n_allocated_by_type[b->type]);
if (b->length <= b->pool->block_size - sizeof(struct mempool_slot)) {
struct mempool_slot *slot;
@ -537,61 +378,53 @@ static void memblock_make_local(pa_memblock *b) {
void *new_data;
/* We can move it into a local pool, perfect! */
new_data = mempool_slot_data(slot);
memcpy(new_data, pa_atomic_ptr_load(&b->data), b->length);
pa_atomic_ptr_store(&b->data, new_data);
b->type = PA_MEMBLOCK_POOL_EXTERNAL;
b->read_only = 0;
new_data = mempool_slot_data(slot);
memcpy(new_data, b->data, b->length);
b->data = new_data;
goto finish;
}
}
/* Humm, not enough space in the pool, so lets allocate the memory with malloc() */
b->per_type.user.free_cb = pa_xfree;
pa_atomic_ptr_store(&b->data, pa_xmemdup(pa_atomic_ptr_load(&b->data), b->length));
b->type = PA_MEMBLOCK_USER;
b->per_type.user.free_cb = pa_xfree;
b->read_only = 0;
b->data = pa_xmemdup(b->data, b->length);
finish:
pa_atomic_inc(&b->pool->stat.n_allocated_by_type[b->type]);
pa_atomic_inc(&b->pool->stat.n_accumulated_by_type[b->type]);
memblock_wait(b);
AO_fetch_and_add1_release_write(&b->pool->stat.n_allocated_by_type[b->type]);
AO_fetch_and_add1_release_write(&b->pool->stat.n_accumulated_by_type[b->type]);
}
/* No lock necessary. This function is not multiple caller safe*/
void pa_memblock_unref_fixed(pa_memblock *b) {
assert(b);
assert(PA_REFCNT_VALUE(b) > 0);
assert(b->type == PA_MEMBLOCK_FIXED);
if (PA_REFCNT_DEC(b) > 0)
if (PA_REFCNT_VALUE(b) > 1)
memblock_make_local(b);
else
memblock_free(b);
pa_memblock_unref(b);
}
/* Self-locked. This function is not multiple-caller safe */
static void memblock_replace_import(pa_memblock *b) {
pa_memimport_segment *seg;
assert(b);
assert(b->type == PA_MEMBLOCK_IMPORTED);
assert(pa_atomic_load(&b->pool->stat.n_imported) > 0);
assert(pa_atomic_load(&b->pool->stat.imported_size) >= (int) b->length);
pa_atomic_dec(&b->pool->stat.n_imported);
pa_atomic_add(&b->pool->stat.imported_size, (int) - b->length);
assert(AO_load_acquire_read(&b->pool->stat.n_imported) > 0);
assert(AO_load_acquire_read(&b->pool->stat.imported_size) >= (AO_t) b->length);
AO_fetch_and_sub1_release_write(&b->pool->stat.n_imported);
AO_fetch_and_add_release_write(&b->pool->stat.imported_size, (AO_t) - b->length);
seg = b->per_type.imported.segment;
assert(seg);
assert(seg->import);
pa_mutex_lock(seg->import->mutex);
pa_hashmap_remove(
seg->import->blocks,
PA_UINT32_TO_PTR(b->per_type.imported.id));
@ -600,8 +433,6 @@ static void memblock_replace_import(pa_memblock *b) {
if (-- seg->n_blocks <= 0)
segment_detach(seg);
pa_mutex_unlock(seg->import->mutex);
}
pa_mempool* pa_mempool_new(int shared) {
@ -610,15 +441,12 @@ pa_mempool* pa_mempool_new(int shared) {
p = pa_xnew(pa_mempool, 1);
p->mutex = pa_mutex_new(1);
p->cond = pa_cond_new();
#ifdef HAVE_SYSCONF
ps = (size_t) sysconf(_SC_PAGESIZE);
#elif defined(PAGE_SIZE)
ps = (size_t) PAGE_SIZE;
ps = (size_t) PAGE_SIZE;
#else
ps = 4096; /* Let's hope it's like x86. */
ps = 4096; /* Let's hope it's like x86. */
#endif
p->block_size = (PA_MEMPOOL_SLOT_SIZE/ps)*ps;
@ -635,13 +463,13 @@ pa_mempool* pa_mempool_new(int shared) {
return NULL;
}
memset(&p->stat, 0, sizeof(p->stat));
pa_atomic_store(&p->n_init, 0);
p->n_init = 0;
PA_LLIST_HEAD_INIT(pa_memimport, p->imports);
PA_LLIST_HEAD_INIT(pa_memexport, p->exports);
PA_LLIST_HEAD_INIT(struct mempool_slot, p->free_slots);
p->free_slots = pa_flist_new(p->n_blocks*2);
memset(&p->stat, 0, sizeof(p->stat));
return p;
}
@ -649,62 +477,34 @@ pa_mempool* pa_mempool_new(int shared) {
void pa_mempool_free(pa_mempool *p) {
assert(p);
pa_mutex_lock(p->mutex);
while (p->imports)
pa_memimport_free(p->imports);
while (p->exports)
pa_memexport_free(p->exports);
pa_mutex_unlock(p->mutex);
if (pa_atomic_load(&p->stat.n_allocated) > 0)
if (AO_load_acquire_read(&p->stat.n_allocated) > 0)
pa_log_warn("WARNING! Memory pool destroyed but not all memory blocks freed!");
pa_flist_free(p->free_slots, NULL);
pa_shm_free(&p->memory);
pa_mutex_free(p->mutex);
pa_cond_free(p->cond);
pa_xfree(p);
}
/* No lock necessary */
const pa_mempool_stat* pa_mempool_get_stat(pa_mempool *p) {
assert(p);
return &p->stat;
}
/* No lock necessary */
void pa_mempool_vacuum(pa_mempool *p) {
struct mempool_slot *slot;
pa_flist *list;
assert(p);
list = pa_flist_new(p->n_blocks*2);
while ((slot = pa_flist_pop(p->free_slots)))
while (pa_flist_push(list, slot) < 0)
;
while ((slot = pa_flist_pop(list))) {
pa_shm_punch(&p->memory,
(uint8_t*) slot - (uint8_t*) p->memory.ptr + sizeof(struct mempool_slot),
p->block_size - sizeof(struct mempool_slot));
while (pa_flist_push(p->free_slots, slot))
;
}
pa_flist_free(list, NULL);
for (slot = p->free_slots; slot; slot = slot->next)
pa_shm_punch(&p->memory, (uint8_t*) slot + sizeof(struct mempool_slot) - (uint8_t*) p->memory.ptr, p->block_size - sizeof(struct mempool_slot));
}
/* No lock necessary */
int pa_mempool_get_shm_id(pa_mempool *p, uint32_t *id) {
assert(p);
@ -716,7 +516,6 @@ int pa_mempool_get_shm_id(pa_mempool *p, uint32_t *id) {
return 0;
}
/* No lock necessary */
int pa_mempool_is_shared(pa_mempool *p) {
assert(p);
@ -731,23 +530,18 @@ pa_memimport* pa_memimport_new(pa_mempool *p, pa_memimport_release_cb_t cb, void
assert(cb);
i = pa_xnew(pa_memimport, 1);
i->mutex = pa_mutex_new(0);
i->pool = p;
i->segments = pa_hashmap_new(NULL, NULL);
i->blocks = pa_hashmap_new(NULL, NULL);
i->release_cb = cb;
i->userdata = userdata;
pa_mutex_lock(p->mutex);
PA_LLIST_PREPEND(pa_memimport, p->imports, i);
pa_mutex_unlock(p->mutex);
return i;
}
static void memexport_revoke_blocks(pa_memexport *e, pa_memimport *i);
/* Should be called locked */
static pa_memimport_segment* segment_attach(pa_memimport *i, uint32_t shm_id) {
pa_memimport_segment* seg;
@ -768,7 +562,6 @@ static pa_memimport_segment* segment_attach(pa_memimport *i, uint32_t shm_id) {
return seg;
}
/* Should be called locked */
static void segment_detach(pa_memimport_segment *seg) {
assert(seg);
@ -777,68 +570,51 @@ static void segment_detach(pa_memimport_segment *seg) {
pa_xfree(seg);
}
/* Self-locked. Not multiple-caller safe */
void pa_memimport_free(pa_memimport *i) {
pa_memexport *e;
pa_memblock *b;
assert(i);
pa_mutex_lock(i->mutex);
/* If we've exported this block further we need to revoke that export */
for (e = i->pool->exports; e; e = e->next)
memexport_revoke_blocks(e, i);
while ((b = pa_hashmap_get_first(i->blocks)))
memblock_replace_import(b);
assert(pa_hashmap_size(i->segments) == 0);
pa_mutex_unlock(i->mutex);
pa_mutex_lock(i->pool->mutex);
/* If we've exported this block further we need to revoke that export */
for (e = i->pool->exports; e; e = e->next)
memexport_revoke_blocks(e, i);
PA_LLIST_REMOVE(pa_memimport, i->pool->imports, i);
pa_mutex_unlock(i->pool->mutex);
pa_hashmap_free(i->blocks, NULL, NULL);
pa_hashmap_free(i->segments, NULL, NULL);
pa_mutex_free(i->mutex);
PA_LLIST_REMOVE(pa_memimport, i->pool->imports, i);
pa_xfree(i);
}
/* Self-locked */
pa_memblock* pa_memimport_get(pa_memimport *i, uint32_t block_id, uint32_t shm_id, size_t offset, size_t size) {
pa_memblock *b = NULL;
pa_memblock *b;
pa_memimport_segment *seg;
assert(i);
pa_mutex_lock(i->mutex);
if (pa_hashmap_size(i->blocks) >= PA_MEMIMPORT_SLOTS_MAX)
goto finish;
return NULL;
if (!(seg = pa_hashmap_get(i->segments, PA_UINT32_TO_PTR(shm_id))))
if (!(seg = segment_attach(i, shm_id)))
goto finish;
return NULL;
if (offset+size > seg->memory.size)
goto finish;
return NULL;
b = pa_xnew(pa_memblock, 1);
PA_REFCNT_INIT(b);
b->pool = i->pool;
b->type = PA_MEMBLOCK_IMPORTED;
b->read_only = 1;
pa_atomic_ptr_store(&b->data, (uint8_t*) seg->memory.ptr + offset);
PA_REFCNT_INIT(b);
b->length = size;
pa_atomic_store(&b->n_acquired, 0);
pa_atomic_store(&b->please_signal, 0);
b->data = (uint8_t*) seg->memory.ptr + offset;
b->pool = i->pool;
b->per_type.imported.id = block_id;
b->per_type.imported.segment = seg;
@ -846,11 +622,7 @@ pa_memblock* pa_memimport_get(pa_memimport *i, uint32_t block_id, uint32_t shm_i
seg->n_blocks++;
finish:
pa_mutex_unlock(i->mutex);
if (b)
stat_add(b);
stat_add(b);
return b;
}
@ -859,15 +631,10 @@ int pa_memimport_process_revoke(pa_memimport *i, uint32_t id) {
pa_memblock *b;
assert(i);
pa_mutex_lock(i->mutex);
if (!(b = pa_hashmap_get(i->blocks, PA_UINT32_TO_PTR(id))))
return -1;
memblock_replace_import(b);
pa_mutex_unlock(i->mutex);
return 0;
}
@ -882,84 +649,58 @@ pa_memexport* pa_memexport_new(pa_mempool *p, pa_memexport_revoke_cb_t cb, void
return NULL;
e = pa_xnew(pa_memexport, 1);
e->mutex = pa_mutex_new(1);
e->pool = p;
PA_LLIST_HEAD_INIT(struct memexport_slot, e->free_slots);
PA_LLIST_HEAD_INIT(struct memexport_slot, e->used_slots);
e->n_init = 0;
e->revoke_cb = cb;
e->userdata = userdata;
pa_mutex_lock(p->mutex);
PA_LLIST_PREPEND(pa_memexport, p->exports, e);
pa_mutex_unlock(p->mutex);
PA_LLIST_PREPEND(pa_memexport, p->exports, e);
return e;
}
void pa_memexport_free(pa_memexport *e) {
assert(e);
pa_mutex_lock(e->mutex);
while (e->used_slots)
pa_memexport_process_release(e, e->used_slots - e->slots);
pa_mutex_unlock(e->mutex);
pa_mutex_lock(e->pool->mutex);
PA_LLIST_REMOVE(pa_memexport, e->pool->exports, e);
pa_mutex_unlock(e->pool->mutex);
pa_xfree(e);
}
/* Self-locked */
int pa_memexport_process_release(pa_memexport *e, uint32_t id) {
pa_memblock *b;
assert(e);
pa_mutex_lock(e->mutex);
if (id >= e->n_init)
goto fail;
return -1;
if (!e->slots[id].block)
goto fail;
return -1;
b = e->slots[id].block;
/* pa_log("Processing release for %u", id); */
assert(AO_load_acquire_read(&e->pool->stat.n_exported) > 0);
assert(AO_load_acquire_read(&e->pool->stat.exported_size) >= (AO_t) e->slots[id].block->length);
AO_fetch_and_sub1_release_write(&e->pool->stat.n_exported);
AO_fetch_and_add_release_write(&e->pool->stat.exported_size, (AO_t) -e->slots[id].block->length);
pa_memblock_unref(e->slots[id].block);
e->slots[id].block = NULL;
PA_LLIST_REMOVE(struct memexport_slot, e->used_slots, &e->slots[id]);
PA_LLIST_PREPEND(struct memexport_slot, e->free_slots, &e->slots[id]);
pa_mutex_unlock(e->mutex);
/* pa_log("Processing release for %u", id); */
assert(pa_atomic_load(&e->pool->stat.n_exported) > 0);
assert(pa_atomic_load(&e->pool->stat.exported_size) >= (int) b->length);
pa_atomic_dec(&e->pool->stat.n_exported);
pa_atomic_add(&e->pool->stat.exported_size, (int) -b->length);
pa_memblock_unref(b);
return 0;
fail:
pa_mutex_unlock(e->mutex);
return -1;
}
/* Self-locked */
static void memexport_revoke_blocks(pa_memexport *e, pa_memimport *i) {
struct memexport_slot *slot, *next;
assert(e);
assert(i);
pa_mutex_lock(e->mutex);
for (slot = e->used_slots; slot; slot = next) {
uint32_t idx;
next = slot->next;
@ -972,11 +713,8 @@ static void memexport_revoke_blocks(pa_memexport *e, pa_memimport *i) {
e->revoke_cb(e, idx, e->userdata);
pa_memexport_process_release(e, idx);
}
pa_mutex_unlock(e->mutex);
}
/* No lock necessary */
static pa_memblock *memblock_shared_copy(pa_mempool *p, pa_memblock *b) {
pa_memblock *n;
@ -993,16 +731,13 @@ static pa_memblock *memblock_shared_copy(pa_mempool *p, pa_memblock *b) {
if (!(n = pa_memblock_new_pool(p, b->length)))
return NULL;
memcpy(pa_atomic_ptr_load(&n->data), pa_atomic_ptr_load(&b->data), b->length);
memcpy(n->data, b->data, b->length);
return n;
}
/* Self-locked */
int pa_memexport_put(pa_memexport *e, pa_memblock *b, uint32_t *block_id, uint32_t *shm_id, size_t *offset, size_t * size) {
pa_shm *memory;
struct memexport_slot *slot;
void *data;
size_t length;
assert(e);
assert(b);
@ -1015,15 +750,12 @@ int pa_memexport_put(pa_memexport *e, pa_memblock *b, uint32_t *block_id, uint32
if (!(b = memblock_shared_copy(e->pool, b)))
return -1;
pa_mutex_lock(e->mutex);
if (e->free_slots) {
slot = e->free_slots;
PA_LLIST_REMOVE(struct memexport_slot, e->free_slots, slot);
} else if (e->n_init < PA_MEMEXPORT_SLOTS_MAX)
} else if (e->n_init < PA_MEMEXPORT_SLOTS_MAX) {
slot = &e->slots[e->n_init++];
else {
pa_mutex_unlock(e->mutex);
} else {
pa_memblock_unref(b);
return -1;
}
@ -1032,11 +764,8 @@ int pa_memexport_put(pa_memexport *e, pa_memblock *b, uint32_t *block_id, uint32
slot->block = b;
*block_id = slot - e->slots;
pa_mutex_unlock(e->mutex);
/* pa_log("Got block id %u", *block_id); */
data = pa_memblock_acquire(b);
if (b->type == PA_MEMBLOCK_IMPORTED) {
assert(b->per_type.imported.segment);
memory = &b->per_type.imported.segment->memory;
@ -1046,17 +775,15 @@ int pa_memexport_put(pa_memexport *e, pa_memblock *b, uint32_t *block_id, uint32
memory = &b->pool->memory;
}
assert(data >= memory->ptr);
assert((uint8_t*) data + length <= (uint8_t*) memory->ptr + memory->size);
assert(b->data >= memory->ptr);
assert((uint8_t*) b->data + b->length <= (uint8_t*) memory->ptr + memory->size);
*shm_id = memory->id;
*offset = (uint8_t*) data - (uint8_t*) memory->ptr;
*size = length;
*offset = (uint8_t*) b->data - (uint8_t*) memory->ptr;
*size = b->length;
pa_memblock_release(b);
pa_atomic_inc(&e->pool->stat.n_exported);
pa_atomic_add(&e->pool->stat.exported_size, (int) length);
AO_fetch_and_add1_release_write(&e->pool->stat.n_exported);
AO_fetch_and_add_release_write(&e->pool->stat.exported_size, (AO_t) b->length);
return 0;
}


@ -25,7 +25,6 @@
#include <sys/types.h>
#include <inttypes.h>
#include <pulse/def.h>
#include <pulsecore/llist.h>
#include <pulsecore/refcnt.h>
@ -55,25 +54,45 @@ typedef struct pa_memexport pa_memexport;
typedef void (*pa_memimport_release_cb_t)(pa_memimport *i, uint32_t block_id, void *userdata);
typedef void (*pa_memexport_revoke_cb_t)(pa_memexport *e, uint32_t block_id, void *userdata);
struct pa_memblock {
pa_memblock_type_t type;
int read_only; /* boolean */
PA_REFCNT_DECLARE; /* the reference counter */
size_t length;
void *data;
pa_mempool *pool;
union {
struct {
void (*free_cb)(void *p); /* If type == PA_MEMBLOCK_USER this points to a function for freeing this memory block */
} user;
struct {
uint32_t id;
pa_memimport_segment *segment;
} imported;
} per_type;
};
/* Please note that updates to this structure are not locked,
* i.e. n_allocated might be updated at a point in time where
* n_accumulated is not yet. Take these values with a grain of salt,
* they are here for purely statistical reasons.*/
* they are here for purely statistical reasons.*/
struct pa_mempool_stat {
pa_atomic_int_t n_allocated;
pa_atomic_int_t n_accumulated;
pa_atomic_int_t n_imported;
pa_atomic_int_t n_exported;
pa_atomic_int_t allocated_size;
pa_atomic_int_t accumulated_size;
pa_atomic_int_t imported_size;
pa_atomic_int_t exported_size;
AO_t n_allocated;
AO_t n_accumulated;
AO_t n_imported;
AO_t n_exported;
AO_t allocated_size;
AO_t accumulated_size;
AO_t imported_size;
AO_t exported_size;
pa_atomic_int_t n_too_large_for_pool;
pa_atomic_int_t n_pool_full;
AO_t n_too_large_for_pool;
AO_t n_pool_full;
pa_atomic_int_t n_allocated_by_type[PA_MEMBLOCK_TYPE_MAX];
pa_atomic_int_t n_accumulated_by_type[PA_MEMBLOCK_TYPE_MAX];
AO_t n_allocated_by_type[PA_MEMBLOCK_TYPE_MAX];
AO_t n_accumulated_by_type[PA_MEMBLOCK_TYPE_MAX];
};
/* Allocate a new memory block of type PA_MEMBLOCK_MEMPOOL or PA_MEMBLOCK_APPENDED, depending on the size */
@ -97,17 +116,9 @@ pa_memblock* pa_memblock_ref(pa_memblock*b);
/* This special unref function has to be called by the owner of the
memory of a static memory block when he wants to release all
references to the memory. This causes the memory to be copied and
converted into a pool or malloc'ed memory block. Please note that this
function is not multiple caller safe, i.e. needs to be locked
manually if called from more than one thread at the same time. */
converted into a PA_MEMBLOCK_DYNAMIC type memory block */
void pa_memblock_unref_fixed(pa_memblock*b);
int pa_memblock_is_read_only(pa_memblock *b);
void* pa_memblock_acquire(pa_memblock *b);
void pa_memblock_release(pa_memblock *b);
size_t pa_memblock_get_length(pa_memblock *b);
pa_mempool * pa_memblock_get_pool(pa_memblock *b);
/* The memory block manager */
pa_mempool* pa_mempool_new(int shared);
void pa_mempool_free(pa_mempool *p);


@ -176,7 +176,7 @@ int pa_memblockq_push(pa_memblockq* bq, const pa_memchunk *uchunk) {
assert(uchunk);
assert(uchunk->memblock);
assert(uchunk->length > 0);
assert(uchunk->index + uchunk->length <= pa_memblock_get_length(uchunk->memblock));
assert(uchunk->index + uchunk->length <= uchunk->memblock->length);
if (uchunk->length % bq->base)
return -1;
@ -360,8 +360,8 @@ int pa_memblockq_peek(pa_memblockq* bq, pa_memchunk *chunk) {
if (bq->silence) {
chunk->memblock = pa_memblock_ref(bq->silence);
if (!length || length > pa_memblock_get_length(chunk->memblock))
length = pa_memblock_get_length(chunk->memblock);
if (!length || length > chunk->memblock->length)
length = chunk->memblock->length;
chunk->length = length;
} else {
@ -413,8 +413,8 @@ void pa_memblockq_drop(pa_memblockq *bq, const pa_memchunk *chunk, size_t length
if (bq->silence) {
if (!l || l > pa_memblock_get_length(bq->silence))
l = pa_memblock_get_length(bq->silence);
if (!l || l > bq->silence->length)
l = bq->silence->length;
}


@ -35,25 +35,22 @@
void pa_memchunk_make_writable(pa_memchunk *c, size_t min) {
pa_memblock *n;
size_t l;
void *tdata, *sdata;
assert(c);
assert(c->memblock);
assert(PA_REFCNT_VALUE(c->memblock) > 0);
if (pa_memblock_is_read_only(c->memblock) &&
pa_memblock_get_length(c->memblock) >= c->index+min)
if (PA_REFCNT_VALUE(c->memblock) == 1 &&
!c->memblock->read_only &&
c->memblock->length >= c->index+min)
return;
l = c->length;
if (l < min)
l = min;
n = pa_memblock_new(pa_memblock_get_pool(c->memblock), l);
tdata = pa_memblock_acquire(n);
sdata = pa_memblock_acquire(c->memblock);
memcpy(tdata, (uint8_t*) sdata + c->index, c->length);
pa_memblock_release(n);
pa_memblock_release(c->memblock);
n = pa_memblock_new(c->memblock->pool, l);
memcpy(n->data, (uint8_t*) c->memblock->data + c->index, c->length);
pa_memblock_unref(c->memblock);
c->memblock = n;
c->index = 0;


@ -55,7 +55,7 @@ static int sink_input_peek(pa_sink_input *i, pa_memchunk *chunk) {
if (c->length <= 0)
return -1;
assert(c->memblock);
assert(c->memblock && c->memblock->length);
*chunk = *c;
pa_memblock_ref(c->memblock);


@ -891,22 +891,14 @@ static int do_read(struct connection *c) {
}
} else if (c->state == ESD_CACHING_SAMPLE) {
ssize_t r;
void *p;
assert(c->scache.memchunk.memblock);
assert(c->scache.name);
assert(c->scache.memchunk.index < c->scache.memchunk.length);
p = pa_memblock_acquire(c->scache.memchunk.memblock);
assert(c->scache.memchunk.memblock && c->scache.name && c->scache.memchunk.index < c->scache.memchunk.length);
if ((r = pa_iochannel_read(c->io, (uint8_t*) p+c->scache.memchunk.index, c->scache.memchunk.length-c->scache.memchunk.index)) <= 0) {
pa_memblock_release(c->scache.memchunk.memblock);
if ((r = pa_iochannel_read(c->io, (uint8_t*) c->scache.memchunk.memblock->data+c->scache.memchunk.index, c->scache.memchunk.length-c->scache.memchunk.index)) <= 0) {
pa_log_debug("read(): %s", r < 0 ? pa_cstrerror(errno) : "EOF");
return -1;
}
pa_memblock_release(c->scache.memchunk.memblock);
c->scache.memchunk.index += r;
assert(c->scache.memchunk.index <= c->scache.memchunk.length);
@ -933,7 +925,6 @@ static int do_read(struct connection *c) {
pa_memchunk chunk;
ssize_t r;
size_t l;
void *p;
assert(c->input_memblockq);
@ -946,7 +937,7 @@ static int do_read(struct connection *c) {
l = c->playback.fragment_size;
if (c->playback.current_memblock)
if (pa_memblock_get_length(c->playback.current_memblock) - c->playback.memblock_index < l) {
if (c->playback.current_memblock->length - c->playback.memblock_index < l) {
pa_memblock_unref(c->playback.current_memblock);
c->playback.current_memblock = NULL;
c->playback.memblock_index = 0;
@ -954,21 +945,15 @@ static int do_read(struct connection *c) {
if (!c->playback.current_memblock) {
c->playback.current_memblock = pa_memblock_new(c->protocol->core->mempool, c->playback.fragment_size*2);
assert(c->playback.current_memblock);
assert(pa_memblock_get_length(c->playback.current_memblock) >= l);
assert(c->playback.current_memblock && c->playback.current_memblock->length >= l);
c->playback.memblock_index = 0;
}
p = pa_memblock_acquire(c->playback.current_memblock);
if ((r = pa_iochannel_read(c->io, (uint8_t*) p+c->playback.memblock_index, l)) <= 0) {
pa_memblock_release(c->playback.current_memblock);
if ((r = pa_iochannel_read(c->io, (uint8_t*) c->playback.current_memblock->data+c->playback.memblock_index, l)) <= 0) {
pa_log_debug("read(): %s", r < 0 ? pa_cstrerror(errno) : "EOF");
return -1;
}
pa_memblock_release(c->playback.current_memblock);
chunk.memblock = c->playback.current_memblock;
chunk.index = c->playback.memblock_index;
chunk.length = r;
@ -1005,26 +990,19 @@ static int do_write(struct connection *c) {
} else if (c->state == ESD_STREAMING_DATA && c->source_output) {
pa_memchunk chunk;
ssize_t r;
void *p;
assert(c->output_memblockq);
if (pa_memblockq_peek(c->output_memblockq, &chunk) < 0)
return 0;
assert(chunk.memblock);
assert(chunk.length);
p = pa_memblock_acquire(chunk.memblock);
assert(chunk.memblock && chunk.length);
if ((r = pa_iochannel_write(c->io, (uint8_t*) p+chunk.index, chunk.length)) < 0) {
pa_memblock_release(chunk.memblock);
if ((r = pa_iochannel_write(c->io, (uint8_t*) chunk.memblock->data+chunk.index, chunk.length)) < 0) {
pa_memblock_unref(chunk.memblock);
pa_log("write(): %s", pa_cstrerror(errno));
return -1;
}
pa_memblock_release(chunk.memblock);
pa_memblockq_drop(c->output_memblockq, &chunk, r);
pa_memblock_unref(chunk.memblock);


@ -2274,7 +2274,6 @@ static void pstream_memblock_callback(pa_pstream *p, uint32_t channel, int64_t o
} else {
struct upload_stream *u = (struct upload_stream*) stream;
size_t l;
assert(u->type == UPLOAD_STREAM);
if (!u->memchunk.memblock) {
@ -2294,18 +2293,9 @@ static void pstream_memblock_callback(pa_pstream *p, uint32_t channel, int64_t o
if (l > chunk->length)
l = chunk->length;
if (l > 0) {
void *src, *dst;
dst = pa_memblock_acquire(u->memchunk.memblock);
src = pa_memblock_acquire(chunk->memblock);
memcpy((uint8_t*) dst + u->memchunk.index + u->memchunk.length,
(uint8_t*) src+chunk->index, l);
pa_memblock_release(u->memchunk.memblock);
pa_memblock_release(chunk->memblock);
memcpy((uint8_t*) u->memchunk.memblock->data + u->memchunk.index + u->memchunk.length,
(uint8_t*) chunk->memblock->data+chunk->index, l);
u->memchunk.length += l;
u->length -= l;
}


@ -113,7 +113,6 @@ static int do_read(struct connection *c) {
pa_memchunk chunk;
ssize_t r;
size_t l;
void *p;
if (!c->sink_input || !(l = pa_memblockq_missing(c->input_memblockq)))
return 0;
@ -122,7 +121,7 @@ static int do_read(struct connection *c) {
l = c->playback.fragment_size;
if (c->playback.current_memblock)
if (pa_memblock_get_length(c->playback.current_memblock) - c->playback.memblock_index < l) {
if (c->playback.current_memblock->length - c->playback.memblock_index < l) {
pa_memblock_unref(c->playback.current_memblock);
c->playback.current_memblock = NULL;
c->playback.memblock_index = 0;
@ -130,20 +129,15 @@ static int do_read(struct connection *c) {
if (!c->playback.current_memblock) {
c->playback.current_memblock = pa_memblock_new(c->protocol->core->mempool, c->playback.fragment_size*2);
assert(c->playback.current_memblock);
assert(pa_memblock_get_length(c->playback.current_memblock) >= l);
assert(c->playback.current_memblock && c->playback.current_memblock->length >= l);
c->playback.memblock_index = 0;
}
p = pa_memblock_acquire(c->playback.current_memblock);
if ((r = pa_iochannel_read(c->io, (uint8_t*) p + c->playback.memblock_index, l)) <= 0) {
pa_memblock_release(c->playback.current_memblock);
if ((r = pa_iochannel_read(c->io, (uint8_t*) c->playback.current_memblock->data+c->playback.memblock_index, l)) <= 0) {
pa_log_debug("read(): %s", r == 0 ? "EOF" : pa_cstrerror(errno));
return -1;
}
pa_memblock_release(c->playback.current_memblock);
chunk.memblock = c->playback.current_memblock;
chunk.index = c->playback.memblock_index;
chunk.length = r;
@ -162,8 +156,7 @@ static int do_read(struct connection *c) {
static int do_write(struct connection *c) {
pa_memchunk chunk;
ssize_t r;
void *p;
if (!c->source_output)
return 0;
@ -172,17 +165,12 @@ static int do_write(struct connection *c) {
return 0;
assert(chunk.memblock && chunk.length);
p = pa_memblock_acquire(chunk.memblock);
if ((r = pa_iochannel_write(c->io, (uint8_t*) p+chunk.index, chunk.length)) < 0) {
pa_memblock_release(chunk.memblock);
if ((r = pa_iochannel_write(c->io, (uint8_t*) chunk.memblock->data+chunk.index, chunk.length)) < 0) {
pa_memblock_unref(chunk.memblock);
pa_log("write(): %s", pa_cstrerror(errno));
return -1;
}
pa_memblock_release(chunk.memblock);
pa_memblockq_drop(c->output_memblockq, &chunk, r);
pa_memblock_unref(chunk.memblock);


@ -48,7 +48,6 @@
#include <pulsecore/creds.h>
#include <pulsecore/mutex.h>
#include <pulsecore/refcnt.h>
#include <pulsecore/anotify.h>
#include "pstream.h"
@ -114,11 +113,10 @@ struct pa_pstream {
PA_REFCNT_DECLARE;
pa_mainloop_api *mainloop;
pa_defer_event *defer_event;
pa_iochannel *io;
pa_queue *send_queue;
pa_mutex *mutex; /* only for access to the queue */
pa_anotify *anotify;
pa_mutex *mutex;
int dead;
@ -128,7 +126,6 @@ struct pa_pstream {
uint32_t shm_info[PA_PSTREAM_SHM_MAX];
void *data;
size_t index;
pa_memchunk memchunk;
} write;
struct {
@ -173,6 +170,10 @@ static void do_something(pa_pstream *p) {
pa_pstream_ref(p);
pa_mutex_lock(p->mutex);
p->mainloop->defer_enable(p->defer_event, 0);
if (!p->dead && pa_iochannel_is_readable(p->io)) {
if (do_read(p) < 0)
goto fail;
@ -184,6 +185,8 @@ static void do_something(pa_pstream *p) {
goto fail;
}
pa_mutex_unlock(p->mutex);
pa_pstream_unref(p);
return;
@ -194,6 +197,8 @@ fail:
if (p->die_callback)
p->die_callback(p, p->die_callback_userdata);
pa_mutex_unlock(p->mutex);
pa_pstream_unref(p);
}
@ -206,10 +211,13 @@ static void io_callback(pa_iochannel*io, void *userdata) {
do_something(p);
}
static void anotify_callback(uint8_t event, void *userdata) {
static void defer_callback(pa_mainloop_api *m, pa_defer_event *e, void*userdata) {
pa_pstream *p = userdata;
assert(p);
assert(p->defer_event == e);
assert(p->mainloop == m);
do_something(p);
}
@ -229,16 +237,16 @@ pa_pstream *pa_pstream_new(pa_mainloop_api *m, pa_iochannel *io, pa_mempool *poo
p->dead = 0;
p->mutex = pa_mutex_new(1);
p->anotify = pa_anotify_new(m, anotify_callback, p);
p->mainloop = m;
p->defer_event = m->defer_new(m, defer_callback, p);
m->defer_enable(p->defer_event, 0);
p->send_queue = pa_queue_new();
assert(p->send_queue);
p->write.current = NULL;
p->write.index = 0;
pa_memchunk_reset(&p->write.memchunk);
p->read.memblock = NULL;
p->read.packet = NULL;
p->read.index = 0;
@ -301,15 +309,9 @@ static void pstream_free(pa_pstream *p) {
if (p->read.packet)
pa_packet_unref(p->read.packet);
if (p->write.memchunk.memblock)
pa_memblock_unref(p->write.memchunk.memblock);
if (p->mutex)
pa_mutex_free(p->mutex);
if (p->anotify)
pa_anotify_free(p->anotify);
pa_xfree(p);
}
@ -320,6 +322,11 @@ void pa_pstream_send_packet(pa_pstream*p, pa_packet *packet, const pa_creds *cre
assert(PA_REFCNT_VALUE(p) > 0);
assert(packet);
pa_mutex_lock(p->mutex);
if (p->dead)
goto finish;
i = pa_xnew(struct item_info, 1);
i->type = PA_PSTREAM_ITEM_PACKET;
i->packet = pa_packet_ref(packet);
@ -329,11 +336,12 @@ void pa_pstream_send_packet(pa_pstream*p, pa_packet *packet, const pa_creds *cre
i->creds = *creds;
#endif
pa_mutex_lock(p->mutex);
pa_queue_push(p->send_queue, i);
p->mainloop->defer_enable(p->defer_event, 1);
finish:
pa_mutex_unlock(p->mutex);
pa_anotify_signal(p->anotify, 0);
}
void pa_pstream_send_memblock(pa_pstream*p, uint32_t channel, int64_t offset, pa_seek_mode_t seek_mode, const pa_memchunk *chunk) {
@ -344,6 +352,12 @@ void pa_pstream_send_memblock(pa_pstream*p, uint32_t channel, int64_t offset, pa
assert(channel != (uint32_t) -1);
assert(chunk);
pa_mutex_lock(p->mutex);
if (p->dead)
goto finish;
length = chunk->length;
idx = 0;
while (length > 0) {
@ -365,15 +379,17 @@ void pa_pstream_send_memblock(pa_pstream*p, uint32_t channel, int64_t offset, pa
i->with_creds = 0;
#endif
pa_mutex_lock(p->mutex);
pa_queue_push(p->send_queue, i);
pa_mutex_unlock(p->mutex);
idx += n;
length -= n;
}
p->mainloop->defer_enable(p->defer_event, 1);
pa_anotify_signal(p->anotify, 0);
finish:
pa_mutex_unlock(p->mutex);
}
static void memimport_release_cb(pa_memimport *i, uint32_t block_id, void *userdata) {
@ -383,6 +399,11 @@ static void memimport_release_cb(pa_memimport *i, uint32_t block_id, void *userd
assert(p);
assert(PA_REFCNT_VALUE(p) > 0);
pa_mutex_lock(p->mutex);
if (p->dead)
goto finish;
/* pa_log("Releasing block %u", block_id); */
item = pa_xnew(struct item_info, 1);
@ -392,11 +413,12 @@ static void memimport_release_cb(pa_memimport *i, uint32_t block_id, void *userd
item->with_creds = 0;
#endif
pa_mutex_lock(p->mutex);
pa_queue_push(p->send_queue, item);
pa_mutex_unlock(p->mutex);
p->mainloop->defer_enable(p->defer_event, 1);
pa_anotify_signal(p->anotify, 0);
finish:
pa_mutex_unlock(p->mutex);
}
static void memexport_revoke_cb(pa_memexport *e, uint32_t block_id, void *userdata) {
@ -406,6 +428,11 @@ static void memexport_revoke_cb(pa_memexport *e, uint32_t block_id, void *userda
assert(p);
assert(PA_REFCNT_VALUE(p) > 0);
pa_mutex_lock(p->mutex);
if (p->dead)
goto finish;
/* pa_log("Revoking block %u", block_id); */
item = pa_xnew(struct item_info, 1);
@ -415,27 +442,23 @@ static void memexport_revoke_cb(pa_memexport *e, uint32_t block_id, void *userda
item->with_creds = 0;
#endif
pa_mutex_lock(p->mutex);
pa_queue_push(p->send_queue, item);
pa_mutex_unlock(p->mutex);
p->mainloop->defer_enable(p->defer_event, 1);
pa_anotify_signal(p->anotify, 0);
finish:
pa_mutex_unlock(p->mutex);
}
static void prepare_next_write_item(pa_pstream *p) {
assert(p);
assert(PA_REFCNT_VALUE(p) > 0);
pa_mutex_lock(p->mutex);
p->write.current = pa_queue_pop(p->send_queue);
pa_mutex_unlock(p->mutex);
if (!p->write.current)
if (!(p->write.current = pa_queue_pop(p->send_queue)))
return;
p->write.index = 0;
p->write.data = NULL;
pa_memchunk_reset(&p->write.memchunk);
p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = 0;
p->write.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL] = htonl((uint32_t) -1);
@ -502,9 +525,7 @@ static void prepare_next_write_item(pa_pstream *p) {
if (send_payload) {
p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH] = htonl(p->write.current->chunk.length);
p->write.memchunk = p->write.current->chunk;
pa_memblock_ref(p->write.memchunk.memblock);
p->write.data = NULL;
p->write.data = (uint8_t*) p->write.current->chunk.memblock->data + p->write.current->chunk.index;
}
p->write.descriptor[PA_PSTREAM_DESCRIPTOR_FLAGS] = htonl(flags);
@ -520,7 +541,6 @@ static int do_write(pa_pstream *p) {
void *d;
size_t l;
ssize_t r;
pa_memblock *release_memblock = NULL;
assert(p);
assert(PA_REFCNT_VALUE(p) > 0);
@ -535,16 +555,9 @@ static int do_write(pa_pstream *p) {
d = (uint8_t*) p->write.descriptor + p->write.index;
l = PA_PSTREAM_DESCRIPTOR_SIZE - p->write.index;
} else {
assert(p->write.data || p->write.memchunk.memblock);
if (p->write.data)
d = p->write.data;
else {
d = (uint8_t*) pa_memblock_acquire(p->write.memchunk.memblock) + p->write.memchunk.index;
release_memblock = p->write.memchunk.memblock;
}
assert(p->write.data);
d = (uint8_t*) d + p->write.index - PA_PSTREAM_DESCRIPTOR_SIZE;
d = (uint8_t*) p->write.data + p->write.index - PA_PSTREAM_DESCRIPTOR_SIZE;
l = ntohl(p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) - (p->write.index - PA_PSTREAM_DESCRIPTOR_SIZE);
}
@ -554,17 +567,14 @@ static int do_write(pa_pstream *p) {
if (p->send_creds_now) {
if ((r = pa_iochannel_write_with_creds(p->io, d, l, &p->write_creds)) < 0)
goto fail;
return -1;
p->send_creds_now = 0;
} else
#endif
if ((r = pa_iochannel_write(p->io, d, l)) < 0)
goto fail;
if (release_memblock)
pa_memblock_release(release_memblock);
return -1;
p->write.index += r;
@ -578,20 +588,12 @@ static int do_write(pa_pstream *p) {
}
return 0;
fail:
if (release_memblock)
pa_memblock_release(release_memblock);
return -1;
}
static int do_read(pa_pstream *p) {
void *d;
size_t l;
ssize_t r;
pa_memblock *release_memblock = NULL;
assert(p);
assert(PA_REFCNT_VALUE(p) > 0);
@ -600,16 +602,8 @@ static int do_read(pa_pstream *p) {
d = (uint8_t*) p->read.descriptor + p->read.index;
l = PA_PSTREAM_DESCRIPTOR_SIZE - p->read.index;
} else {
assert(p->read.data || p->read.memblock);
if (p->read.data)
d = p->read.data;
else {
d = pa_memblock_acquire(p->read.memblock);
release_memblock = p->read.memblock;
}
d = (uint8_t*) d + p->read.index - PA_PSTREAM_DESCRIPTOR_SIZE;
assert(p->read.data);
d = (uint8_t*) p->read.data + p->read.index - PA_PSTREAM_DESCRIPTOR_SIZE;
l = ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) - (p->read.index - PA_PSTREAM_DESCRIPTOR_SIZE);
}
@ -618,17 +612,14 @@ static int do_read(pa_pstream *p) {
int b = 0;
if ((r = pa_iochannel_read_with_creds(p->io, d, l, &p->read_creds, &b)) <= 0)
goto fail;
return -1;
p->read_creds_valid = p->read_creds_valid || b;
}
#else
if ((r = pa_iochannel_read(p->io, d, l)) <= 0)
goto fail;
return -1;
#endif
if (release_memblock)
pa_memblock_release(release_memblock);
p->read.index += r;
@ -710,7 +701,7 @@ static int do_read(pa_pstream *p) {
/* Frame is a memblock frame */
p->read.memblock = pa_memblock_new(p->mempool, length);
p->read.data = NULL;
p->read.data = p->read.memblock->data;
} else {
pa_log_warn("Recieved memblock frame with invalid flags value.");
@ -797,7 +788,7 @@ static int do_read(pa_pstream *p) {
chunk.memblock = b;
chunk.index = 0;
chunk.length = pa_memblock_get_length(b);
chunk.length = b->length;
offset = (int64_t) (
(((uint64_t) ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_OFFSET_HI])) << 32) |
@ -825,51 +816,52 @@ frame_done:
p->read.memblock = NULL;
p->read.packet = NULL;
p->read.index = 0;
p->read.data = NULL;
#ifdef HAVE_CREDS
p->read_creds_valid = 0;
#endif
return 0;
fail:
if (release_memblock)
pa_memblock_release(release_memblock);
return -1;
}
void pa_pstream_set_die_callback(pa_pstream *p, pa_pstream_notify_cb_t cb, void *userdata) {
assert(p);
assert(PA_REFCNT_VALUE(p) > 0);
pa_mutex_lock(p->mutex);
p->die_callback = cb;
p->die_callback_userdata = userdata;
pa_mutex_unlock(p->mutex);
}
void pa_pstream_set_drain_callback(pa_pstream *p, pa_pstream_notify_cb_t cb, void *userdata) {
assert(p);
assert(PA_REFCNT_VALUE(p) > 0);
pa_mutex_lock(p->mutex);
p->drain_callback = cb;
p->drain_callback_userdata = userdata;
pa_mutex_unlock(p->mutex);
}
void pa_pstream_set_recieve_packet_callback(pa_pstream *p, pa_pstream_packet_cb_t cb, void *userdata) {
assert(p);
assert(PA_REFCNT_VALUE(p) > 0);
pa_mutex_lock(p->mutex);
p->recieve_packet_callback = cb;
p->recieve_packet_callback_userdata = userdata;
pa_mutex_unlock(p->mutex);
}
void pa_pstream_set_recieve_memblock_callback(pa_pstream *p, pa_pstream_memblock_cb_t cb, void *userdata) {
assert(p);
assert(PA_REFCNT_VALUE(p) > 0);
pa_mutex_lock(p->mutex);
p->recieve_memblock_callback = cb;
p->recieve_memblock_callback_userdata = userdata;
pa_mutex_unlock(p->mutex);
}
int pa_pstream_is_pending(pa_pstream *p) {
@ -909,6 +901,8 @@ pa_pstream* pa_pstream_ref(pa_pstream*p) {
void pa_pstream_close(pa_pstream *p) {
assert(p);
pa_mutex_lock(p->mutex);
p->dead = 1;
if (p->import) {
@ -926,16 +920,25 @@ void pa_pstream_close(pa_pstream *p) {
p->io = NULL;
}
if (p->defer_event) {
p->mainloop->defer_free(p->defer_event);
p->defer_event = NULL;
}
p->die_callback = NULL;
p->drain_callback = NULL;
p->recieve_packet_callback = NULL;
p->recieve_memblock_callback = NULL;
pa_mutex_unlock(p->mutex);
}
void pa_pstream_use_shm(pa_pstream *p, int enable) {
assert(p);
assert(PA_REFCNT_VALUE(p) > 0);
pa_mutex_lock(p->mutex);
p->use_shm = enable;
if (enable) {
@ -950,4 +953,6 @@ void pa_pstream_use_shm(pa_pstream *p, int enable) {
p->export = NULL;
}
}
pa_mutex_unlock(p->mutex);
}
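The pstream hunks above also replace the pa_anotify-based wakeup with the older defer-event scheme: every producer takes the pstream mutex, pushes its item onto the send queue and re-arms the defer event, which later drives do_read()/do_write() from the main-loop thread. A rough sketch of that pattern, assuming the struct layout shown in the diff; it is not the literal pa_pstream_send_packet() body, and the dead-pstream case is simplified:

    static void queue_and_wake(pa_pstream *p, struct item_info *i) {
        pa_mutex_lock(p->mutex);

        if (!p->dead) {
            pa_queue_push(p->send_queue, i);
            p->mainloop->defer_enable(p->defer_event, 1);   /* wake the main loop */
        }
        /* item clean-up for a dead pstream is elided here */

        pa_mutex_unlock(p->mutex);
    }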

View file

@ -51,7 +51,8 @@ struct pa_resampler {
};
struct impl_libsamplerate {
pa_memchunk buf1, buf2, buf3, buf4;
pa_memblock *buf1_block, *buf2_block, *buf3_block, *buf4_block;
float* buf1, *buf2, *buf3, *buf4;
unsigned buf1_samples, buf2_samples, buf3_samples, buf4_samples;
pa_convert_to_float32ne_func_t to_float32ne_func;
@ -223,14 +224,14 @@ static void libsamplerate_free(pa_resampler *r) {
if (u->src_state)
src_delete(u->src_state);
if (u->buf1.memblock)
pa_memblock_unref(u->buf1.memblock);
if (u->buf2.memblock)
pa_memblock_unref(u->buf2.memblock);
if (u->buf3.memblock)
pa_memblock_unref(u->buf3.memblock);
if (u->buf4.memblock)
pa_memblock_unref(u->buf4.memblock);
if (u->buf1_block)
pa_memblock_unref(u->buf1_block);
if (u->buf2_block)
pa_memblock_unref(u->buf2_block);
if (u->buf3_block)
pa_memblock_unref(u->buf3_block);
if (u->buf4_block)
pa_memblock_unref(u->buf4_block);
pa_xfree(u);
}
@ -269,80 +270,64 @@ static void calc_map_table(pa_resampler *r) {
}
}
static pa_memchunk* convert_to_float(pa_resampler *r, pa_memchunk *input) {
static float * convert_to_float(pa_resampler *r, void *input, unsigned n_frames) {
struct impl_libsamplerate *u;
unsigned n_samples;
void *src, *dst;
assert(r);
assert(input);
assert(input->memblock);
assert(r->impl_data);
u = r->impl_data;
/* Convert the incoming sample into floats and place them in buf1 */
if (!u->to_float32ne_func || !input->length)
if (!u->to_float32ne_func)
return input;
n_samples = (input->length / r->i_fz) * r->i_ss.channels;
n_samples = n_frames * r->i_ss.channels;
if (!u->buf1.memblock || u->buf1_samples < n_samples) {
if (u->buf1.memblock)
pa_memblock_unref(u->buf1.memblock);
if (u->buf1_samples < n_samples) {
if (u->buf1_block)
pa_memblock_unref(u->buf1_block);
u->buf1_samples = n_samples;
u->buf1.memblock = pa_memblock_new(r->mempool, u->buf1.length = sizeof(float) * n_samples);
u->buf1.index = 0;
u->buf1_block = pa_memblock_new(r->mempool, sizeof(float) * n_samples);
u->buf1 = u->buf1_block->data;
}
u->to_float32ne_func(n_samples, input, u->buf1);
src = (uint8_t*) pa_memblock_acquire(input->memblock) + input->index;
dst = (uint8_t*) pa_memblock_acquire(u->buf1.memblock);
u->to_float32ne_func(n_samples, src, dst);
pa_memblock_release(input->memblock);
pa_memblock_release(u->buf1.memblock);
u->buf1.length = sizeof(float) * n_samples;
return &u->buf1;
return u->buf1;
}
static pa_memchunk *remap_channels(pa_resampler *r, pa_memchunk *input) {
static float *remap_channels(pa_resampler *r, float *input, unsigned n_frames) {
struct impl_libsamplerate *u;
unsigned n_samples, n_frames;
unsigned n_samples;
int i_skip, o_skip;
unsigned oc;
float *src, *dst;
assert(r);
assert(input);
assert(input->memblock);
assert(r->impl_data);
u = r->impl_data;
/* Remap channels and place the result int buf2 */
if (!u->map_required || !input->length)
if (!u->map_required)
return input;
n_samples = input->length / sizeof(float);
n_frames = n_samples / r->o_ss.channels;
n_samples = n_frames * r->o_ss.channels;
if (!u->buf2.memblock || u->buf2_samples < n_samples) {
if (u->buf2.memblock)
pa_memblock_unref(u->buf2.memblock);
if (u->buf2_samples < n_samples) {
if (u->buf2_block)
pa_memblock_unref(u->buf2_block);
u->buf2_samples = n_samples;
u->buf2.memblock = pa_memblock_new(r->mempool, u->buf2.length = sizeof(float) * n_samples);
u->buf2.index = 0;
u->buf2_block = pa_memblock_new(r->mempool, sizeof(float) * n_samples);
u->buf2 = u->buf2_block->data;
}
src = (float*) ((uint8_t*) pa_memblock_acquire(input->memblock) + input->index);
dst = (float*) pa_memblock_acquire(u->buf2.memblock);
memset(dst, 0, n_samples * sizeof(float));
memset(u->buf2, 0, n_samples * sizeof(float));
o_skip = sizeof(float) * r->o_ss.channels;
i_skip = sizeof(float) * r->i_ss.channels;
@ -353,57 +338,49 @@ static pa_memchunk *remap_channels(pa_resampler *r, pa_memchunk *input) {
for (i = 0; i < PA_CHANNELS_MAX && u->map_table[oc][i] >= 0; i++)
oil_vectoradd_f32(
dst + oc, o_skip,
dst + oc, o_skip,
src + u->map_table[oc][i], i_skip,
u->buf2 + oc, o_skip,
u->buf2 + oc, o_skip,
input + u->map_table[oc][i], i_skip,
n_frames,
&one, &one);
}
pa_memblock_release(input->memblock);
pa_memblock_release(u->buf2.memblock);
u->buf2.length = n_frames * sizeof(float) * r->o_ss.channels;
return &u->buf2;
return u->buf2;
}
static pa_memchunk *resample(pa_resampler *r, pa_memchunk *input) {
static float *resample(pa_resampler *r, float *input, unsigned *n_frames) {
struct impl_libsamplerate *u;
SRC_DATA data;
unsigned in_n_frames, in_n_samples;
unsigned out_n_frames, out_n_samples;
int ret;
assert(r);
assert(input);
assert(n_frames);
assert(r->impl_data);
u = r->impl_data;
/* Resample the data and place the result in buf3 */
if (!u->src_state || !input->length)
if (!u->src_state)
return input;
in_n_samples = input->length / sizeof(float);
in_n_frames = in_n_samples * r->o_ss.channels;
out_n_frames = (in_n_frames*r->o_ss.rate/r->i_ss.rate)+1024;
out_n_frames = (*n_frames*r->o_ss.rate/r->i_ss.rate)+1024;
out_n_samples = out_n_frames * r->o_ss.channels;
if (!u->buf3.memblock || u->buf3_samples < out_n_samples) {
if (u->buf3.memblock)
pa_memblock_unref(u->buf3.memblock);
if (u->buf3_samples < out_n_samples) {
if (u->buf3_block)
pa_memblock_unref(u->buf3_block);
u->buf3_samples = out_n_samples;
u->buf3.memblock = pa_memblock_new(r->mempool, u->buf3.length = sizeof(float) * out_n_samples);
u->buf3.index = 0;
u->buf3_block = pa_memblock_new(r->mempool, sizeof(float) * out_n_samples);
u->buf3 = u->buf3_block->data;
}
data.data_in = (float*) ((uint8_t*) pa_memblock_acquire(input->memblock) + input->index);
data.input_frames = in_n_frames;
data.data_in = input;
data.input_frames = *n_frames;
data.data_out = (float*) pa_memblock_acquire(u->buf3.memblock);
data.data_out = u->buf3;
data.output_frames = out_n_frames;
data.src_ratio = (double) r->o_ss.rate / r->i_ss.rate;
@ -411,20 +388,16 @@ static pa_memchunk *resample(pa_resampler *r, pa_memchunk *input) {
ret = src_process(u->src_state, &data);
assert(ret == 0);
assert((unsigned) data.input_frames_used == in_n_frames);
assert((unsigned) data.input_frames_used == *n_frames);
pa_memblock_release(input->memblock);
pa_memblock_release(u->buf3.memblock);
u->buf3.length = data.output_frames_gen * sizeof(float) * r->o_ss.channels;
*n_frames = data.output_frames_gen;
return &u->buf3;
return u->buf3;
}
static pa_memchunk *convert_from_float(pa_resampler *r, pa_memchunk *input) {
static void *convert_from_float(pa_resampler *r, float *input, unsigned n_frames) {
struct impl_libsamplerate *u;
unsigned n_samples, n_frames;
void *src, *dst;
unsigned n_samples;
assert(r);
assert(input);
@ -433,35 +406,30 @@ static pa_memchunk *convert_from_float(pa_resampler *r, pa_memchunk *input) {
/* Convert the data into the correct sample type and place the result in buf4 */
if (!u->from_float32ne_func || !input->length)
if (!u->from_float32ne_func)
return input;
n_frames = input->length / sizeof(float) / r->o_ss.channels;
n_samples = n_frames * r->o_ss.channels;
if (u->buf4_samples < n_samples) {
if (u->buf4.memblock)
pa_memblock_unref(u->buf4.memblock);
if (u->buf4_block)
pa_memblock_unref(u->buf4_block);
u->buf4_samples = n_samples;
u->buf4.memblock = pa_memblock_new(r->mempool, u->buf4.length = r->o_fz * n_frames);
u->buf4.index = 0;
u->buf4_block = pa_memblock_new(r->mempool, sizeof(float) * n_samples);
u->buf4 = u->buf4_block->data;
}
u->from_float32ne_func(n_samples, input, u->buf4);
src = (uint8_t*) pa_memblock_acquire(input->memblock) + input->length;
dst = pa_memblock_acquire(u->buf4.memblock);
u->from_float32ne_func(n_samples, src, dst);
pa_memblock_release(input->memblock);
pa_memblock_release(u->buf4.memblock);
u->buf4.length = r->o_fz * n_frames;
return &u->buf4;
return u->buf4;
}
static void libsamplerate_run(pa_resampler *r, const pa_memchunk *in, pa_memchunk *out) {
struct impl_libsamplerate *u;
pa_memchunk *buf;
float *buf;
void *input, *output;
unsigned n_frames;
assert(r);
assert(in);
@ -473,23 +441,55 @@ static void libsamplerate_run(pa_resampler *r, const pa_memchunk *in, pa_memchun
u = r->impl_data;
buf = convert_to_float(r, (pa_memchunk*) in);
buf = remap_channels(r, buf);
buf = resample(r, buf);
if (buf->length) {
buf = convert_from_float(r, buf);
*out = *buf;
if (buf == in)
pa_memblock_ref(buf->memblock);
else
pa_memchunk_reset(buf);
} else
pa_memchunk_reset(out);
pa_memblock_release(in->memblock);
input = ((uint8_t*) in->memblock->data + in->index);
n_frames = in->length / r->i_fz;
assert(n_frames > 0);
buf = convert_to_float(r, input, n_frames);
buf = remap_channels(r, buf, n_frames);
buf = resample(r, buf, &n_frames);
if (n_frames) {
output = convert_from_float(r, buf, n_frames);
if (output == input) {
/* Mm, no adjustment has been necessary, so let's return the original block */
out->memblock = pa_memblock_ref(in->memblock);
out->index = in->index;
out->length = in->length;
} else {
out->length = n_frames * r->o_fz;
out->index = 0;
out->memblock = NULL;
if (output == u->buf1) {
u->buf1 = NULL;
u->buf1_samples = 0;
out->memblock = u->buf1_block;
u->buf1_block = NULL;
} else if (output == u->buf2) {
u->buf2 = NULL;
u->buf2_samples = 0;
out->memblock = u->buf2_block;
u->buf2_block = NULL;
} else if (output == u->buf3) {
u->buf3 = NULL;
u->buf3_samples = 0;
out->memblock = u->buf3_block;
u->buf3_block = NULL;
} else if (output == u->buf4) {
u->buf4 = NULL;
u->buf4_samples = 0;
out->memblock = u->buf4_block;
u->buf4_block = NULL;
}
assert(out->memblock);
}
} else {
out->memblock = NULL;
out->index = out->length = 0;
}
}
static void libsamplerate_update_input_rate(pa_resampler *r, uint32_t rate) {
@ -516,10 +516,8 @@ static int libsamplerate_init(pa_resampler *r) {
r->impl_data = u = pa_xnew(struct impl_libsamplerate, 1);
pa_memchunk_reset(&u->buf1);
pa_memchunk_reset(&u->buf2);
pa_memchunk_reset(&u->buf3);
pa_memchunk_reset(&u->buf4);
u->buf1 = u->buf2 = u->buf3 = u->buf4 = NULL;
u->buf1_block = u->buf2_block = u->buf3_block = u->buf4_block = NULL;
u->buf1_samples = u->buf2_samples = u->buf3_samples = u->buf4_samples = 0;
if (r->i_ss.format == PA_SAMPLE_FLOAT32NE)
@ -580,16 +578,12 @@ static void trivial_run(pa_resampler *r, const pa_memchunk *in, pa_memchunk *out
/* Do real resampling */
size_t l;
unsigned o_index;
void *src, *dst;
/* The length of the new memory block rounded up */
l = ((((n_frames+1) * r->o_ss.rate) / r->i_ss.rate) + 1) * fz;
out->index = 0;
out->memblock = pa_memblock_new(r->mempool, l);
src = (uint8_t*) pa_memblock_acquire(in->memblock) + in->index;
dst = pa_memblock_acquire(out->memblock);
for (o_index = 0;; o_index++, u->o_counter++) {
unsigned j;
@ -600,16 +594,13 @@ static void trivial_run(pa_resampler *r, const pa_memchunk *in, pa_memchunk *out
if (j >= n_frames)
break;
assert(o_index*fz < pa_memblock_get_length(out->memblock));
assert(o_index*fz < out->memblock->length);
memcpy((uint8_t*) dst + fz*o_index,
(uint8_t*) src + fz*j, fz);
memcpy((uint8_t*) out->memblock->data + fz*o_index,
(uint8_t*) in->memblock->data + in->index + fz*j, fz);
}
pa_memblock_release(in->memblock);
pa_memblock_release(out->memblock);
out->length = o_index*fz;
}
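In the resampler, the revert goes back to raw float buffers plus an explicit frame count threaded through convert_to_float() → remap_channels() → resample() → convert_from_float(). The staging buffer for resample() is deliberately over-sized from the rate ratio, as shown above. A self-contained arithmetic sketch with illustrative numbers (441 frames, 44.1 kHz → 48 kHz, stereo are assumptions, not values taken from the diff):

    #include <stdio.h>

    int main(void) {
        unsigned in_n_frames = 441;                 /* e.g. 10 ms at 44100 Hz */
        unsigned in_rate = 44100, out_rate = 48000, channels = 2;

        /* Same sizing as resample(): rate-scaled estimate plus 1024 frames of head-room. */
        unsigned out_n_frames  = (in_n_frames * out_rate / in_rate) + 1024;
        unsigned out_n_samples = out_n_frames * channels;

        printf("buf3 needs %zu bytes\n", sizeof(float) * (size_t) out_n_samples);
        return 0;
    }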

View file

@ -46,27 +46,15 @@ pa_memblock *pa_silence_memblock_new(pa_mempool *pool, const pa_sample_spec *spe
}
pa_memblock *pa_silence_memblock(pa_memblock* b, const pa_sample_spec *spec) {
void *data;
assert(b);
assert(spec);
data = pa_memblock_acquire(b);
pa_silence_memory(data, pa_memblock_get_length(b), spec);
pa_memblock_release(b);
assert(b && b->data && spec);
pa_silence_memory(b->data, b->length, spec);
return b;
}
void pa_silence_memchunk(pa_memchunk *c, const pa_sample_spec *spec) {
void *data;
assert(c);
assert(c->memblock);
assert(spec);
assert(c && c->memblock && c->memblock->data && spec && c->length);
data = pa_memblock_acquire(c->memblock);
pa_silence_memory((uint8_t*) data+c->index, c->length, spec);
pa_memblock_release(c->memblock);
pa_silence_memory((uint8_t*) c->memblock->data+c->index, c->length, spec);
}
void pa_silence_memory(void *p, size_t length, const pa_sample_spec *spec) {
@ -94,38 +82,26 @@ void pa_silence_memory(void *p, size_t length, const pa_sample_spec *spec) {
}
size_t pa_mix(
pa_mix_info streams[],
unsigned nstreams,
void *data,
size_t length,
const pa_sample_spec *spec,
const pa_cvolume *volume,
int mute) {
pa_cvolume full_volume;
size_t d = 0;
unsigned k;
const pa_mix_info streams[],
unsigned nstreams,
void *data,
size_t length,
const pa_sample_spec *spec,
const pa_cvolume *volume,
int mute) {
assert(streams);
assert(data);
assert(length);
assert(spec);
assert(streams && data && length && spec);
if (!volume)
volume = pa_cvolume_reset(&full_volume, spec->channels);
for (k = 0; k < nstreams; k++)
streams[k].internal = pa_memblock_acquire(streams[k].chunk.memblock);
switch (spec->format) {
case PA_SAMPLE_S16NE:{
size_t d;
unsigned channel = 0;
for (d = 0;; d += sizeof(int16_t)) {
int32_t sum = 0;
if (d >= length)
goto finish;
return d;
if (!mute && volume->values[channel] != PA_VOLUME_MUTED) {
unsigned i;
@ -135,12 +111,12 @@ size_t pa_mix(
pa_volume_t cvolume = streams[i].volume.values[channel];
if (d >= streams[i].chunk.length)
goto finish;
return d;
if (cvolume == PA_VOLUME_MUTED)
v = 0;
else {
v = *((int16_t*) ((uint8_t*) streams[i].internal + streams[i].chunk.index + d));
v = *((int16_t*) ((uint8_t*) streams[i].chunk.memblock->data + streams[i].chunk.index + d));
if (cvolume != PA_VOLUME_NORM)
v = (int32_t) (v * pa_sw_volume_to_linear(cvolume));
@ -163,18 +139,17 @@ size_t pa_mix(
if (++channel >= spec->channels)
channel = 0;
}
break;
}
case PA_SAMPLE_S16RE:{
size_t d;
unsigned channel = 0;
for (d = 0;; d += sizeof(int16_t)) {
int32_t sum = 0;
if (d >= length)
goto finish;
return d;
if (!mute && volume->values[channel] != PA_VOLUME_MUTED) {
unsigned i;
@ -184,12 +159,12 @@ size_t pa_mix(
pa_volume_t cvolume = streams[i].volume.values[channel];
if (d >= streams[i].chunk.length)
goto finish;
return d;
if (cvolume == PA_VOLUME_MUTED)
v = 0;
else {
v = INT16_SWAP(*((int16_t*) ((uint8_t*) streams[i].internal + streams[i].chunk.index + d)));
v = INT16_SWAP(*((int16_t*) ((uint8_t*) streams[i].chunk.memblock->data + streams[i].chunk.index + d)));
if (cvolume != PA_VOLUME_NORM)
v = (int32_t) (v * pa_sw_volume_to_linear(cvolume));
@ -212,18 +187,17 @@ size_t pa_mix(
if (++channel >= spec->channels)
channel = 0;
}
break;
}
case PA_SAMPLE_U8: {
size_t d;
unsigned channel = 0;
for (d = 0;; d ++) {
int32_t sum = 0;
if (d >= length)
goto finish;
return d;
if (!mute && volume->values[channel] != PA_VOLUME_MUTED) {
unsigned i;
@ -233,12 +207,12 @@ size_t pa_mix(
pa_volume_t cvolume = streams[i].volume.values[channel];
if (d >= streams[i].chunk.length)
goto finish;
return d;
if (cvolume == PA_VOLUME_MUTED)
v = 0;
else {
v = (int32_t) *((uint8_t*) streams[i].internal + streams[i].chunk.index + d) - 0x80;
v = (int32_t) *((uint8_t*) streams[i].chunk.memblock->data + streams[i].chunk.index + d) - 0x80;
if (cvolume != PA_VOLUME_NORM)
v = (int32_t) (v * pa_sw_volume_to_linear(cvolume));
@ -261,18 +235,17 @@ size_t pa_mix(
if (++channel >= spec->channels)
channel = 0;
}
break;
}
case PA_SAMPLE_FLOAT32NE: {
size_t d;
unsigned channel = 0;
for (d = 0;; d += sizeof(float)) {
float sum = 0;
if (d >= length)
goto finish;
return d;
if (!mute && volume->values[channel] != PA_VOLUME_MUTED) {
unsigned i;
@ -282,12 +255,12 @@ size_t pa_mix(
pa_volume_t cvolume = streams[i].volume.values[channel];
if (d >= streams[i].chunk.length)
goto finish;
return d;
if (cvolume == PA_VOLUME_MUTED)
v = 0;
else {
v = *((float*) ((uint8_t*) streams[i].internal + streams[i].chunk.index + d));
v = *((float*) ((uint8_t*) streams[i].chunk.memblock->data + streams[i].chunk.index + d));
if (cvolume != PA_VOLUME_NORM)
v *= pa_sw_volume_to_linear(cvolume);
@ -306,34 +279,17 @@ size_t pa_mix(
if (++channel >= spec->channels)
channel = 0;
}
break;
}
default:
pa_log_error("ERROR: Unable to mix audio data of format %s.", pa_sample_format_to_string(spec->format));
abort();
}
finish:
for (k = 0; k < nstreams; k++)
pa_memblock_release(streams[k].chunk.memblock);
return d;
}
void pa_volume_memchunk(
pa_memchunk*c,
const pa_sample_spec *spec,
const pa_cvolume *volume) {
void *ptr;
assert(c);
assert(spec);
assert(c->length % pa_frame_size(spec) == 0);
void pa_volume_memchunk(pa_memchunk*c, const pa_sample_spec *spec, const pa_cvolume *volume) {
assert(c && spec && (c->length % pa_frame_size(spec) == 0));
assert(volume);
if (pa_cvolume_channels_equal_to(volume, PA_VOLUME_NORM))
@ -344,8 +300,6 @@ void pa_volume_memchunk(
return;
}
ptr = pa_memblock_acquire(c->memblock);
switch (spec->format) {
case PA_SAMPLE_S16NE: {
int16_t *d;
@ -356,7 +310,7 @@ void pa_volume_memchunk(
for (channel = 0; channel < spec->channels; channel++)
linear[channel] = pa_sw_volume_to_linear(volume->values[channel]);
for (channel = 0, d = (int16_t*) ((uint8_t*) ptr + c->index), n = c->length/sizeof(int16_t); n > 0; d++, n--) {
for (channel = 0, d = (int16_t*) ((uint8_t*) c->memblock->data+c->index), n = c->length/sizeof(int16_t); n > 0; d++, n--) {
int32_t t = (int32_t)(*d);
t = (int32_t) (t * linear[channel]);
@ -381,7 +335,7 @@ void pa_volume_memchunk(
for (channel = 0; channel < spec->channels; channel++)
linear[channel] = pa_sw_volume_to_linear(volume->values[channel]);
for (channel = 0, d = (int16_t*) ((uint8_t*) ptr + c->index), n = c->length/sizeof(int16_t); n > 0; d++, n--) {
for (channel = 0, d = (int16_t*) ((uint8_t*) c->memblock->data+c->index), n = c->length/sizeof(int16_t); n > 0; d++, n--) {
int32_t t = (int32_t)(INT16_SWAP(*d));
t = (int32_t) (t * linear[channel]);
@ -403,7 +357,7 @@ void pa_volume_memchunk(
size_t n;
unsigned channel = 0;
for (d = (uint8_t*) ptr + c->index, n = c->length; n > 0; d++, n--) {
for (d = (uint8_t*) c->memblock->data + c->index, n = c->length; n > 0; d++, n--) {
int32_t t = (int32_t) *d - 0x80;
t = (int32_t) (t * pa_sw_volume_to_linear(volume->values[channel]));
@ -425,7 +379,7 @@ void pa_volume_memchunk(
unsigned n;
unsigned channel;
d = (float*) ((uint8_t*) ptr + c->index);
d = (float*) ((uint8_t*) c->memblock->data + c->index);
skip = spec->channels * sizeof(float);
n = c->length/sizeof(float)/spec->channels;
@ -448,7 +402,5 @@ void pa_volume_memchunk(
pa_sample_format_to_string(spec->format));
abort();
}
pa_memblock_release(c->memblock);
}
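pa_mix() loses its acquire/release bracketing in this revert: the r1404 version mapped every contributing stream once up front and read through streams[k].internal, while the restored version dereferences streams[i].chunk.memblock->data inside the per-sample loop and returns d directly instead of jumping to a common finish label. A sketch of the removed bracketing, with the mixing body elided:

    unsigned k;

    /* Map every stream's block before mixing... */
    for (k = 0; k < nstreams; k++)
        streams[k].internal = pa_memblock_acquire(streams[k].chunk.memblock);

    /* ...per-format, per-channel mixing loop reads from streams[i].internal... */

    /* ...and unmap them again once the output buffer is full. */
    for (k = 0; k < nstreams; k++)
        pa_memblock_release(streams[k].chunk.memblock);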

View file

@ -36,11 +36,10 @@ typedef struct pa_mix_info {
pa_memchunk chunk;
pa_cvolume volume;
void *userdata;
void *internal; /* Used internally by pa_mix(), should not be initialised when calling pa_mix() */
} pa_mix_info;
size_t pa_mix(
pa_mix_info channels[],
const pa_mix_info channels[],
unsigned nchannels,
void *data,
size_t length,

View file

@ -294,7 +294,6 @@ int pa_sink_input_peek(pa_sink_input *i, pa_memchunk *chunk, pa_cvolume *volume)
assert(i->state == PA_SINK_INPUT_RUNNING || i->state == PA_SINK_INPUT_DRAINED);
if (i->move_silence > 0) {
size_t l;
/* We have just been moved and shall play some silence for a
* while until the old sink has drained its playback buffer */
@ -304,8 +303,7 @@ int pa_sink_input_peek(pa_sink_input *i, pa_memchunk *chunk, pa_cvolume *volume)
chunk->memblock = pa_memblock_ref(i->silence_memblock);
chunk->index = 0;
l = pa_memblock_get_length(chunk->memblock);
chunk->length = i->move_silence < l ? i->move_silence : l;
chunk->length = i->move_silence < chunk->memblock->length ? i->move_silence : chunk->memblock->length;
ret = 0;
do_volume_adj_here = 1;
@ -391,13 +389,10 @@ void pa_sink_input_drop(pa_sink_input *i, const pa_memchunk *chunk, size_t lengt
if (i->move_silence > 0) {
if (chunk) {
size_t l;
l = pa_memblock_get_length(i->silence_memblock);
if (chunk->memblock != i->silence_memblock ||
chunk->index != 0 ||
(chunk->memblock && (chunk->length != (l < i->move_silence ? l : i->move_silence))))
(chunk->memblock && (chunk->length != (i->silence_memblock->length < i->move_silence ? i->silence_memblock->length : i->move_silence))))
return;
}

View file

@ -237,6 +237,7 @@ static unsigned fill_mix_info(pa_sink *s, pa_mix_info *info, unsigned maxinfo) {
info->userdata = i;
assert(info->chunk.memblock);
assert(info->chunk.memblock->data);
assert(info->chunk.length);
info++;
@ -304,16 +305,13 @@ int pa_sink_render(pa_sink*s, size_t length, pa_memchunk *result) {
pa_volume_memchunk(result, &s->sample_spec, &volume);
}
} else {
void *ptr;
result->memblock = pa_memblock_new(s->core->mempool, length);
assert(result->memblock);
/* pa_log("mixing %i", n); */
ptr = pa_memblock_acquire(result->memblock);
result->length = pa_mix(info, n, ptr, length, &s->sample_spec, &s->sw_volume, s->sw_muted);
pa_memblock_release(result->memblock);
result->length = pa_mix(info, n, result->memblock->data, length,
&s->sample_spec, &s->sw_volume, s->sw_muted);
result->index = 0;
}
@ -334,13 +332,13 @@ int pa_sink_render_into(pa_sink*s, pa_memchunk *target) {
pa_mix_info info[MAX_MIX_CHANNELS];
unsigned n;
int r = -1;
void *ptr;
assert(s);
assert(s->ref >= 1);
assert(target);
assert(target->memblock);
assert(target->length);
assert(target->memblock->data);
pa_sink_ref(s);
@ -349,23 +347,16 @@ int pa_sink_render_into(pa_sink*s, pa_memchunk *target) {
if (n <= 0)
goto finish;
ptr = pa_memblock_acquire(target->memblock);
if (n == 1) {
void *src;
pa_cvolume volume;
if (target->length > info[0].chunk.length)
target->length = info[0].chunk.length;
src = pa_memblock_acquire(info[0].chunk.memblock);
memcpy((uint8_t*) ptr + target->index,
(uint8_t*) src + info[0].chunk.index,
memcpy((uint8_t*) target->memblock->data + target->index,
(uint8_t*) info[0].chunk.memblock->data + info[0].chunk.index,
target->length);
pa_memblock_release(info[0].chunk.memblock);
pa_sw_cvolume_multiply(&volume, &s->sw_volume, &info[0].volume);
if (s->sw_muted)
@ -374,13 +365,11 @@ int pa_sink_render_into(pa_sink*s, pa_memchunk *target) {
pa_volume_memchunk(target, &s->sample_spec, &volume);
} else
target->length = pa_mix(info, n,
(uint8_t*) ptr + target->index,
(uint8_t*) target->memblock->data + target->index,
target->length,
&s->sample_spec,
&s->sw_volume,
s->sw_muted);
pa_memblock_release(target->memblock);
inputs_drop(s, info, n, target->length);
@ -404,6 +393,7 @@ void pa_sink_render_into_full(pa_sink *s, pa_memchunk *target) {
assert(target);
assert(target->memblock);
assert(target->length);
assert(target->memblock->data);
pa_sink_ref(s);
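The pa_sink_render_into() hunks above keep the single-input fast path: when exactly one stream contributes, its chunk is copied straight into the caller's buffer and the combined software volume is applied afterwards; with more inputs, pa_mix() writes into the target instead. A condensed sketch of the fast path using only calls from this diff (mute handling and the inputs_drop() bookkeeping are elided):

    if (target->length > info[0].chunk.length)
        target->length = info[0].chunk.length;

    memcpy((uint8_t*) target->memblock->data + target->index,
           (uint8_t*) info[0].chunk.memblock->data + info[0].chunk.index,
           target->length);

    /* Scale by sink volume x stream volume in one pass. */
    pa_sw_cvolume_multiply(&volume, &s->sw_volume, &info[0].volume);
    pa_volume_memchunk(target, &s->sample_spec, &volume);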

View file

@ -74,26 +74,21 @@ static int sink_input_peek(pa_sink_input *i, pa_memchunk *chunk) {
if (!u->memchunk.memblock) {
uint32_t fs = pa_frame_size(&i->sample_spec);
sf_count_t n;
void *p;
u->memchunk.memblock = pa_memblock_new(i->sink->core->mempool, BUF_SIZE);
u->memchunk.index = 0;
p = pa_memblock_acquire(u->memchunk.memblock);
if (u->readf_function) {
if ((n = u->readf_function(u->sndfile, p, BUF_SIZE/fs)) <= 0)
if ((n = u->readf_function(u->sndfile, u->memchunk.memblock->data, BUF_SIZE/fs)) <= 0)
n = 0;
u->memchunk.length = n * fs;
} else {
if ((n = sf_read_raw(u->sndfile, p, BUF_SIZE)) <= 0)
if ((n = sf_read_raw(u->sndfile, u->memchunk.memblock->data, BUF_SIZE)) <= 0)
n = 0;
u->memchunk.length = n;
}
pa_memblock_release(u->memchunk.memblock);
if (!u->memchunk.length) {
free_userdata(u);

View file

@ -40,11 +40,7 @@ int pa_sound_file_load(pa_mempool *pool, const char *fname, pa_sample_spec *ss,
int ret = -1;
size_t l;
sf_count_t (*readf_function)(SNDFILE *sndfile, void *ptr, sf_count_t frames) = NULL;
void *ptr = NULL;
assert(fname);
assert(ss);
assert(chunk);
assert(fname && ss && chunk);
chunk->memblock = NULL;
chunk->index = chunk->length = 0;
@ -101,10 +97,8 @@ int pa_sound_file_load(pa_mempool *pool, const char *fname, pa_sample_spec *ss,
chunk->index = 0;
chunk->length = l;
ptr = pa_memblock_acquire(chunk->memblock);
if ((readf_function && readf_function(sf, ptr, sfinfo.frames) != sfinfo.frames) ||
(!readf_function && sf_read_raw(sf, ptr, l) != l)) {
if ((readf_function && readf_function(sf, chunk->memblock->data, sfinfo.frames) != sfinfo.frames) ||
(!readf_function && sf_read_raw(sf, chunk->memblock->data, l) != l)) {
pa_log("Premature file end");
goto finish;
}
@ -116,9 +110,6 @@ finish:
if (sf)
sf_close(sf);
if (ptr)
pa_memblock_release(chunk->memblock);
if (ret != 0 && chunk->memblock)
pa_memblock_unref(chunk->memblock);

View file

@ -54,7 +54,7 @@ static void thread_func(void *data) {
int b = 1;
while (!quit) {
char *text;
char *text, *t;
/* Allocate some memory, if possible take it from the flist */
if (b && (text = pa_flist_pop(flist)))

View file

@ -59,27 +59,24 @@ int main(PA_GCC_UNUSED int argc, PA_GCC_UNUSED char *argv[]) {
c.index = c.length = 0;
}
assert(c.index < pa_memblock_get_length(c.memblock));
assert(c.index < c.memblock->length);
l = pa_memblock_get_length(c.memblock) - c.index;
l = c.memblock->length - c.index;
l = l <= 1 ? l : rand() % (l-1) +1 ;
p = pa_memblock_acquire(c.memblock);
if ((r = read(STDIN_FILENO, (uint8_t*) p + c.index, l)) <= 0) {
pa_memblock_release(c.memblock);
if ((r = read(STDIN_FILENO, (uint8_t*) c.memblock->data + c.index, l)) <= 0) {
fprintf(stderr, "read() failed: %s\n", r < 0 ? strerror(errno) : "EOF");
break;
}
pa_memblock_release(c.memblock);
c.length = r;
pa_mcalign_push(a, &c);
fprintf(stderr, "Read %ld bytes\n", (long)r);
c.index += r;
if (c.index >= pa_memblock_get_length(c.memblock)) {
if (c.index >= c.memblock->length) {
pa_memblock_unref(c.memblock);
pa_memchunk_reset(&c);
}
@ -90,9 +87,7 @@ int main(PA_GCC_UNUSED int argc, PA_GCC_UNUSED char *argv[]) {
if (pa_mcalign_pop(a, &t) < 0)
break;
p = pa_memblock_acquire(t.memblock);
pa_loop_write(STDOUT_FILENO, (uint8_t*) p + t.index, t.length, NULL);
pa_memblock_release(t.memblock);
pa_loop_write(STDOUT_FILENO, (uint8_t*) t.memblock->data + t.index, t.length, NULL);
fprintf(stderr, "Wrote %lu bytes.\n", (unsigned long) t.length);
pa_memblock_unref(t.memblock);

View file

@ -76,7 +76,6 @@ int main(int argc, char *argv[]) {
pa_memblock* blocks[5];
uint32_t id, shm_id;
size_t offset, size;
char *x;
const char txt[] = "This is a test!";
@ -91,17 +90,10 @@ int main(int argc, char *argv[]) {
assert(pool_a && pool_b && pool_c);
blocks[0] = pa_memblock_new_fixed(pool_a, (void*) txt, sizeof(txt), 1);
blocks[1] = pa_memblock_new(pool_a, sizeof(txt));
x = pa_memblock_acquire(blocks[1]);
snprintf(x, pa_memblock_get_length(blocks[1]), "%s", txt);
pa_memblock_release(blocks[1]);
snprintf(blocks[1]->data, blocks[1]->length, "%s", txt);
blocks[2] = pa_memblock_new_pool(pool_a, sizeof(txt));
x = pa_memblock_acquire(blocks[2]);
snprintf(x, pa_memblock_get_length(blocks[2]), "%s", txt);
pa_memblock_release(blocks[1]);
snprintf(blocks[2]->data, blocks[2]->length, "%s", txt);
blocks[3] = pa_memblock_new_malloced(pool_a, pa_xstrdup(txt), sizeof(txt));
blocks[4] = NULL;
@ -138,18 +130,14 @@ int main(int argc, char *argv[]) {
mb_c = pa_memimport_get(import_c, id, shm_id, offset, size);
assert(mb_c);
x = pa_memblock_acquire(mb_c);
printf("1 data=%s\n", x);
pa_memblock_release(mb_c);
printf("1 data=%s\n", (char*) mb_c->data);
print_stats(pool_a, "A");
print_stats(pool_b, "B");
print_stats(pool_c, "C");
pa_memexport_free(export_b);
x = pa_memblock_acquire(mb_c);
printf("2 data=%s\n", x);
pa_memblock_release(mb_c);
printf("2 data=%s\n", (char*) mb_c->data);
pa_memblock_unref(mb_c);
pa_memimport_free(import_b);

View file

@ -131,10 +131,8 @@ int main(int argc, char *argv[]) {
if (pa_memblockq_peek(bq, &out) < 0)
break;
p = pa_memblock_acquire(out.memblock);
for (e = (char*) p + out.index, n = 0; n < out.length; n++)
for (e = (char*) out.memblock->data + out.index, n = 0; n < out.length; n++)
printf("%c", *e);
pa_memblock_release(out.memblock);
pa_memblock_unref(out.memblock);
pa_memblockq_drop(bq, &out, out.length);