srbchannel: Trivial whitespace and style fixes

Mostly to improve readability and make things a bit more consistent.
This commit is contained in:
Arun Raghavan 2014-08-06 07:34:24 +05:30
parent 096c8903a9
commit a27e6d6d9e
2 changed files with 53 additions and 18 deletions

View file

@ -31,11 +31,12 @@
/* #define DEBUG_SRBCHANNEL */
/* This ringbuffer might be useful in other contexts too, but
right now it's only used inside the srbchannel, so let's keep it here
for the time being. */
* right now it's only used inside the srbchannel, so let's keep it here
* for the time being. */
typedef struct pa_ringbuffer pa_ringbuffer;
struct pa_ringbuffer {
pa_atomic_t *count;
pa_atomic_t *count; /* amount of data in the buffer, can be negative */
int capacity;
uint8_t *memory;
int readindex, writeindex;
@ -43,24 +44,30 @@ struct pa_ringbuffer {
static void *pa_ringbuffer_peek(pa_ringbuffer *r, int *count) {
int c = pa_atomic_load(r->count);
if (r->readindex + c > r->capacity)
*count = r->capacity - r->readindex;
else
*count = c;
return r->memory + r->readindex;
}
/* Returns true only if the buffer was completely full before the drop. */
static bool pa_ringbuffer_drop(pa_ringbuffer *r, int count) {
bool b = pa_atomic_sub(r->count, count) >= r->capacity;
r->readindex += count;
r->readindex %= r->capacity;
return b;
}
static void *pa_ringbuffer_begin_write(pa_ringbuffer *r, int *count) {
int c = pa_atomic_load(r->count);
*count = PA_MIN(r->capacity - r->writeindex, r->capacity - c);
return r->memory + r->writeindex;
}
@ -74,34 +81,40 @@ struct pa_srbchannel {
pa_ringbuffer rb_read, rb_write;
pa_fdsem *sem_read, *sem_write;
pa_memblock *memblock;
void *cb_userdata;
pa_srbchannel_cb_t callback;
pa_io_event *read_event;
pa_mainloop_api *mainloop;
};
/* We always listen to sem_read, and always signal on sem_write.
This means we signal the same semaphore for two scenarios:
1) We have written something to our send buffer, and want the other
side to read it
2) We have read something from our receive buffer that was previously
completely full, and want the other side to continue writing
*
* This means we signal the same semaphore for two scenarios:
* 1) We have written something to our send buffer, and want the other
* side to read it
* 2) We have read something from our receive buffer that was previously
* completely full, and want the other side to continue writing
*/
size_t pa_srbchannel_write(pa_srbchannel *sr, const void *data, size_t l) {
size_t written = 0;
while (l > 0) {
int towrite;
void *ptr = pa_ringbuffer_begin_write(&sr->rb_write, &towrite);
if ((size_t) towrite > l)
towrite = l;
if (towrite == 0) {
#ifdef DEBUG_SRBCHANNEL
pa_log("srbchannel output buffer full");
#endif
break;
}
memcpy(ptr, data, towrite);
pa_ringbuffer_end_write(&sr->rb_write, towrite);
written += towrite;
@ -111,20 +124,26 @@ size_t pa_srbchannel_write(pa_srbchannel *sr, const void *data, size_t l) {
#ifdef DEBUG_SRBCHANNEL
pa_log("Wrote %d bytes to srbchannel, signalling fdsem", (int) written);
#endif
pa_fdsem_post(sr->sem_write);
return written;
}
size_t pa_srbchannel_read(pa_srbchannel *sr, void *data, size_t l) {
size_t isread = 0;
while (l > 0) {
int toread;
void *ptr = pa_ringbuffer_peek(&sr->rb_read, &toread);
if ((size_t) toread > l)
toread = l;
if (toread == 0)
break;
memcpy(data, ptr, toread);
if (pa_ringbuffer_drop(&sr->rb_read, toread)) {
#ifdef DEBUG_SRBCHANNEL
pa_log("Read from full output buffer, signalling fdsem");
@ -136,9 +155,11 @@ size_t pa_srbchannel_read(pa_srbchannel *sr, void *data, size_t l) {
data = (uint8_t*) data + toread;
l -= toread;
}
#ifdef DEBUG_SRBCHANNEL
pa_log("Read %d bytes from srbchannel", (int) isread);
#endif
return isread;
}
@ -147,11 +168,14 @@ size_t pa_srbchannel_read(pa_srbchannel *sr, void *data, size_t l) {
struct srbheader {
pa_atomic_t read_count;
pa_atomic_t write_count;
pa_fdsem_data read_semdata;
pa_fdsem_data write_semdata;
int capacity;
int readbuf_offset;
int writebuf_offset;
/* TODO: Maybe a marker here to make sure we talk to a server with equally sized struct */
};
@ -163,13 +187,14 @@ static void srbchannel_rwloop(pa_srbchannel* sr) {
pa_log("In rw loop from srbchannel, before callback, count = %d", q);
#endif
if (sr->callback)
if (sr->callback) {
if (!sr->callback(sr, sr->cb_userdata)) {
#ifdef DEBUG_SRBCHANNEL
pa_log("Aborting read loop from srbchannel");
#endif
return;
}
}
#ifdef DEBUG_SRBCHANNEL
pa_ringbuffer_peek(&sr->rb_read, &q);
@ -199,14 +224,19 @@ pa_srbchannel* pa_srbchannel_new(pa_mainloop_api *m, pa_mempool *p) {
sr->rb_read.memory = (uint8_t*) srh + PA_ALIGN(sizeof(*srh));
srh->readbuf_offset = sr->rb_read.memory - (uint8_t*) srh;
capacity = (pa_memblock_get_length(sr->memblock) - srh->readbuf_offset) / 2;
sr->rb_write.memory = PA_ALIGN_PTR(sr->rb_read.memory + capacity);
srh->writebuf_offset = sr->rb_write.memory - (uint8_t*) srh;
capacity = PA_MIN(capacity, srh->writebuf_offset - srh->readbuf_offset);
pa_log_debug("SHM block is %d bytes, ringbuffer capacity is 2 * %d bytes",
(int) pa_memblock_get_length(sr->memblock), capacity);
srh->capacity = sr->rb_read.capacity = sr->rb_write.capacity = capacity;
sr->rb_read.count = &srh->read_count;
sr->rb_write.count = &srh->write_count;
@ -219,9 +249,11 @@ pa_srbchannel* pa_srbchannel_new(pa_mainloop_api *m, pa_mempool *p) {
goto fail;
readfd = pa_fdsem_get(sr->sem_read);
#ifdef DEBUG_SRBCHANNEL
pa_log("Enabling io event on fd %d", readfd);
#endif
sr->read_event = m->io_new(m, readfd, PA_IO_EVENT_INPUT, semread_cb, sr);
m->io_enable(sr->read_event, PA_IO_EVENT_INPUT);
@ -235,6 +267,7 @@ fail:
static void pa_srbchannel_swap(pa_srbchannel *sr) {
pa_srbchannel temp = *sr;
sr->sem_read = temp.sem_write;
sr->sem_write = temp.sem_read;
sr->rb_read = temp.rb_write;
@ -255,6 +288,7 @@ pa_srbchannel* pa_srbchannel_new_from_template(pa_mainloop_api *m, pa_srbchannel
sr->rb_read.capacity = sr->rb_write.capacity = srh->capacity;
sr->rb_read.count = &srh->read_count;
sr->rb_write.count = &srh->write_count;
sr->rb_read.memory = (uint8_t*) srh + srh->readbuf_offset;
sr->rb_write.memory = (uint8_t*) srh + srh->writebuf_offset;
@ -272,6 +306,7 @@ pa_srbchannel* pa_srbchannel_new_from_template(pa_mainloop_api *m, pa_srbchannel
#ifdef DEBUG_SRBCHANNEL
pa_log("Enabling io event on fd %d", t->readfd);
#endif
sr->read_event = m->io_new(m, t->readfd, PA_IO_EVENT_INPUT, semread_cb, sr);
m->io_enable(sr->read_event, PA_IO_EVENT_INPUT);

View file

@ -27,7 +27,7 @@
#include <pulsecore/memblock.h>
/* An shm ringbuffer that is used for low overhead server-client communication.
Signaling is done through eventfd semaphores (pa_fdsem). */
* Signaling is done through eventfd semaphores (pa_fdsem). */
typedef struct pa_srbchannel pa_srbchannel;
@ -48,13 +48,13 @@ size_t pa_srbchannel_write(pa_srbchannel *sr, const void *data, size_t l);
size_t pa_srbchannel_read(pa_srbchannel *sr, void *data, size_t l);
/* Set the callback function that is called whenever data becomes available for reading.
It can also be called if the output buffer was full and can now be written to.
Return false to abort all processing (e g if the srbchannel has been freed during the callback).
Otherwise return true.
Note that the callback will be called immediately, to be able to process stuff that
might already be in the buffer.
* It can also be called if the output buffer was full and can now be written to.
*
* Return false to abort all processing (e g if the srbchannel has been freed during the callback).
* Otherwise return true.
*
* Note that the callback will be called immediately, to be able to process stuff that
* might already be in the buffer.
*/
typedef bool (*pa_srbchannel_cb_t)(pa_srbchannel *sr, void *userdata);
void pa_srbchannel_set_callback(pa_srbchannel *sr, pa_srbchannel_cb_t callback, void *userdata);