draining ind native protocol

fixes in callback code on object destruction
simple protocol


git-svn-id: file:///home/lennart/svn/public/pulseaudio/trunk@52 fefdeb5f-60dc-0310-8127-8f9354f1896f
This commit is contained in:
Lennart Poettering 2004-07-07 00:22:46 +00:00
parent f8cbde54da
commit e8d1185c42
38 changed files with 843 additions and 364 deletions

View file

@ -220,6 +220,7 @@ libpolyp_la_SOURCES = polyp.c polyp.h \
protocol-native-spec.h \ protocol-native-spec.h \
mainloop-api.c mainloop-api.h \ mainloop-api.c mainloop-api.h \
mainloop.c mainloop.h \ mainloop.c mainloop.h \
mainloop-signal.c mainloop-signal.h \
idxset.c idxset.h \ idxset.c idxset.h \
util.c util.h \ util.c util.h \
memblock.c memblock.h \ memblock.c memblock.h \
@ -237,12 +238,13 @@ libpolyp_error_la_CFLAGS = $(AM_CFLAGS)
libpolyp_simple_la_SOURCES = simple.c simple.h libpolyp_simple_la_SOURCES = simple.c simple.h
libpolyp_simple_la_CFLAGS = $(AM_CFLAGS) libpolyp_simple_la_CFLAGS = $(AM_CFLAGS)
libpolyp_simple_la_LIBADD = libpolyp.la #libpolyp-error.la libpolyp_simple_la_LIBADD = libpolyp.la
#libpolyp-error.la
pacat_SOURCES = pacat.c #$(libpolyp_la_SOURCES) pacat_SOURCES = pacat.c $(libpolyp_la_SOURCES) $(libpolyp_error_la_SOURCES)
pacat_LDADD = libpolyp.la #pacat_LDADD = libpolyp.la
pacat_CFLAGS = $(AM_CFLAGS) pacat_CFLAGS = $(AM_CFLAGS)
pacat_simple_SOURCES = pacat-simple.c pacat_simple_SOURCES = pacat-simple.c $(libpolyp_la_SOURCES) $(libpolyp_simple_la_SOURCES) $(libpolyp_error_la_SOURCES)
pacat_simple_LDADD = libpolyp-simple.a #pacat_simple_LDADD = libpolyp-simple.la libpolyp-error.la
pacat_simple_CFLAGS = $(AM_CFLAGS) pacat_simple_CFLAGS = $(AM_CFLAGS)

View file

@ -59,7 +59,7 @@ char *pa_client_list_to_string(struct pa_core *c) {
pa_strbuf_printf(s, "%u client(s).\n", pa_idxset_ncontents(c->clients)); pa_strbuf_printf(s, "%u client(s).\n", pa_idxset_ncontents(c->clients));
for (client = pa_idxset_first(c->clients, &index); client; client = pa_idxset_next(c->clients, &index)) for (client = pa_idxset_first(c->clients, &index); client; client = pa_idxset_next(c->clients, &index))
pa_strbuf_printf(s, " index: %u, name: <%s>, protocol_name: <%s>\n", client->index, client->name, client->protocol_name); pa_strbuf_printf(s, " index: %u\n\tname: <%s>\n\tprotocol_name: <%s>\n", client->index, client->name, client->protocol_name);
return pa_strbuf_tostring_free(s); return pa_strbuf_tostring_free(s);
} }

View file

@ -7,6 +7,7 @@
#include "sink.h" #include "sink.h"
#include "source.h" #include "source.h"
#include "namereg.h" #include "namereg.h"
#include "util.h"
struct pa_core* pa_core_new(struct pa_mainloop_api *m) { struct pa_core* pa_core_new(struct pa_mainloop_api *m) {
struct pa_core* c; struct pa_core* c;
@ -24,6 +25,8 @@ struct pa_core* pa_core_new(struct pa_mainloop_api *m) {
c->modules = NULL; c->modules = NULL;
c->namereg = NULL; c->namereg = NULL;
pa_check_for_sigpipe();
return c; return c;
}; };

View file

@ -4,6 +4,8 @@
#include <sys/types.h> #include <sys/types.h>
#include "mainloop-api.h" #include "mainloop-api.h"
/* It is safe to destroy the calling iochannel object from the callback */
struct pa_iochannel; struct pa_iochannel;
struct pa_iochannel* pa_iochannel_new(struct pa_mainloop_api*m, int ifd, int ofd); struct pa_iochannel* pa_iochannel_new(struct pa_mainloop_api*m, int ifd, int ofd);

View file

@ -12,10 +12,16 @@
static struct pa_mainloop *mainloop; static struct pa_mainloop *mainloop;
static void signal_callback(void *id, int sig, void *userdata) { static void exit_signal_callback(void *id, int sig, void *userdata) {
struct pa_mainloop_api* m = pa_mainloop_get_api(mainloop); struct pa_mainloop_api* m = pa_mainloop_get_api(mainloop);
m->quit(m, 1); m->quit(m, 1);
fprintf(stderr, "main: got signal.\n"); fprintf(stderr, __FILE__": got signal.\n");
}
static void aux_signal_callback(void *id, int sig, void *userdata) {
struct pa_core *c = userdata;
assert(c);
pa_module_load(c, sig == SIGUSR1 ? "module-cli" : "module-cli-protocol-unix", NULL);
} }
int main(int argc, char *argv[]) { int main(int argc, char *argv[]) {
@ -30,12 +36,12 @@ int main(int argc, char *argv[]) {
r = pa_signal_init(pa_mainloop_get_api(mainloop)); r = pa_signal_init(pa_mainloop_get_api(mainloop));
assert(r == 0); assert(r == 0);
pa_signal_register(SIGINT, signal_callback, NULL); pa_signal_register(SIGINT, exit_signal_callback, NULL);
signal(SIGPIPE, SIG_IGN); signal(SIGPIPE, SIG_IGN);
c = pa_core_new(pa_mainloop_get_api(mainloop)); c = pa_core_new(pa_mainloop_get_api(mainloop));
assert(c); assert(c);
pa_module_load(c, "module-oss", "/dev/dsp"); pa_module_load(c, "module-oss", "/dev/dsp");
/* pa_module_load(c, "module-pipe-sink", NULL);*/ /* pa_module_load(c, "module-pipe-sink", NULL);*/
pa_module_load(c, "module-simple-protocol-tcp", NULL); pa_module_load(c, "module-simple-protocol-tcp", NULL);
@ -46,6 +52,9 @@ int main(int argc, char *argv[]) {
pa_module_load(c, "module-native-protocol-unix", NULL); pa_module_load(c, "module-native-protocol-unix", NULL);
pa_module_load(c, "module-esound-protocol-tcp", NULL); pa_module_load(c, "module-esound-protocol-tcp", NULL);
pa_module_load(c, "module-cli", NULL); pa_module_load(c, "module-cli", NULL);
pa_signal_register(SIGUSR1, aux_signal_callback, c);
pa_signal_register(SIGUSR2, aux_signal_callback, c);
fprintf(stderr, "main: mainloop entry.\n"); fprintf(stderr, "main: mainloop entry.\n");
if (pa_mainloop_run(mainloop, &retval) < 0) if (pa_mainloop_run(mainloop, &retval) < 0)

View file

@ -15,30 +15,45 @@ struct memblock_list {
struct pa_memblockq { struct pa_memblockq {
struct memblock_list *blocks, *blocks_tail; struct memblock_list *blocks, *blocks_tail;
unsigned n_blocks; unsigned n_blocks;
size_t total_length, maxlength, base, prebuf; size_t current_length, maxlength, tlength, base, prebuf, minreq;
int measure_delay; int measure_delay;
uint32_t delay; uint32_t delay;
struct pa_mcalign *mcalign; struct pa_mcalign *mcalign;
}; };
struct pa_memblockq* pa_memblockq_new(size_t maxlength, size_t base, size_t prebuf) { struct pa_memblockq* pa_memblockq_new(size_t maxlength, size_t tlength, size_t base, size_t prebuf, size_t minreq) {
struct pa_memblockq* bq; struct pa_memblockq* bq;
assert(maxlength && base); assert(maxlength && base && maxlength);
bq = malloc(sizeof(struct pa_memblockq)); bq = malloc(sizeof(struct pa_memblockq));
assert(bq); assert(bq);
bq->blocks = bq->blocks_tail = 0; bq->blocks = bq->blocks_tail = 0;
bq->n_blocks = 0; bq->n_blocks = 0;
bq->total_length = 0;
bq->base = base; bq->current_length = 0;
bq->maxlength = ((maxlength+base-1)/base)*base;
bq->prebuf = prebuf == (size_t) -1 ? bq->maxlength/2 : prebuf; fprintf(stderr, "memblockq requested: maxlength=%u, tlength=%u, base=%u, prebuf=%u, minreq=%u\n", maxlength, tlength, base, prebuf, minreq);
bq->base = base;
bq->maxlength = ((maxlength+base-1)/base)*base;
assert(bq->maxlength >= base);
bq->tlength = ((tlength+base-1)/base)*base;
if (bq->tlength == 0 || bq->tlength >= bq->maxlength)
bq->tlength = bq->maxlength;
bq->prebuf = (prebuf == (size_t) -1) ? bq->maxlength/2 : prebuf;
bq->prebuf = (bq->prebuf/base)*base;
if (bq->prebuf > bq->maxlength) if (bq->prebuf > bq->maxlength)
bq->prebuf = bq->maxlength; bq->prebuf = bq->maxlength;
assert(bq->maxlength >= base); bq->minreq = (minreq/base)*base;
if (bq->minreq == 0)
bq->minreq = 1;
fprintf(stderr, "memblockq sanitized: maxlength=%u, tlength=%u, base=%u, prebuf=%u, minreq=%u\n", bq->maxlength, bq->tlength, bq->base, bq->prebuf, bq->minreq);
bq->measure_delay = 0; bq->measure_delay = 0;
bq->delay = 0; bq->delay = 0;
@ -88,7 +103,7 @@ void pa_memblockq_push(struct pa_memblockq* bq, const struct pa_memchunk *chunk,
bq->blocks_tail = q; bq->blocks_tail = q;
bq->n_blocks++; bq->n_blocks++;
bq->total_length += chunk->length; bq->current_length += chunk->length;
pa_memblockq_shorten(bq, bq->maxlength); pa_memblockq_shorten(bq, bq->maxlength);
} }
@ -96,7 +111,7 @@ void pa_memblockq_push(struct pa_memblockq* bq, const struct pa_memchunk *chunk,
int pa_memblockq_peek(struct pa_memblockq* bq, struct pa_memchunk *chunk) { int pa_memblockq_peek(struct pa_memblockq* bq, struct pa_memchunk *chunk) {
assert(bq && chunk); assert(bq && chunk);
if (!bq->blocks || bq->total_length < bq->prebuf) if (!bq->blocks || bq->current_length < bq->prebuf)
return -1; return -1;
bq->prebuf = 0; bq->prebuf = 0;
@ -116,7 +131,7 @@ int memblockq_pop(struct memblockq* bq, struct pa_memchunk *chunk) {
assert(bq && chunk); assert(bq && chunk);
if (!bq->blocks || bq->total_length < bq->prebuf) if (!bq->blocks || bq->current_length < bq->prebuf)
return -1; return -1;
bq->prebuf = 0; bq->prebuf = 0;
@ -127,7 +142,7 @@ int memblockq_pop(struct memblockq* bq, struct pa_memchunk *chunk) {
*chunk = q->chunk; *chunk = q->chunk;
bq->n_blocks--; bq->n_blocks--;
bq->total_length -= chunk->length; bq->current_length -= chunk->length;
free(q); free(q);
return 0; return 0;
@ -159,7 +174,7 @@ void pa_memblockq_drop(struct pa_memblockq *bq, size_t length) {
while (length > 0) { while (length > 0) {
size_t l = length; size_t l = length;
assert(bq->blocks && bq->total_length >= length); assert(bq->blocks && bq->current_length >= length);
if (l > bq->blocks->chunk.length) if (l > bq->blocks->chunk.length)
l = bq->blocks->chunk.length; l = bq->blocks->chunk.length;
@ -169,7 +184,7 @@ void pa_memblockq_drop(struct pa_memblockq *bq, size_t length) {
bq->blocks->chunk.index += l; bq->blocks->chunk.index += l;
bq->blocks->chunk.length -= l; bq->blocks->chunk.length -= l;
bq->total_length -= l; bq->current_length -= l;
if (bq->blocks->chunk.length == 0) { if (bq->blocks->chunk.length == 0) {
struct memblock_list *q; struct memblock_list *q;
@ -192,12 +207,12 @@ void pa_memblockq_shorten(struct pa_memblockq *bq, size_t length) {
size_t l; size_t l;
assert(bq); assert(bq);
if (bq->total_length <= length) if (bq->current_length <= length)
return; return;
fprintf(stderr, "Warning! pa_memblockq_shorten()\n"); fprintf(stderr, "Warning! pa_memblockq_shorten()\n");
l = bq->total_length - length; l = bq->current_length - length;
l /= bq->base; l /= bq->base;
l *= bq->base; l *= bq->base;
@ -213,14 +228,13 @@ void pa_memblockq_empty(struct pa_memblockq *bq) {
int pa_memblockq_is_readable(struct pa_memblockq *bq) { int pa_memblockq_is_readable(struct pa_memblockq *bq) {
assert(bq); assert(bq);
return bq->total_length >= bq->prebuf; return bq->current_length >= bq->prebuf;
} }
int pa_memblockq_is_writable(struct pa_memblockq *bq, size_t length) { int pa_memblockq_is_writable(struct pa_memblockq *bq, size_t length) {
assert(bq); assert(bq);
assert(length <= bq->maxlength); return bq->current_length + length <= bq->tlength;
return bq->total_length + length <= bq->maxlength;
} }
uint32_t pa_memblockq_get_delay(struct pa_memblockq *bq) { uint32_t pa_memblockq_get_delay(struct pa_memblockq *bq) {
@ -230,16 +244,20 @@ uint32_t pa_memblockq_get_delay(struct pa_memblockq *bq) {
uint32_t pa_memblockq_get_length(struct pa_memblockq *bq) { uint32_t pa_memblockq_get_length(struct pa_memblockq *bq) {
assert(bq); assert(bq);
return bq->total_length; return bq->current_length;
} }
uint32_t pa_memblockq_missing_to(struct pa_memblockq *bq, size_t qlen) { uint32_t pa_memblockq_missing(struct pa_memblockq *bq) {
assert(bq && qlen); size_t l;
assert(bq);
if (bq->total_length >= qlen) if (bq->current_length >= bq->tlength)
return 0; return 0;
return qlen - bq->total_length; l = bq->tlength - bq->current_length;
assert(l);
return (l >= bq->minreq) ? l : 0;
} }
void pa_memblockq_push_align(struct pa_memblockq* bq, const struct pa_memchunk *chunk, size_t delta) { void pa_memblockq_push_align(struct pa_memblockq* bq, const struct pa_memchunk *chunk, size_t delta) {
@ -264,3 +282,8 @@ void pa_memblockq_push_align(struct pa_memblockq* bq, const struct pa_memchunk *
delta = 0; delta = 0;
} }
} }
uint32_t pa_memblockq_get_minreq(struct pa_memblockq *bq) {
assert(bq);
return bq->minreq;
}

View file

@ -8,11 +8,18 @@
struct pa_memblockq; struct pa_memblockq;
/* Parameters: the maximum length of the memblock queue, a base value /* Parameters:
for all operations (that is, all byte operations shall work on - maxlength: maximum length of queue. If more data is pushed into the queue, data from the front is dropped
multiples of this base value) and an amount of bytes to prebuffer - length: the target length of the queue.
before having pa_memblockq_peek() succeed. */ - base: a base value for all metrics. Only multiples of this value are popped from the queue
struct pa_memblockq* pa_memblockq_new(size_t maxlength, size_t base, size_t prebuf); - prebuf: before passing the first byte out, make sure that enough bytes are in the queue
- minreq: pa_memblockq_missing() will only return values greater than this value
*/
struct pa_memblockq* pa_memblockq_new(size_t maxlength,
size_t tlength,
size_t base,
size_t prebuf,
size_t minreq);
void pa_memblockq_free(struct pa_memblockq*bq); void pa_memblockq_free(struct pa_memblockq*bq);
/* Push a new memory chunk into the queue. Optionally specify a value for future cancellation. This is currently not implemented, however! */ /* Push a new memory chunk into the queue. Optionally specify a value for future cancellation. This is currently not implemented, however! */
@ -46,6 +53,9 @@ uint32_t pa_memblockq_get_delay(struct pa_memblockq *bq);
uint32_t pa_memblockq_get_length(struct pa_memblockq *bq); uint32_t pa_memblockq_get_length(struct pa_memblockq *bq);
/* Return how many bytes are missing in queue to the specified fill amount */ /* Return how many bytes are missing in queue to the specified fill amount */
uint32_t pa_memblockq_missing_to(struct pa_memblockq *bq, size_t qlen); uint32_t pa_memblockq_missing(struct pa_memblockq *bq);
uint32_t pa_memblockq_get_minreq(struct pa_memblockq *bq);
#endif #endif

View file

@ -14,7 +14,7 @@ static void eof_cb(struct pa_cli*c, void *userdata) {
pa_module_unload_request(m->core, m); pa_module_unload_request(m->core, m);
} }
int module_init(struct pa_core *c, struct pa_module*m) { int pa_module_init(struct pa_core *c, struct pa_module*m) {
struct pa_iochannel *io; struct pa_iochannel *io;
assert(c && m); assert(c && m);
@ -35,7 +35,7 @@ int module_init(struct pa_core *c, struct pa_module*m) {
return 0; return 0;
} }
void module_done(struct pa_core *c, struct pa_module*m) { void pa_module_done(struct pa_core *c, struct pa_module*m) {
assert(c && m); assert(c && m);
pa_cli_free(m->userdata); pa_cli_free(m->userdata);

View file

@ -180,7 +180,7 @@ static uint32_t sink_get_latency_cb(struct pa_sink *s) {
return pa_samples_usec(u->out_fill, &s->sample_spec); return pa_samples_usec(u->out_fill, &s->sample_spec);
} }
int module_init(struct pa_core *c, struct pa_module*m) { int pa_module_init(struct pa_core *c, struct pa_module*m) {
struct audio_buf_info info; struct audio_buf_info info;
struct userdata *u = NULL; struct userdata *u = NULL;
char *p; char *p;

View file

@ -111,7 +111,7 @@ static uint32_t sink_get_latency_cb(struct pa_sink *s) {
return pa_samples_usec(arg, &s->sample_spec); return pa_samples_usec(arg, &s->sample_spec);
} }
int module_init(struct pa_core *c, struct pa_module*m) { int pa_module_init(struct pa_core *c, struct pa_module*m) {
struct audio_buf_info info; struct audio_buf_info info;
struct userdata *u = NULL; struct userdata *u = NULL;
char *p; char *p;
@ -224,7 +224,7 @@ fail:
return -1; return -1;
} }
void module_done(struct pa_core *c, struct pa_module*m) { void pa_module_done(struct pa_core *c, struct pa_module*m) {
struct userdata *u; struct userdata *u;
assert(c && m); assert(c && m);

View file

@ -73,7 +73,7 @@ static void io_callback(struct pa_iochannel *io, void*userdata) {
do_write(u); do_write(u);
} }
int module_init(struct pa_core *c, struct pa_module*m) { int pa_module_init(struct pa_core *c, struct pa_module*m) {
struct userdata *u = NULL; struct userdata *u = NULL;
struct stat st; struct stat st;
char *p; char *p;
@ -137,7 +137,7 @@ fail:
return -1; return -1;
} }
void module_done(struct pa_core *c, struct pa_module*m) { void pa_module_done(struct pa_core *c, struct pa_module*m) {
struct userdata *u; struct userdata *u;
assert(c && m); assert(c && m);

View file

@ -47,7 +47,7 @@
#endif #endif
#endif #endif
int module_init(struct pa_core *c, struct pa_module*m) { int pa_module_init(struct pa_core *c, struct pa_module*m) {
struct pa_socket_server *s; struct pa_socket_server *s;
assert(c && m); assert(c && m);
@ -91,7 +91,7 @@ int module_init(struct pa_core *c, struct pa_module*m) {
return 0; return 0;
} }
void module_done(struct pa_core *c, struct pa_module*m) { void pa_module_done(struct pa_core *c, struct pa_module*m) {
assert(c && m); assert(c && m);
protocol_free(m->userdata); protocol_free(m->userdata);

View file

@ -23,10 +23,10 @@ struct pa_module* pa_module_load(struct pa_core *c, const char *name, const char
if (!(m->dl = lt_dlopenext(name))) if (!(m->dl = lt_dlopenext(name)))
goto fail; goto fail;
if (!(m->init = lt_dlsym(m->dl, "module_init"))) if (!(m->init = lt_dlsym(m->dl, "pa_module_init")))
goto fail; goto fail;
if (!(m->done = lt_dlsym(m->dl, "module_done"))) if (!(m->done = lt_dlsym(m->dl, "pa_module_done")))
goto fail; goto fail;
m->userdata = NULL; m->userdata = NULL;
@ -124,7 +124,7 @@ char *pa_module_list_to_string(struct pa_core *c) {
pa_strbuf_printf(s, "%u module(s) loaded.\n", pa_idxset_ncontents(c->modules)); pa_strbuf_printf(s, "%u module(s) loaded.\n", pa_idxset_ncontents(c->modules));
for (m = pa_idxset_first(c->modules, &index); m; m = pa_idxset_next(c->modules, &index)) for (m = pa_idxset_first(c->modules, &index); m; m = pa_idxset_next(c->modules, &index))
pa_strbuf_printf(s, " index: %u, name: <%s>, argument: <%s>\n", m->index, m->name, m->argument); pa_strbuf_printf(s, " index: %u\n\tname: <%s>\n\targument: <%s>\n", m->index, m->name, m->argument);
return pa_strbuf_tostring_free(s); return pa_strbuf_tostring_free(s);
} }

View file

@ -30,4 +30,8 @@ char *pa_module_list_to_string(struct pa_core *c);
void pa_module_unload_request(struct pa_core *c, struct pa_module *m); void pa_module_unload_request(struct pa_core *c, struct pa_module *m);
/* These to following prototypes are for module entrypoints and not implemented by the core */
int pa_module_init(struct pa_core *c, struct pa_module*m);
void pa_module_done(struct pa_core *c, struct pa_module*m);
#endif #endif

View file

@ -19,7 +19,7 @@ int main(int argc, char*argv[]) {
int error; int error;
if (!(s = pa_simple_new(NULL, argv[0], PA_STREAM_PLAYBACK, NULL, "playback", &ss, NULL, &error))) { if (!(s = pa_simple_new(NULL, argv[0], PA_STREAM_PLAYBACK, NULL, "playback", &ss, NULL, &error))) {
fprintf(stderr, "Failed to connect to server: %s\n", pa_strerror(error)); fprintf(stderr, __FILE__": pa_simple_new() failed: %s\n", pa_strerror(error));
goto finish; goto finish;
} }
@ -31,16 +31,16 @@ int main(int argc, char*argv[]) {
if (r == 0) /* eof */ if (r == 0) /* eof */
break; break;
fprintf(stderr, "read() failed: %s\n", strerror(errno)); fprintf(stderr, __FILE__": read() failed: %s\n", strerror(errno));
goto finish; goto finish;
} }
if (pa_simple_write(s, buf, r, &error) < 0) { if (pa_simple_write(s, buf, r, &error) < 0) {
fprintf(stderr, "Failed to write data: %s\n", pa_strerror(error)); fprintf(stderr, __FILE__": pa_simple_write() failed: %s\n", pa_strerror(error));
goto finish; goto finish;
} }
} }
ret = 0; ret = 0;
finish: finish:

View file

@ -1,3 +1,4 @@
#include <signal.h>
#include <string.h> #include <string.h>
#include <errno.h> #include <errno.h>
#include <unistd.h> #include <unistd.h>
@ -6,7 +7,9 @@
#include <stdlib.h> #include <stdlib.h>
#include "polyp.h" #include "polyp.h"
#include "polyp-error.h"
#include "mainloop.h" #include "mainloop.h"
#include "mainloop-signal.h"
static struct pa_context *context = NULL; static struct pa_context *context = NULL;
static struct pa_stream *stream = NULL; static struct pa_stream *stream = NULL;
@ -17,21 +20,29 @@ static size_t buffer_length = 0, buffer_index = 0;
static void* stdin_source = NULL; static void* stdin_source = NULL;
static void quit(int ret) {
assert(mainloop_api);
mainloop_api->quit(mainloop_api, ret);
}
static void context_die_callback(struct pa_context *c, void *userdata) { static void context_die_callback(struct pa_context *c, void *userdata) {
assert(c); assert(c);
fprintf(stderr, "Connection to server shut down, exiting.\n"); fprintf(stderr, "Connection to server shut down, exiting.\n");
mainloop_api->quit(mainloop_api, 1); quit(1);
} }
static void stream_die_callback(struct pa_stream *s, void *userdata) { static void stream_die_callback(struct pa_stream *s, void *userdata) {
assert(s); assert(s);
fprintf(stderr, "Stream deleted, exiting.\n"); fprintf(stderr, "Stream deleted, exiting.\n");
mainloop_api->quit(mainloop_api, 1); quit(1);
} }
static void do_write(size_t length) { static void do_write(size_t length) {
size_t l; size_t l;
assert(buffer && buffer_length); assert(length);
if (!buffer || !buffer_length)
return;
l = length; l = length;
if (l > buffer_length) if (l > buffer_length)
@ -50,8 +61,9 @@ static void do_write(size_t length) {
static void stream_write_callback(struct pa_stream *s, size_t length, void *userdata) { static void stream_write_callback(struct pa_stream *s, size_t length, void *userdata) {
assert(s && length); assert(s && length);
mainloop_api->enable_io(mainloop_api, stdin_source, PA_MAINLOOP_API_IO_EVENT_INPUT); if (stdin_source)
mainloop_api->enable_io(mainloop_api, stdin_source, PA_MAINLOOP_API_IO_EVENT_INPUT);
if (!buffer) if (!buffer)
return; return;
@ -63,13 +75,12 @@ static void stream_complete_callback(struct pa_stream*s, int success, void *user
assert(s); assert(s);
if (!success) { if (!success) {
fprintf(stderr, "Stream creation failed.\n"); fprintf(stderr, "Stream creation failed: %s\n", pa_strerror(pa_context_errno(pa_stream_get_context(s))));
mainloop_api->quit(mainloop_api, 1); quit(1);
return; return;
} }
pa_stream_set_die_callback(s, stream_die_callback, NULL); fprintf(stderr, "Stream created.\n");
pa_stream_set_write_callback(s, stream_write_callback, NULL);
} }
static void context_complete_callback(struct pa_context *c, int success, void *userdata) { static void context_complete_callback(struct pa_context *c, int success, void *userdata) {
@ -82,43 +93,59 @@ static void context_complete_callback(struct pa_context *c, int success, void *u
assert(c && !stream); assert(c && !stream);
if (!success) { if (!success) {
fprintf(stderr, "Connection failed\n"); fprintf(stderr, "Connection failed: %s\n", pa_strerror(pa_context_errno(c)));
goto fail;
}
if (!(stream = pa_stream_new(c, PA_STREAM_PLAYBACK, NULL, "pacat", &ss, NULL, stream_complete_callback, NULL))) {
fprintf(stderr, "pa_stream_new() failed.\n");
goto fail; goto fail;
} }
fprintf(stderr, "Connection established.\n");
if (!(stream = pa_stream_new(c, PA_STREAM_PLAYBACK, NULL, "pacat", &ss, NULL, stream_complete_callback, NULL))) {
fprintf(stderr, "pa_stream_new() failed: %s\n", pa_strerror(pa_context_errno(c)));
goto fail;
}
pa_stream_set_die_callback(stream, stream_die_callback, NULL);
pa_stream_set_write_callback(stream, stream_write_callback, NULL);
return; return;
fail: fail:
mainloop_api->quit(mainloop_api, 1); quit(1);
}
static void context_drain_complete(struct pa_context*c, void *userdata) {
quit(0);
} }
static void stdin_callback(struct pa_mainloop_api*a, void *id, int fd, enum pa_mainloop_api_io_events events, void *userdata) { static void stdin_callback(struct pa_mainloop_api*a, void *id, int fd, enum pa_mainloop_api_io_events events, void *userdata) {
size_t l, w = 0; size_t l, w = 0;
ssize_t r; ssize_t r;
assert(a == mainloop_api && id && fd == STDIN_FILENO && events == PA_MAINLOOP_API_IO_EVENT_INPUT); assert(a == mainloop_api && id && fd == STDIN_FILENO && events == PA_MAINLOOP_API_IO_EVENT_INPUT && stdin_source == id);
if (buffer) { if (buffer) {
mainloop_api->enable_io(mainloop_api, stdin_source, PA_MAINLOOP_API_IO_EVENT_NULL); mainloop_api->enable_io(mainloop_api, stdin_source, PA_MAINLOOP_API_IO_EVENT_NULL);
return; return;
} }
if (!stream || !(l = w = pa_stream_writable_size(stream))) if (!stream || !pa_stream_is_ready(stream) || !(l = w = pa_stream_writable_size(stream)))
l = 4096; l = 4096;
buffer = malloc(l); buffer = malloc(l);
assert(buffer); assert(buffer);
if ((r = read(fd, buffer, l)) <= 0) { if ((r = read(fd, buffer, l)) <= 0) {
if (r == 0) if (r == 0) {
mainloop_api->quit(mainloop_api, 0); fprintf(stderr, "Got EOF.\n");
else { if (pa_context_drain(context, context_drain_complete, NULL) < 0)
quit(0);
else
fprintf(stderr, "Draining connection to server.\n");
} else {
fprintf(stderr, "read() failed: %s\n", strerror(errno)); fprintf(stderr, "read() failed: %s\n", strerror(errno));
mainloop_api->quit(mainloop_api, 1); quit(1);
} }
mainloop_api->cancel_io(mainloop_api, stdin_source);
stdin_source = NULL;
return; return;
} }
@ -129,9 +156,15 @@ static void stdin_callback(struct pa_mainloop_api*a, void *id, int fd, enum pa_m
do_write(w); do_write(w);
} }
static void exit_signal_callback(void *id, int sig, void *userdata) {
fprintf(stderr, "Got SIGINT, exiting.\n");
quit(0);
}
int main(int argc, char *argv[]) { int main(int argc, char *argv[]) {
struct pa_mainloop* m; struct pa_mainloop* m;
int ret = 1; int ret = 1, r;
if (!(m = pa_mainloop_new())) { if (!(m = pa_mainloop_new())) {
fprintf(stderr, "pa_mainloop_new() failed.\n"); fprintf(stderr, "pa_mainloop_new() failed.\n");
@ -140,6 +173,11 @@ int main(int argc, char *argv[]) {
mainloop_api = pa_mainloop_get_api(m); mainloop_api = pa_mainloop_get_api(m);
r = pa_signal_init(mainloop_api);
assert(r == 0);
pa_signal_register(SIGINT, exit_signal_callback, NULL);
signal(SIGPIPE, SIG_IGN);
if (!(stdin_source = mainloop_api->source_io(mainloop_api, STDIN_FILENO, PA_MAINLOOP_API_IO_EVENT_INPUT, stdin_callback, NULL))) { if (!(stdin_source = mainloop_api->source_io(mainloop_api, STDIN_FILENO, PA_MAINLOOP_API_IO_EVENT_INPUT, stdin_callback, NULL))) {
fprintf(stderr, "source_io() failed.\n"); fprintf(stderr, "source_io() failed.\n");
goto quit; goto quit;

View file

@ -4,10 +4,26 @@
#include "pdispatch.h" #include "pdispatch.h"
#include "protocol-native-spec.h" #include "protocol-native-spec.h"
static const char *command_names[PA_COMMAND_MAX] = {
[PA_COMMAND_ERROR] = "ERROR",
[PA_COMMAND_TIMEOUT] = "TIMEOUT",
[PA_COMMAND_REPLY] = "REPLY",
[PA_COMMAND_CREATE_PLAYBACK_STREAM] = "CREATE_PLAYBACK_STREAM",
[PA_COMMAND_DELETE_PLAYBACK_STREAM] = "DELETE_PLAYBACK_STREAM",
[PA_COMMAND_CREATE_RECORD_STREAM] = "CREATE_RECORD_STREAM",
[PA_COMMAND_DELETE_RECORD_STREAM] = "DELETE_RECORD_STREAM",
[PA_COMMAND_AUTH] = "AUTH",
[PA_COMMAND_REQUEST] = "REQUEST",
[PA_COMMAND_EXIT] = "EXIT",
[PA_COMMAND_SET_NAME] = "SET_NAME",
[PA_COMMAND_LOOKUP_SINK] = "LOOKUP_SINK",
[PA_COMMAND_LOOKUP_SOURCE] = "LOOKUP_SOURCE",
};
struct reply_info { struct reply_info {
struct pa_pdispatch *pdispatch; struct pa_pdispatch *pdispatch;
struct reply_info *next, *previous; struct reply_info *next, *previous;
int (*callback)(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata); void (*callback)(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata);
void *userdata; void *userdata;
uint32_t tag; uint32_t tag;
void *mainloop_timeout; void *mainloop_timeout;
@ -18,6 +34,9 @@ struct pa_pdispatch {
const struct pa_pdispatch_command *command_table; const struct pa_pdispatch_command *command_table;
unsigned n_commands; unsigned n_commands;
struct reply_info *replies; struct reply_info *replies;
void (*drain_callback)(struct pa_pdispatch *pd, void *userdata);
void *drain_userdata;
int in_use, shall_free;
}; };
static void reply_info_free(struct reply_info *r) { static void reply_info_free(struct reply_info *r) {
@ -49,11 +68,21 @@ struct pa_pdispatch* pa_pdispatch_new(struct pa_mainloop_api *mainloop, const st
pd->command_table = table; pd->command_table = table;
pd->n_commands = entries; pd->n_commands = entries;
pd->replies = NULL; pd->replies = NULL;
pd->drain_callback = NULL;
pd->drain_userdata = NULL;
pd->in_use = pd->shall_free = 0;
return pd; return pd;
} }
void pa_pdispatch_free(struct pa_pdispatch *pd) { void pa_pdispatch_free(struct pa_pdispatch *pd) {
assert(pd); assert(pd);
if (pd->in_use) {
pd->shall_free = 1;
return;
}
while (pd->replies) while (pd->replies)
reply_info_free(pd->replies); reply_info_free(pd->replies);
free(pd); free(pd);
@ -61,60 +90,61 @@ void pa_pdispatch_free(struct pa_pdispatch *pd) {
int pa_pdispatch_run(struct pa_pdispatch *pd, struct pa_packet*packet, void *userdata) { int pa_pdispatch_run(struct pa_pdispatch *pd, struct pa_packet*packet, void *userdata) {
uint32_t tag, command; uint32_t tag, command;
assert(pd && packet);
struct pa_tagstruct *ts = NULL; struct pa_tagstruct *ts = NULL;
assert(pd && packet && packet->data); int ret = -1;
assert(pd && packet && packet->data && !pd->in_use);
if (packet->length <= 8) if (packet->length <= 8)
goto fail; goto finish;
ts = pa_tagstruct_new(packet->data, packet->length); ts = pa_tagstruct_new(packet->data, packet->length);
assert(ts); assert(ts);
if (pa_tagstruct_getu32(ts, &command) < 0 || if (pa_tagstruct_getu32(ts, &command) < 0 ||
pa_tagstruct_getu32(ts, &tag) < 0) pa_tagstruct_getu32(ts, &tag) < 0)
goto fail; goto finish;
/*fprintf(stderr, __FILE__": Recieved opcode <%s>\n", command_names[command]);*/
if (command == PA_COMMAND_ERROR || command == PA_COMMAND_REPLY) { if (command == PA_COMMAND_ERROR || command == PA_COMMAND_REPLY) {
struct reply_info *r; struct reply_info *r;
int done = 0;
for (r = pd->replies; r; r = r->next) { for (r = pd->replies; r; r = r->next) {
if (r->tag == tag) { if (r->tag != tag)
int ret = r->callback(r->pdispatch, command, tag, ts, r->userdata); continue;
reply_info_free(r);
pd->in_use = 1;
if (ret < 0) assert(r->callback);
goto fail; r->callback(r->pdispatch, command, tag, ts, r->userdata);
pd->in_use = 0;
done = 1; reply_info_free(r);
if (pd->shall_free) {
pa_pdispatch_free(pd);
break; break;
} }
}
if (!done) if (pd->drain_callback && !pa_pdispatch_is_pending(r->pdispatch))
goto fail; pd->drain_callback(r->pdispatch, r->pdispatch->drain_userdata);
break;
}
} else if (pd->command_table && command < pd->n_commands) { } else if (pd->command_table && command < pd->n_commands) {
const struct pa_pdispatch_command *c = pd->command_table+command; const struct pa_pdispatch_command *c = pd->command_table+command;
if (!c->proc) if (c->proc)
goto fail; c->proc(pd, command, tag, ts, userdata);
if (c->proc(pd, command, tag, ts, userdata) < 0)
goto fail;
} else } else
goto fail; goto finish;
pa_tagstruct_free(ts);
return 0;
fail: ret = 0;
finish:
if (ts) if (ts)
pa_tagstruct_free(ts); pa_tagstruct_free(ts);
return -1; return ret;
} }
static void timeout_callback(struct pa_mainloop_api*m, void *id, const struct timeval *tv, void *userdata) { static void timeout_callback(struct pa_mainloop_api*m, void *id, const struct timeval *tv, void *userdata) {
@ -123,9 +153,12 @@ static void timeout_callback(struct pa_mainloop_api*m, void *id, const struct ti
r->callback(r->pdispatch, PA_COMMAND_TIMEOUT, r->tag, NULL, r->userdata); r->callback(r->pdispatch, PA_COMMAND_TIMEOUT, r->tag, NULL, r->userdata);
reply_info_free(r); reply_info_free(r);
if (r->pdispatch->drain_callback && !pa_pdispatch_is_pending(r->pdispatch))
r->pdispatch->drain_callback(r->pdispatch, r->pdispatch->drain_userdata);
} }
void pa_pdispatch_register_reply(struct pa_pdispatch *pd, uint32_t tag, int timeout, int (*cb)(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata), void *userdata) { void pa_pdispatch_register_reply(struct pa_pdispatch *pd, uint32_t tag, int timeout, void (*cb)(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata), void *userdata) {
struct reply_info *r; struct reply_info *r;
struct timeval tv; struct timeval tv;
assert(pd && cb); assert(pd && cb);
@ -149,3 +182,17 @@ void pa_pdispatch_register_reply(struct pa_pdispatch *pd, uint32_t tag, int time
r->next->previous = r; r->next->previous = r;
pd->replies = r; pd->replies = r;
} }
int pa_pdispatch_is_pending(struct pa_pdispatch *pd) {
assert(pd);
return !!pd->replies;
}
void pa_pdispatch_set_drain_callback(struct pa_pdispatch *pd, void (*cb)(struct pa_pdispatch *pd, void *userdata), void *userdata) {
assert(pd);
assert(!cb || pa_pdispatch_is_pending(pd));
pd->drain_callback = cb;
pd->drain_userdata = userdata;
}

View file

@ -8,8 +8,10 @@
struct pa_pdispatch; struct pa_pdispatch;
/* It is safe to destroy the calling pdispatch object from all callbacks */
struct pa_pdispatch_command { struct pa_pdispatch_command {
int (*proc)(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata); void (*proc)(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata);
}; };
struct pa_pdispatch* pa_pdispatch_new(struct pa_mainloop_api *m, const struct pa_pdispatch_command*table, unsigned entries); struct pa_pdispatch* pa_pdispatch_new(struct pa_mainloop_api *m, const struct pa_pdispatch_command*table, unsigned entries);
@ -17,6 +19,10 @@ void pa_pdispatch_free(struct pa_pdispatch *pd);
int pa_pdispatch_run(struct pa_pdispatch *pd, struct pa_packet*p, void *userdata); int pa_pdispatch_run(struct pa_pdispatch *pd, struct pa_packet*p, void *userdata);
void pa_pdispatch_register_reply(struct pa_pdispatch *pd, uint32_t tag, int timeout, int (*cb)(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata), void *userdata); void pa_pdispatch_register_reply(struct pa_pdispatch *pd, uint32_t tag, int timeout, void (*cb)(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata), void *userdata);
int pa_pdispatch_is_pending(struct pa_pdispatch *pd);
void pa_pdispatch_set_drain_callback(struct pa_pdispatch *pd, void (*cb)(struct pa_pdispatch *pd, void *userdata), void *userdata);
#endif #endif

View file

@ -11,10 +11,13 @@
#include "socket-client.h" #include "socket-client.h"
#include "pstream-util.h" #include "pstream-util.h"
#include "authkey.h" #include "authkey.h"
#include "util.h"
#define DEFAULT_QUEUE_LENGTH 10240 #define DEFAULT_MAXLENGTH 20480
#define DEFAULT_MAX_LENGTH 20480 #define DEFAULT_TLENGTH 10240
#define DEFAULT_PREBUF 4096 #define DEFAULT_PREBUF 4096
#define DEFAULT_MINREQ 1024
#define DEFAULT_TIMEOUT (5*60) #define DEFAULT_TIMEOUT (5*60)
#define DEFAULT_SERVER "/tmp/polypaudio/native" #define DEFAULT_SERVER "/tmp/polypaudio/native"
@ -28,25 +31,40 @@ struct pa_context {
struct pa_stream *first_stream; struct pa_stream *first_stream;
uint32_t ctag; uint32_t ctag;
uint32_t error; uint32_t error;
enum { CONTEXT_UNCONNECTED, CONTEXT_CONNECTING, CONTEXT_AUTHORIZING, CONTEXT_SETTING_NAME, CONTEXT_READY, CONTEXT_DEAD} state; enum {
CONTEXT_UNCONNECTED,
CONTEXT_CONNECTING,
CONTEXT_AUTHORIZING,
CONTEXT_SETTING_NAME,
CONTEXT_READY,
CONTEXT_DEAD
} state;
void (*connect_complete_callback)(struct pa_context*c, int success, void *userdata); void (*connect_complete_callback)(struct pa_context*c, int success, void *userdata);
void *connect_complete_userdata; void *connect_complete_userdata;
void (*drain_complete_callback)(struct pa_context*c, void *userdata);
void *drain_complete_userdata;
void (*die_callback)(struct pa_context*c, void *userdata); void (*die_callback)(struct pa_context*c, void *userdata);
void *die_userdata; void *die_userdata;
uint8_t auth_cookie[PA_NATIVE_COOKIE_LENGTH]; uint8_t auth_cookie[PA_NATIVE_COOKIE_LENGTH];
}; };
struct pa_stream { struct pa_stream {
struct pa_context *context; struct pa_context *context;
struct pa_stream *next, *previous; struct pa_stream *next, *previous;
char *name;
struct pa_buffer_attr buffer_attr;
struct pa_sample_spec sample_spec;
uint32_t device_index; uint32_t device_index;
uint32_t channel; uint32_t channel;
int channel_valid; int channel_valid;
enum pa_stream_direction direction; enum pa_stream_direction direction;
enum { STREAM_CREATING, STREAM_READY, STREAM_DEAD} state;
enum { STREAM_LOOKING_UP, STREAM_CREATING, STREAM_READY, STREAM_DEAD} state;
uint32_t requested_bytes; uint32_t requested_bytes;
void (*read_callback)(struct pa_stream *p, const void*data, size_t length, void *userdata); void (*read_callback)(struct pa_stream *p, const void*data, size_t length, void *userdata);
@ -62,7 +80,7 @@ struct pa_stream {
void *die_userdata; void *die_userdata;
}; };
static int command_request(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata); static void command_request(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata);
static const struct pa_pdispatch_command command_table[PA_COMMAND_MAX] = { static const struct pa_pdispatch_command command_table[PA_COMMAND_MAX] = {
[PA_COMMAND_ERROR] = { NULL }, [PA_COMMAND_ERROR] = { NULL },
@ -76,8 +94,9 @@ static const struct pa_pdispatch_command command_table[PA_COMMAND_MAX] = {
}; };
struct pa_context *pa_context_new(struct pa_mainloop_api *mainloop, const char *name) { struct pa_context *pa_context_new(struct pa_mainloop_api *mainloop, const char *name) {
assert(mainloop && name);
struct pa_context *c; struct pa_context *c;
assert(mainloop && name);
c = malloc(sizeof(struct pa_context)); c = malloc(sizeof(struct pa_context));
assert(c); assert(c);
c->name = strdup(name); c->name = strdup(name);
@ -95,9 +114,13 @@ struct pa_context *pa_context_new(struct pa_mainloop_api *mainloop, const char *
c->connect_complete_callback = NULL; c->connect_complete_callback = NULL;
c->connect_complete_userdata = NULL; c->connect_complete_userdata = NULL;
c->drain_complete_callback = NULL;
c->drain_complete_userdata = NULL;
c->die_callback = NULL; c->die_callback = NULL;
c->die_userdata = NULL; c->die_userdata = NULL;
pa_check_for_sigpipe();
return c; return c;
} }
@ -121,84 +144,105 @@ void pa_context_free(struct pa_context *c) {
} }
static void stream_dead(struct pa_stream *s) { static void stream_dead(struct pa_stream *s) {
assert(s);
if (s->state == STREAM_DEAD) if (s->state == STREAM_DEAD)
return; return;
s->state = STREAM_DEAD; if (s->state == STREAM_READY) {
if (s->die_callback) s->state = STREAM_DEAD;
s->die_callback(s, s->die_userdata); if (s->die_callback)
s->die_callback(s, s->die_userdata);
} else
s->state = STREAM_DEAD;
} }
static void context_dead(struct pa_context *c) { static void context_dead(struct pa_context *c) {
struct pa_stream *s; struct pa_stream *s;
assert(c); assert(c);
if (c->state == CONTEXT_DEAD)
return;
if (c->pdispatch)
pa_pdispatch_free(c->pdispatch);
c->pdispatch = NULL;
if (c->pstream)
pa_pstream_free(c->pstream);
c->pstream = NULL;
if (c->client)
pa_socket_client_free(c->client);
c->client = NULL;
for (s = c->first_stream; s; s = s->next) for (s = c->first_stream; s; s = s->next)
stream_dead(s); stream_dead(s);
if (c->state == CONTEXT_DEAD) if (c->state == CONTEXT_READY) {
return; c->state = CONTEXT_DEAD;
if (c->die_callback)
c->state = CONTEXT_DEAD; c->die_callback(c, c->die_userdata);
if (c->die_callback) } else
c->die_callback(c, c->die_userdata); s->state = CONTEXT_DEAD;
} }
static void pstream_die_callback(struct pa_pstream *p, void *userdata) { static void pstream_die_callback(struct pa_pstream *p, void *userdata) {
struct pa_context *c = userdata; struct pa_context *c = userdata;
assert(p && c); assert(p && c);
assert(c->state != CONTEXT_DEAD);
c->state = CONTEXT_DEAD;
context_dead(c); context_dead(c);
} }
static int pstream_packet_callback(struct pa_pstream *p, struct pa_packet *packet, void *userdata) { static void pstream_packet_callback(struct pa_pstream *p, struct pa_packet *packet, void *userdata) {
struct pa_context *c = userdata; struct pa_context *c = userdata;
assert(p && packet && c); assert(p && packet && c);
if (pa_pdispatch_run(c->pdispatch, packet, c) < 0) { if (pa_pdispatch_run(c->pdispatch, packet, c) < 0) {
fprintf(stderr, "polyp.c: invalid packet.\n"); fprintf(stderr, "polyp.c: invalid packet.\n");
context_dead(c); context_dead(c);
return -1;
} }
return 0;
} }
static int pstream_memblock_callback(struct pa_pstream *p, uint32_t channel, int32_t delta, struct pa_memchunk *chunk, void *userdata) { static void pstream_memblock_callback(struct pa_pstream *p, uint32_t channel, int32_t delta, struct pa_memchunk *chunk, void *userdata) {
struct pa_context *c = userdata; struct pa_context *c = userdata;
struct pa_stream *s; struct pa_stream *s;
assert(p && chunk && c && chunk->memblock && chunk->memblock->data); assert(p && chunk && c && chunk->memblock && chunk->memblock->data);
if (!(s = pa_dynarray_get(c->streams, channel))) if (!(s = pa_dynarray_get(c->streams, channel)))
return 0; return;
if (s->read_callback) if (s->read_callback)
s->read_callback(s, chunk->memblock->data + chunk->index, chunk->length, s->read_userdata); s->read_callback(s, chunk->memblock->data + chunk->index, chunk->length, s->read_userdata);
return 0;
} }
static int auth_complete_callback(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) { static int handle_error(struct pa_context *c, uint32_t command, struct pa_tagstruct *t) {
assert(c && t);
if (command == PA_COMMAND_ERROR) {
if (pa_tagstruct_getu32(t, &c->error) < 0) {
c->error = PA_ERROR_PROTOCOL;
return -1;
}
return 0;
}
c->error = (command == PA_COMMAND_TIMEOUT) ? PA_ERROR_TIMEOUT : PA_ERROR_INTERNAL;
return -1;
}
static void setup_complete_callback(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
struct pa_context *c = userdata; struct pa_context *c = userdata;
assert(pd && c && (c->state == CONTEXT_AUTHORIZING || c->state == CONTEXT_SETTING_NAME)); assert(pd && c && (c->state == CONTEXT_AUTHORIZING || c->state == CONTEXT_SETTING_NAME));
if (command != PA_COMMAND_REPLY) { if (command != PA_COMMAND_REPLY) {
if (command == PA_COMMAND_ERROR && pa_tagstruct_getu32(t, &c->error) < 0) handle_error(c, command, t);
c->error = PA_ERROR_PROTOCOL; context_dead(c);
else if (command == PA_COMMAND_TIMEOUT)
c->error = PA_ERROR_TIMEOUT;
c->state = CONTEXT_DEAD;
if (c->connect_complete_callback) if (c->connect_complete_callback)
c->connect_complete_callback(c, 0, c->connect_complete_userdata); c->connect_complete_callback(c, 0, c->connect_complete_userdata);
return -1; return;
} }
if (c->state == CONTEXT_AUTHORIZING) { if (c->state == CONTEXT_AUTHORIZING) {
@ -210,7 +254,7 @@ static int auth_complete_callback(struct pa_pdispatch *pd, uint32_t command, uin
pa_tagstruct_putu32(t, tag = c->ctag++); pa_tagstruct_putu32(t, tag = c->ctag++);
pa_tagstruct_puts(t, c->name); pa_tagstruct_puts(t, c->name);
pa_pstream_send_tagstruct(c->pstream, t); pa_pstream_send_tagstruct(c->pstream, t);
pa_pdispatch_register_reply(c->pdispatch, tag, DEFAULT_TIMEOUT, auth_complete_callback, c); pa_pdispatch_register_reply(c->pdispatch, tag, DEFAULT_TIMEOUT, setup_complete_callback, c);
} else { } else {
assert(c->state == CONTEXT_SETTING_NAME); assert(c->state == CONTEXT_SETTING_NAME);
@ -220,7 +264,7 @@ static int auth_complete_callback(struct pa_pdispatch *pd, uint32_t command, uin
c->connect_complete_callback(c, 1, c->connect_complete_userdata); c->connect_complete_callback(c, 1, c->connect_complete_userdata);
} }
return 0; return;
} }
static void on_connection(struct pa_socket_client *client, struct pa_iochannel*io, void *userdata) { static void on_connection(struct pa_socket_client *client, struct pa_iochannel*io, void *userdata) {
@ -234,7 +278,7 @@ static void on_connection(struct pa_socket_client *client, struct pa_iochannel*i
if (!io) { if (!io) {
c->error = PA_ERROR_CONNECTIONREFUSED; c->error = PA_ERROR_CONNECTIONREFUSED;
c->state = CONTEXT_UNCONNECTED; context_dead(c);
if (c->connect_complete_callback) if (c->connect_complete_callback)
c->connect_complete_callback(c, 0, c->connect_complete_userdata); c->connect_complete_callback(c, 0, c->connect_complete_userdata);
@ -257,7 +301,7 @@ static void on_connection(struct pa_socket_client *client, struct pa_iochannel*i
pa_tagstruct_putu32(t, tag = c->ctag++); pa_tagstruct_putu32(t, tag = c->ctag++);
pa_tagstruct_put_arbitrary(t, c->auth_cookie, sizeof(c->auth_cookie)); pa_tagstruct_put_arbitrary(t, c->auth_cookie, sizeof(c->auth_cookie));
pa_pstream_send_tagstruct(c->pstream, t); pa_pstream_send_tagstruct(c->pstream, t);
pa_pdispatch_register_reply(c->pdispatch, tag, DEFAULT_TIMEOUT, auth_complete_callback, c); pa_pdispatch_register_reply(c->pdispatch, tag, DEFAULT_TIMEOUT, setup_complete_callback, c);
c->state = CONTEXT_AUTHORIZING; c->state = CONTEXT_AUTHORIZING;
} }
@ -305,7 +349,7 @@ void pa_context_set_die_callback(struct pa_context *c, void (*cb)(struct pa_cont
c->die_userdata = userdata; c->die_userdata = userdata;
} }
static int command_request(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) { static void command_request(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
struct pa_stream *s; struct pa_stream *s;
struct pa_context *c = userdata; struct pa_context *c = userdata;
uint32_t bytes, channel; uint32_t bytes, channel;
@ -315,63 +359,122 @@ static int command_request(struct pa_pdispatch *pd, uint32_t command, uint32_t t
pa_tagstruct_getu32(t, &bytes) < 0 || pa_tagstruct_getu32(t, &bytes) < 0 ||
!pa_tagstruct_eof(t)) { !pa_tagstruct_eof(t)) {
c->error = PA_ERROR_PROTOCOL; c->error = PA_ERROR_PROTOCOL;
return -1; context_dead(c);
return;
} }
if (!(s = pa_dynarray_get(c->streams, channel))) { if (!(s = pa_dynarray_get(c->streams, channel)))
c->error = PA_ERROR_PROTOCOL; return;
return -1;
}
/*fprintf(stderr, "Requested %u bytes\n", bytes);*/ if (s->state != STREAM_READY)
return;
s->requested_bytes += bytes; s->requested_bytes += bytes;
if (s->requested_bytes && s->write_callback) if (s->requested_bytes && s->write_callback)
s->write_callback(s, s->requested_bytes, s->write_userdata); s->write_callback(s, s->requested_bytes, s->write_userdata);
return 0;
} }
static int create_playback_callback(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) { static void create_playback_callback(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
int ret = 0;
struct pa_stream *s = userdata; struct pa_stream *s = userdata;
assert(pd && s && s->state == STREAM_CREATING); assert(pd && s && s->state == STREAM_CREATING);
if (command != PA_COMMAND_REPLY) { if (command != PA_COMMAND_REPLY) {
struct pa_context *c = s->context; if (handle_error(s->context, command, t) < 0) {
assert(c); context_dead(s->context);
return;
}
if (command == PA_COMMAND_ERROR && pa_tagstruct_getu32(t, &s->context->error) < 0) stream_dead(s);
s->context->error = PA_ERROR_PROTOCOL; if (s->create_complete_callback)
else if (command == PA_COMMAND_TIMEOUT) s->create_complete_callback(s, 0, s->create_complete_userdata);
s->context->error = PA_ERROR_TIMEOUT;
return;
ret = -1;
goto fail;
} }
if (pa_tagstruct_getu32(t, &s->channel) < 0 || if (pa_tagstruct_getu32(t, &s->channel) < 0 ||
pa_tagstruct_getu32(t, &s->device_index) < 0 || pa_tagstruct_getu32(t, &s->device_index) < 0 ||
!pa_tagstruct_eof(t)) { !pa_tagstruct_eof(t)) {
s->context->error = PA_ERROR_PROTOCOL; s->context->error = PA_ERROR_PROTOCOL;
ret = -1; context_dead(s->context);
goto fail; return;
} }
s->channel_valid = 1; s->channel_valid = 1;
pa_dynarray_put(s->context->streams, s->channel, s); pa_dynarray_put(s->context->streams, s->channel, s);
s->state = STREAM_READY; s->state = STREAM_READY;
assert(s->create_complete_callback); if (s->create_complete_callback)
s->create_complete_callback(s, 1, s->create_complete_userdata); s->create_complete_callback(s, 1, s->create_complete_userdata);
return 0; }
fail: static void create_stream(struct pa_stream *s, uint32_t tdev_index) {
assert(s->create_complete_callback); struct pa_tagstruct *t;
s->create_complete_callback(s, 0, s->create_complete_userdata); uint32_t tag;
pa_stream_free(s); assert(s);
return ret;
s->state = STREAM_CREATING;
t = pa_tagstruct_new(NULL, 0);
assert(t);
pa_tagstruct_putu32(t, s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_CREATE_PLAYBACK_STREAM : PA_COMMAND_CREATE_RECORD_STREAM);
pa_tagstruct_putu32(t, tag = s->context->ctag++);
pa_tagstruct_puts(t, s->name);
pa_tagstruct_put_sample_spec(t, &s->sample_spec);
pa_tagstruct_putu32(t, tdev_index);
pa_tagstruct_putu32(t, s->buffer_attr.maxlength);
pa_tagstruct_putu32(t, s->buffer_attr.tlength);
pa_tagstruct_putu32(t, s->buffer_attr.prebuf);
pa_tagstruct_putu32(t, s->buffer_attr.minreq);
pa_pstream_send_tagstruct(s->context->pstream, t);
pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, create_playback_callback, s);
}
static void lookup_device_callback(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
struct pa_stream *s = userdata;
uint32_t tdev;
assert(pd && s && s->state == STREAM_LOOKING_UP);
if (command != PA_COMMAND_REPLY) {
if (handle_error(s->context, command, t) < 0) {
context_dead(s->context);
return;
}
stream_dead(s);
if (s->create_complete_callback)
s->create_complete_callback(s, 0, s->create_complete_userdata);
return;
}
if (pa_tagstruct_getu32(t, &tdev) < 0 ||
!pa_tagstruct_eof(t)) {
s->context->error = PA_ERROR_PROTOCOL;
context_dead(s->context);
return;
}
create_stream(s, tdev);
}
static void lookup_device(struct pa_stream *s, const char *tdev) {
struct pa_tagstruct *t;
uint32_t tag;
assert(s);
s->state = STREAM_LOOKING_UP;
t = pa_tagstruct_new(NULL, 0);
assert(t);
pa_tagstruct_putu32(t, s->direction == PA_STREAM_PLAYBACK ? PA_COMMAND_LOOKUP_SINK : PA_COMMAND_LOOKUP_SOURCE);
pa_tagstruct_putu32(t, tag = s->context->ctag++);
pa_tagstruct_puts(t, tdev);
pa_pstream_send_tagstruct(s->context->pstream, t);
pa_pdispatch_register_reply(s->context->pdispatch, tag, DEFAULT_TIMEOUT, lookup_device_callback, s);
} }
struct pa_stream* pa_stream_new( struct pa_stream* pa_stream_new(
@ -385,10 +488,8 @@ struct pa_stream* pa_stream_new(
void *userdata) { void *userdata) {
struct pa_stream *s; struct pa_stream *s;
struct pa_tagstruct *t;
uint32_t tag;
assert(c && name && ss && c->state == CONTEXT_READY && complete); assert(c && name && ss && c->state == CONTEXT_READY);
s = malloc(sizeof(struct pa_stream)); s = malloc(sizeof(struct pa_stream));
assert(s); assert(s);
@ -403,42 +504,43 @@ struct pa_stream* pa_stream_new(
s->create_complete_callback = complete; s->create_complete_callback = complete;
s->create_complete_userdata = NULL; s->create_complete_userdata = NULL;
s->name = strdup(name);
s->state = STREAM_CREATING; s->state = STREAM_CREATING;
s->requested_bytes = 0; s->requested_bytes = 0;
s->channel = 0; s->channel = 0;
s->channel_valid = 0; s->channel_valid = 0;
s->device_index = (uint32_t) -1; s->device_index = (uint32_t) -1;
s->direction = dir; s->direction = dir;
s->sample_spec = *ss;
if (attr)
s->buffer_attr = *attr;
else {
s->buffer_attr.maxlength = DEFAULT_MAXLENGTH;
s->buffer_attr.tlength = DEFAULT_TLENGTH;
s->buffer_attr.prebuf = DEFAULT_PREBUF;
s->buffer_attr.minreq = DEFAULT_MINREQ;
}
t = pa_tagstruct_new(NULL, 0);
assert(t);
pa_tagstruct_putu32(t, dir == PA_STREAM_PLAYBACK ? PA_COMMAND_CREATE_PLAYBACK_STREAM : PA_COMMAND_CREATE_RECORD_STREAM);
pa_tagstruct_putu32(t, tag = c->ctag++);
pa_tagstruct_puts(t, name);
pa_tagstruct_put_sample_spec(t, ss);
pa_tagstruct_putu32(t, (uint32_t) -1);
pa_tagstruct_putu32(t, attr ? attr->queue_length : DEFAULT_QUEUE_LENGTH);
pa_tagstruct_putu32(t, attr ? attr->max_length : DEFAULT_MAX_LENGTH);
pa_tagstruct_putu32(t, attr ? attr->prebuf : DEFAULT_PREBUF);
pa_pstream_send_tagstruct(c->pstream, t);
pa_pdispatch_register_reply(c->pdispatch, tag, DEFAULT_TIMEOUT, create_playback_callback, s);
s->next = c->first_stream; s->next = c->first_stream;
if (s->next) if (s->next)
s->next->previous = s; s->next->previous = s;
s->previous = NULL; s->previous = NULL;
c->first_stream = s; c->first_stream = s;
return 0; if (dev)
lookup_device(s, dev);
else
create_stream(s, (uint32_t) -1);
return s;
} }
void pa_stream_free(struct pa_stream *s) { void pa_stream_free(struct pa_stream *s) {
assert(s && s->context); assert(s && s->context);
free(s->name);
if (s->channel_valid) { if (s->channel_valid && s->context->state == CONTEXT_READY) {
struct pa_tagstruct *t = pa_tagstruct_new(NULL, 0); struct pa_tagstruct *t = pa_tagstruct_new(NULL, 0);
assert(t); assert(t);
@ -469,7 +571,7 @@ void pa_stream_set_write_callback(struct pa_stream *s, void (*cb)(struct pa_stre
void pa_stream_write(struct pa_stream *s, const void *data, size_t length) { void pa_stream_write(struct pa_stream *s, const void *data, size_t length) {
struct pa_memchunk chunk; struct pa_memchunk chunk;
assert(s && s->context && data && length); assert(s && s->context && data && length && s->state == STREAM_READY);
chunk.memblock = pa_memblock_new(length); chunk.memblock = pa_memblock_new(length);
assert(chunk.memblock && chunk.memblock->data); assert(chunk.memblock && chunk.memblock->data);
@ -489,7 +591,7 @@ void pa_stream_write(struct pa_stream *s, const void *data, size_t length) {
} }
size_t pa_stream_writable_size(struct pa_stream *s) { size_t pa_stream_writable_size(struct pa_stream *s) {
assert(s); assert(s && s->state == STREAM_READY);
return s->requested_bytes; return s->requested_bytes;
} }
@ -512,3 +614,72 @@ void pa_stream_set_die_callback(struct pa_stream *s, void (*cb)(struct pa_stream
s->die_callback = cb; s->die_callback = cb;
s->die_userdata = userdata; s->die_userdata = userdata;
} }
int pa_context_is_pending(struct pa_context *c) {
assert(c);
if (c->state != CONTEXT_READY)
return 0;
return pa_pstream_is_pending(c->pstream) || pa_pdispatch_is_pending(c->pdispatch);
}
struct pa_context* pa_stream_get_context(struct pa_stream *p) {
assert(p);
return p->context;
}
static void set_dispatch_callbacks(struct pa_context *c);
static void pdispatch_drain_callback(struct pa_pdispatch*pd, void *userdata) {
set_dispatch_callbacks(userdata);
}
static void pstream_drain_callback(struct pa_pstream *s, void *userdata) {
set_dispatch_callbacks(userdata);
}
static void set_dispatch_callbacks(struct pa_context *c) {
assert(c && c->state == CONTEXT_READY);
pa_pstream_set_drain_callback(c->pstream, NULL, NULL);
pa_pdispatch_set_drain_callback(c->pdispatch, NULL, NULL);
if (pa_pdispatch_is_pending(c->pdispatch)) {
pa_pdispatch_set_drain_callback(c->pdispatch, pdispatch_drain_callback, c);
return;
}
if (pa_pstream_is_pending(c->pstream)) {
pa_pstream_set_drain_callback(c->pstream, pstream_drain_callback, c);
return;
}
assert(c->drain_complete_callback);
c->drain_complete_callback(c, c->drain_complete_userdata);
}
int pa_context_drain(
struct pa_context *c,
void (*complete) (struct pa_context*c, void *userdata),
void *userdata) {
assert(c && c->state == CONTEXT_READY);
if (complete == NULL) {
c->drain_complete_callback = NULL;
pa_pstream_set_drain_callback(c->pstream, NULL, NULL);
pa_pdispatch_set_drain_callback(c->pdispatch, NULL, NULL);
return 0;
}
if (!pa_context_is_pending(c))
return -1;
c->drain_complete_callback = complete;
c->drain_complete_userdata = userdata;
set_dispatch_callbacks(c);
return 0;
}

View file

@ -17,6 +17,11 @@ int pa_context_connect(
void (*complete) (struct pa_context*c, int success, void *userdata), void (*complete) (struct pa_context*c, int success, void *userdata),
void *userdata); void *userdata);
int pa_context_drain(
struct pa_context *c,
void (*complete) (struct pa_context*c, void *userdata),
void *userdata);
void pa_context_free(struct pa_context *c); void pa_context_free(struct pa_context *c);
void pa_context_set_die_callback(struct pa_context *c, void (*cb)(struct pa_context *c, void *userdata), void *userdata); void pa_context_set_die_callback(struct pa_context *c, void (*cb)(struct pa_context *c, void *userdata), void *userdata);
@ -25,6 +30,8 @@ int pa_context_is_dead(struct pa_context *c);
int pa_context_is_ready(struct pa_context *c); int pa_context_is_ready(struct pa_context *c);
int pa_context_errno(struct pa_context *c); int pa_context_errno(struct pa_context *c);
int pa_context_is_pending(struct pa_context *c);
struct pa_stream; struct pa_stream;
struct pa_stream* pa_stream_new( struct pa_stream* pa_stream_new(
@ -50,4 +57,6 @@ void pa_stream_set_read_callback(struct pa_stream *p, void (*cb)(struct pa_strea
int pa_stream_is_dead(struct pa_stream *p); int pa_stream_is_dead(struct pa_stream *p);
int pa_stream_is_ready(struct pa_stream*p); int pa_stream_is_ready(struct pa_stream*p);
struct pa_context* pa_stream_get_context(struct pa_stream *p);
#endif #endif

View file

@ -9,9 +9,10 @@ enum pa_stream_direction {
}; };
struct pa_buffer_attr { struct pa_buffer_attr {
uint32_t queue_length; uint32_t maxlength;
uint32_t max_length; uint32_t tlength;
uint32_t prebuf; uint32_t prebuf;
uint32_t minreq;
}; };

View file

@ -17,9 +17,7 @@
#define COOKIE_FILE ".esd_auth" #define COOKIE_FILE ".esd_auth"
#define MEMBLOCKQ_LENGTH (10*1204) #define BUFFER_SECONDS (0.5)
#define MEMBLOCKQ_PREBUF (2*1024)
#define BUFSIZE (1024)
/* This is heavily based on esound's code */ /* This is heavily based on esound's code */
@ -196,6 +194,7 @@ static int esd_proto_stream_play(struct connection *c, const void *data, size_t
int format, rate; int format, rate;
struct pa_sink *sink; struct pa_sink *sink;
struct pa_sample_spec ss; struct pa_sample_spec ss;
size_t l;
assert(length == (sizeof(int)*2+ESD_NAME_MAX)); assert(length == (sizeof(int)*2+ESD_NAME_MAX));
format = maybe_swap_endian_32(c->swap_byte_order, *(int*)data); format = maybe_swap_endian_32(c->swap_byte_order, *(int*)data);
@ -217,7 +216,9 @@ static int esd_proto_stream_play(struct connection *c, const void *data, size_t
pa_client_rename(c->client, name); pa_client_rename(c->client, name);
assert(!c->input_memblockq); assert(!c->input_memblockq);
c->input_memblockq = pa_memblockq_new(MEMBLOCKQ_LENGTH, pa_sample_size(&ss), MEMBLOCKQ_PREBUF);
l = (size_t) (pa_bytes_per_second(&ss)*BUFFER_SECONDS);
c->input_memblockq = pa_memblockq_new(l, 0, pa_sample_size(&ss), l/2, l/10);
assert(c->input_memblockq); assert(c->input_memblockq);
assert(!c->sink_input); assert(!c->sink_input);
@ -252,7 +253,7 @@ static int esd_proto_get_latency(struct connection *c, const void *data, size_t
latency = 0; latency = 0;
else { else {
float usec = pa_sink_get_latency(sink); float usec = pa_sink_get_latency(sink);
usec += pa_samples_usec(MEMBLOCKQ_LENGTH-BUFSIZE, &sink->sample_spec); usec += BUFFER_SECONDS*1000000*.9; /* A better estimation would be a good idea! */
latency = (int) ((usec*44100)/1000000); latency = (int) ((usec*44100)/1000000);
} }
@ -452,16 +453,17 @@ static int do_read(struct connection *c) {
} else if (c->state == ESD_STREAMING_DATA) { } else if (c->state == ESD_STREAMING_DATA) {
struct pa_memchunk chunk; struct pa_memchunk chunk;
ssize_t r; ssize_t r;
size_t l;
assert(c->input_memblockq); assert(c->input_memblockq);
if (!pa_memblockq_is_writable(c->input_memblockq, BUFSIZE)) if (!(l = pa_memblockq_missing(c->input_memblockq)))
return 0; return 0;
chunk.memblock = pa_memblock_new(BUFSIZE); chunk.memblock = pa_memblock_new(l);
assert(chunk.memblock && chunk.memblock->data); assert(chunk.memblock && chunk.memblock->data);
if ((r = pa_iochannel_read(c->io, chunk.memblock->data, BUFSIZE)) <= 0) { if ((r = pa_iochannel_read(c->io, chunk.memblock->data, l)) <= 0) {
fprintf(stderr, "protocol-esound.c: read() failed: %s\n", r == 0 ? "EOF" : strerror(errno)); fprintf(stderr, "protocol-esound.c: read() failed: %s\n", r == 0 ? "EOF" : strerror(errno));
pa_memblock_unref(chunk.memblock); pa_memblock_unref(chunk.memblock);
return -1; return -1;

View file

@ -13,6 +13,8 @@ enum {
PA_COMMAND_REQUEST, PA_COMMAND_REQUEST,
PA_COMMAND_AUTH, PA_COMMAND_AUTH,
PA_COMMAND_SET_NAME, PA_COMMAND_SET_NAME,
PA_COMMAND_LOOKUP_SINK,
PA_COMMAND_LOOKUP_SOURCE,
PA_COMMAND_MAX PA_COMMAND_MAX
}; };

View file

@ -14,6 +14,7 @@
#include "pdispatch.h" #include "pdispatch.h"
#include "pstream-util.h" #include "pstream-util.h"
#include "authkey.h" #include "authkey.h"
#include "namereg.h"
struct connection; struct connection;
struct pa_protocol_native; struct pa_protocol_native;
@ -28,7 +29,6 @@ struct record_stream {
struct playback_stream { struct playback_stream {
struct connection *connection; struct connection *connection;
uint32_t index; uint32_t index;
size_t qlength;
struct pa_sink_input *sink_input; struct pa_sink_input *sink_input;
struct pa_memblockq *memblockq; struct pa_memblockq *memblockq;
size_t requested_bytes; size_t requested_bytes;
@ -58,11 +58,12 @@ static uint32_t sink_input_get_latency_cb(struct pa_sink_input *i);
static void request_bytes(struct playback_stream*s); static void request_bytes(struct playback_stream*s);
static int command_exit(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata); static void command_exit(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata);
static int command_create_playback_stream(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata); static void command_create_playback_stream(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata);
static int command_delete_playback_stream(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata); static void command_delete_playback_stream(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata);
static int command_auth(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata); static void command_auth(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata);
static int command_set_name(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata); static void command_set_name(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata);
static void command_lookup(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata);
static const struct pa_pdispatch_command command_table[PA_COMMAND_MAX] = { static const struct pa_pdispatch_command command_table[PA_COMMAND_MAX] = {
[PA_COMMAND_ERROR] = { NULL }, [PA_COMMAND_ERROR] = { NULL },
@ -76,6 +77,8 @@ static const struct pa_pdispatch_command command_table[PA_COMMAND_MAX] = {
[PA_COMMAND_REQUEST] = { NULL }, [PA_COMMAND_REQUEST] = { NULL },
[PA_COMMAND_EXIT] = { command_exit }, [PA_COMMAND_EXIT] = { command_exit },
[PA_COMMAND_SET_NAME] = { command_set_name }, [PA_COMMAND_SET_NAME] = { command_set_name },
[PA_COMMAND_LOOKUP_SINK] = { command_lookup },
[PA_COMMAND_LOOKUP_SOURCE] = { command_lookup },
}; };
/* structure management */ /* structure management */
@ -89,14 +92,17 @@ static void record_stream_free(struct record_stream* r) {
free(r); free(r);
} }
static struct playback_stream* playback_stream_new(struct connection *c, struct pa_sink *sink, struct pa_sample_spec *ss, const char *name, size_t qlen, size_t maxlength, size_t prebuf) { static struct playback_stream* playback_stream_new(struct connection *c, struct pa_sink *sink, struct pa_sample_spec *ss, const char *name,
size_t maxlength,
size_t tlength,
size_t prebuf,
size_t minreq) {
struct playback_stream *s; struct playback_stream *s;
assert(c && sink && ss && name && qlen && maxlength && prebuf); assert(c && sink && ss && name && maxlength);
s = malloc(sizeof(struct playback_stream)); s = malloc(sizeof(struct playback_stream));
assert (s); assert (s);
s->connection = c; s->connection = c;
s->qlength = qlen;
s->sink_input = pa_sink_input_new(sink, name, ss); s->sink_input = pa_sink_input_new(sink, name, ss);
assert(s->sink_input); assert(s->sink_input);
@ -106,7 +112,7 @@ static struct playback_stream* playback_stream_new(struct connection *c, struct
s->sink_input->get_latency = sink_input_get_latency_cb; s->sink_input->get_latency = sink_input_get_latency_cb;
s->sink_input->userdata = s; s->sink_input->userdata = s;
s->memblockq = pa_memblockq_new(maxlength, pa_sample_size(ss), prebuf); s->memblockq = pa_memblockq_new(maxlength, tlength, pa_sample_size(ss), prebuf, minreq);
assert(s->memblockq); assert(s->memblockq);
s->requested_bytes = 0; s->requested_bytes = 0;
@ -149,13 +155,17 @@ static void request_bytes(struct playback_stream *s) {
size_t l; size_t l;
assert(s); assert(s);
if (!(l = pa_memblockq_missing_to(s->memblockq, s->qlength))) if (!(l = pa_memblockq_missing(s->memblockq)))
return; return;
if (l <= s->requested_bytes) if (l <= s->requested_bytes)
return; return;
l -= s->requested_bytes; l -= s->requested_bytes;
if (l < pa_memblockq_get_minreq(s->memblockq))
return;
s->requested_bytes += l; s->requested_bytes += l;
t = pa_tagstruct_new(NULL, 0); t = pa_tagstruct_new(NULL, 0);
@ -166,7 +176,7 @@ static void request_bytes(struct playback_stream *s) {
pa_tagstruct_putu32(t, l); pa_tagstruct_putu32(t, l);
pa_pstream_send_tagstruct(s->connection->pstream, t); pa_pstream_send_tagstruct(s->connection->pstream, t);
/* fprintf(stderr, "Requesting %u bytes\n", l);*/ /*fprintf(stderr, "Requesting %u bytes\n", l);*/
} }
/*** sinkinput callbacks ***/ /*** sinkinput callbacks ***/
@ -209,10 +219,15 @@ static uint32_t sink_input_get_latency_cb(struct pa_sink_input *i) {
/*** pdispatch callbacks ***/ /*** pdispatch callbacks ***/
static int command_create_playback_stream(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) { static void protocol_error(struct connection *c) {
fprintf(stderr, __FILE__": protocol error, kicking client\n");
connection_free(c);
}
static void command_create_playback_stream(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
struct connection *c = userdata; struct connection *c = userdata;
struct playback_stream *s; struct playback_stream *s;
size_t maxlength, prebuf, qlength; size_t maxlength, tlength, prebuf, minreq;
uint32_t sink_index; uint32_t sink_index;
const char *name; const char *name;
struct pa_sample_spec ss; struct pa_sample_spec ss;
@ -223,15 +238,18 @@ static int command_create_playback_stream(struct pa_pdispatch *pd, uint32_t comm
if (pa_tagstruct_gets(t, &name) < 0 || if (pa_tagstruct_gets(t, &name) < 0 ||
pa_tagstruct_get_sample_spec(t, &ss) < 0 || pa_tagstruct_get_sample_spec(t, &ss) < 0 ||
pa_tagstruct_getu32(t, &sink_index) < 0 || pa_tagstruct_getu32(t, &sink_index) < 0 ||
pa_tagstruct_getu32(t, &qlength) < 0 ||
pa_tagstruct_getu32(t, &maxlength) < 0 || pa_tagstruct_getu32(t, &maxlength) < 0 ||
pa_tagstruct_getu32(t, &tlength) < 0 ||
pa_tagstruct_getu32(t, &prebuf) < 0 || pa_tagstruct_getu32(t, &prebuf) < 0 ||
!pa_tagstruct_eof(t)) pa_tagstruct_getu32(t, &minreq) < 0 ||
return -1; !pa_tagstruct_eof(t)) {
protocol_error(c);
return;
}
if (!c->authorized) { if (!c->authorized) {
pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS); pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
return 0; return;
} }
if (sink_index == (uint32_t) -1) if (sink_index == (uint32_t) -1)
@ -241,12 +259,12 @@ static int command_create_playback_stream(struct pa_pdispatch *pd, uint32_t comm
if (!sink) { if (!sink) {
pa_pstream_send_error(c->pstream, tag, PA_ERROR_EXIST); pa_pstream_send_error(c->pstream, tag, PA_ERROR_EXIST);
return 0; return;
} }
if (!(s = playback_stream_new(c, sink, &ss, name, qlength, maxlength, prebuf))) { if (!(s = playback_stream_new(c, sink, &ss, name, maxlength, tlength, prebuf, minreq))) {
pa_pstream_send_error(c->pstream, tag, PA_ERROR_INVALID); pa_pstream_send_error(c->pstream, tag, PA_ERROR_INVALID);
return 0; return;
} }
reply = pa_tagstruct_new(NULL, 0); reply = pa_tagstruct_new(NULL, 0);
@ -258,107 +276,148 @@ static int command_create_playback_stream(struct pa_pdispatch *pd, uint32_t comm
pa_tagstruct_putu32(reply, s->sink_input->index); pa_tagstruct_putu32(reply, s->sink_input->index);
pa_pstream_send_tagstruct(c->pstream, reply); pa_pstream_send_tagstruct(c->pstream, reply);
request_bytes(s); request_bytes(s);
return 0;
} }
static int command_delete_playback_stream(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) { static void command_delete_playback_stream(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
struct connection *c = userdata; struct connection *c = userdata;
uint32_t channel; uint32_t channel;
struct playback_stream *s; struct playback_stream *s;
assert(c && t); assert(c && t);
if (pa_tagstruct_getu32(t, &channel) < 0 || if (pa_tagstruct_getu32(t, &channel) < 0 ||
!pa_tagstruct_eof(t)) !pa_tagstruct_eof(t)) {
return -1; protocol_error(c);
return;
}
if (!c->authorized) { if (!c->authorized) {
pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS); pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
return 0; return;
} }
if (!(s = pa_idxset_get_by_index(c->playback_streams, channel))) { if (!(s = pa_idxset_get_by_index(c->playback_streams, channel))) {
pa_pstream_send_error(c->pstream, tag, PA_ERROR_EXIST); pa_pstream_send_error(c->pstream, tag, PA_ERROR_EXIST);
return 0; return;
} }
pa_pstream_send_simple_ack(c->pstream, tag); pa_pstream_send_simple_ack(c->pstream, tag);
return 0;
} }
static int command_exit(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) { static void command_exit(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
struct connection *c = userdata; struct connection *c = userdata;
assert(c && t); assert(c && t);
if (!pa_tagstruct_eof(t)) if (!pa_tagstruct_eof(t)) {
return -1; protocol_error(c);
return;
}
if (!c->authorized) { if (!c->authorized) {
pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS); pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
return 0; return;
} }
assert(c->protocol && c->protocol->core && c->protocol->core->mainloop); assert(c->protocol && c->protocol->core && c->protocol->core->mainloop);
c->protocol->core->mainloop->quit(c->protocol->core->mainloop, 0); c->protocol->core->mainloop->quit(c->protocol->core->mainloop, 0);
pa_pstream_send_simple_ack(c->pstream, tag); /* nonsense */ pa_pstream_send_simple_ack(c->pstream, tag); /* nonsense */
return 0; return;
} }
static int command_auth(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) { static void command_auth(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
struct connection *c = userdata; struct connection *c = userdata;
const void*cookie; const void*cookie;
assert(c && t); assert(c && t);
if (pa_tagstruct_get_arbitrary(t, &cookie, PA_NATIVE_COOKIE_LENGTH) < 0 || if (pa_tagstruct_get_arbitrary(t, &cookie, PA_NATIVE_COOKIE_LENGTH) < 0 ||
!pa_tagstruct_eof(t)) !pa_tagstruct_eof(t)) {
return -1; protocol_error(c);
return;
}
if (memcmp(c->protocol->auth_cookie, cookie, PA_NATIVE_COOKIE_LENGTH) != 0) { if (memcmp(c->protocol->auth_cookie, cookie, PA_NATIVE_COOKIE_LENGTH) != 0) {
fprintf(stderr, "protocol-native.c: Denied access to client with invalid authorization key.\n"); fprintf(stderr, "protocol-native.c: Denied access to client with invalid authorization key.\n");
pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS); pa_pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
return 0; return;
} }
c->authorized = 1; c->authorized = 1;
pa_pstream_send_simple_ack(c->pstream, tag); pa_pstream_send_simple_ack(c->pstream, tag);
return 0; return;
} }
static int command_set_name(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) { static void command_set_name(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
struct connection *c = userdata; struct connection *c = userdata;
const char *name; const char *name;
assert(c && t); assert(c && t);
if (pa_tagstruct_gets(t, &name) < 0 || if (pa_tagstruct_gets(t, &name) < 0 ||
!pa_tagstruct_eof(t)) !pa_tagstruct_eof(t)) {
return -1; protocol_error(c);
return;
}
pa_client_rename(c->client, name); pa_client_rename(c->client, name);
pa_pstream_send_simple_ack(c->pstream, tag); pa_pstream_send_simple_ack(c->pstream, tag);
return 0; return;
}
static void command_lookup(struct pa_pdispatch *pd, uint32_t command, uint32_t tag, struct pa_tagstruct *t, void *userdata) {
struct connection *c = userdata;
const char *name;
uint32_t index = PA_IDXSET_INVALID;
assert(c && t);
if (pa_tagstruct_gets(t, &name) < 0 ||
!pa_tagstruct_eof(t)) {
protocol_error(c);
return;
}
if (command == PA_COMMAND_LOOKUP_SINK) {
struct pa_sink *sink;
if ((sink = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SINK)))
index = sink->index;
} else {
struct pa_source *source;
assert(command == PA_COMMAND_LOOKUP_SOURCE);
if ((source = pa_namereg_get(c->protocol->core, name, PA_NAMEREG_SOURCE)))
index = source->index;
}
if (index == PA_IDXSET_INVALID)
pa_pstream_send_error(c->pstream, tag, PA_ERROR_NOENTITY);
else {
struct pa_tagstruct *reply;
reply = pa_tagstruct_new(NULL, 0);
assert(reply);
pa_tagstruct_putu32(reply, PA_COMMAND_REPLY);
pa_tagstruct_putu32(reply, tag);
pa_tagstruct_putu32(reply, index);
pa_pstream_send_tagstruct(c->pstream, reply);
}
} }
/*** pstream callbacks ***/ /*** pstream callbacks ***/
static int packet_callback(struct pa_pstream *p, struct pa_packet *packet, void *userdata) { static void packet_callback(struct pa_pstream *p, struct pa_packet *packet, void *userdata) {
struct connection *c = userdata; struct connection *c = userdata;
assert(p && packet && packet->data && c); assert(p && packet && packet->data && c);
if (pa_pdispatch_run(c->pdispatch, packet, c) < 0) { if (pa_pdispatch_run(c->pdispatch, packet, c) < 0) {
fprintf(stderr, "protocol-native: invalid packet.\n"); fprintf(stderr, "protocol-native: invalid packet.\n");
return -1; connection_free(c);
} }
return 0;
} }
static int memblock_callback(struct pa_pstream *p, uint32_t channel, int32_t delta, struct pa_memchunk *chunk, void *userdata) { static void memblock_callback(struct pa_pstream *p, uint32_t channel, int32_t delta, struct pa_memchunk *chunk, void *userdata) {
struct connection *c = userdata; struct connection *c = userdata;
struct playback_stream *stream; struct playback_stream *stream;
assert(p && chunk && userdata); assert(p && chunk && userdata);
if (!(stream = pa_idxset_get_by_index(c->playback_streams, channel))) { if (!(stream = pa_idxset_get_by_index(c->playback_streams, channel))) {
fprintf(stderr, "protocol-native: client sent block for invalid stream.\n"); fprintf(stderr, "protocol-native: client sent block for invalid stream.\n");
return -1; connection_free(c);
return;
} }
if (chunk->length >= stream->requested_bytes) if (chunk->length >= stream->requested_bytes)
@ -371,8 +430,6 @@ static int memblock_callback(struct pa_pstream *p, uint32_t channel, int32_t del
pa_sink_notify(stream->sink_input->sink); pa_sink_notify(stream->sink_input->sink);
/*fprintf(stderr, "Recieved %u bytes.\n", chunk->length);*/ /*fprintf(stderr, "Recieved %u bytes.\n", chunk->length);*/
return 0;
} }
static void die_callback(struct pa_pstream *p, void *userdata) { static void die_callback(struct pa_pstream *p, void *userdata) {

View file

@ -28,7 +28,8 @@ struct pa_protocol_simple {
struct pa_sample_spec sample_spec; struct pa_sample_spec sample_spec;
}; };
#define BUFSIZE PIPE_BUF #define PLAYBACK_BUFFER_SECONDS (.5)
#define RECORD_BUFFER_SECONDS (5)
static void connection_free(struct connection *c) { static void connection_free(struct connection *c) {
assert(c); assert(c);
@ -52,17 +53,18 @@ static void connection_free(struct connection *c) {
static int do_read(struct connection *c) { static int do_read(struct connection *c) {
struct pa_memchunk chunk; struct pa_memchunk chunk;
ssize_t r; ssize_t r;
size_t l;
if (!pa_iochannel_is_readable(c->io)) if (!pa_iochannel_is_readable(c->io))
return 0; return 0;
if (!c->sink_input || !pa_memblockq_is_writable(c->input_memblockq, BUFSIZE)) if (!c->sink_input || !(l = pa_memblockq_missing(c->input_memblockq)))
return 0; return 0;
chunk.memblock = pa_memblock_new(BUFSIZE); chunk.memblock = pa_memblock_new(l);
assert(chunk.memblock); assert(chunk.memblock);
if ((r = pa_iochannel_read(c->io, chunk.memblock->data, BUFSIZE)) <= 0) { if ((r = pa_iochannel_read(c->io, chunk.memblock->data, l)) <= 0) {
fprintf(stderr, "read(): %s\n", r == 0 ? "EOF" : strerror(errno)); fprintf(stderr, "read(): %s\n", r == 0 ? "EOF" : strerror(errno));
pa_memblock_unref(chunk.memblock); pa_memblock_unref(chunk.memblock);
return -1; return -1;
@ -213,8 +215,8 @@ static void on_connection(struct pa_socket_server*s, struct pa_iochannel *io, vo
c->source_output->kill = source_output_kill_cb; c->source_output->kill = source_output_kill_cb;
c->source_output->userdata = c; c->source_output->userdata = c;
l = 5*pa_bytes_per_second(&p->sample_spec); /* 5s */ l = (size_t) (pa_bytes_per_second(&p->sample_spec)*RECORD_BUFFER_SECONDS);
c->output_memblockq = pa_memblockq_new(l, pa_sample_size(&p->sample_spec), l/2); c->output_memblockq = pa_memblockq_new(l, 0, pa_sample_size(&p->sample_spec), l/2, 0);
} }
if (p->mode & PA_PROTOCOL_SIMPLE_PLAYBACK) { if (p->mode & PA_PROTOCOL_SIMPLE_PLAYBACK) {
@ -234,8 +236,8 @@ static void on_connection(struct pa_socket_server*s, struct pa_iochannel *io, vo
c->sink_input->get_latency = sink_input_get_latency_cb; c->sink_input->get_latency = sink_input_get_latency_cb;
c->sink_input->userdata = c; c->sink_input->userdata = c;
l = pa_bytes_per_second(&p->sample_spec)/2; /* half a second */ l = (size_t) (pa_bytes_per_second(&p->sample_spec)*PLAYBACK_BUFFER_SECONDS);
c->input_memblockq = pa_memblockq_new(l, pa_sample_size(&p->sample_spec), l/2); c->input_memblockq = pa_memblockq_new(l, 0, pa_sample_size(&p->sample_spec), l/2, l/10);
} }

View file

@ -35,6 +35,8 @@ struct pa_pstream {
struct pa_iochannel *io; struct pa_iochannel *io;
struct pa_queue *send_queue; struct pa_queue *send_queue;
int in_use, shall_free;
int dead; int dead;
void (*die_callback) (struct pa_pstream *p, void *userdad); void (*die_callback) (struct pa_pstream *p, void *userdad);
void *die_callback_userdata; void *die_callback_userdata;
@ -46,9 +48,6 @@ struct pa_pstream {
size_t index; size_t index;
} write; } write;
void (*send_callback) (struct pa_pstream *p, void *userdata);
void *send_callback_userdata;
struct { struct {
struct pa_memblock *memblock; struct pa_memblock *memblock;
struct pa_packet *packet; struct pa_packet *packet;
@ -57,34 +56,51 @@ struct pa_pstream {
size_t index; size_t index;
} read; } read;
int (*recieve_packet_callback) (struct pa_pstream *p, struct pa_packet *packet, void *userdata); void (*recieve_packet_callback) (struct pa_pstream *p, struct pa_packet *packet, void *userdata);
void *recieve_packet_callback_userdata; void *recieve_packet_callback_userdata;
int (*recieve_memblock_callback) (struct pa_pstream *p, uint32_t channel, int32_t delta, struct pa_memchunk *chunk, void *userdata); void (*recieve_memblock_callback) (struct pa_pstream *p, uint32_t channel, int32_t delta, struct pa_memchunk *chunk, void *userdata);
void *recieve_memblock_callback_userdata; void *recieve_memblock_callback_userdata;
void (*drain_callback)(struct pa_pstream *p, void *userdata);
void *drain_userdata;
}; };
static void do_write(struct pa_pstream *p); static void do_write(struct pa_pstream *p);
static void do_read(struct pa_pstream *p); static void do_read(struct pa_pstream *p);
static void do_something(struct pa_pstream *p) {
assert(p && !p->shall_free);
p->mainloop->enable_fixed(p->mainloop, p->mainloop_source, 0);
p->in_use = 1;
do_write(p);
p->in_use = 0;
if (p->shall_free) {
pa_pstream_free(p);
return;
}
p->in_use = 1;
do_read(p);
p->in_use = 0;
if (p->shall_free) {
pa_pstream_free(p);
return;
}
}
static void io_callback(struct pa_iochannel*io, void *userdata) { static void io_callback(struct pa_iochannel*io, void *userdata) {
struct pa_pstream *p = userdata; struct pa_pstream *p = userdata;
assert(p && p->io == io); assert(p && p->io == io);
do_something(p);
p->mainloop->enable_fixed(p->mainloop, p->mainloop_source, 0);
do_write(p);
do_read(p);
} }
static void fixed_callback(struct pa_mainloop_api *m, void *id, void*userdata) { static void fixed_callback(struct pa_mainloop_api *m, void *id, void*userdata) {
struct pa_pstream *p = userdata; struct pa_pstream *p = userdata;
assert(p && p->mainloop_source == id && p->mainloop == m); assert(p && p->mainloop_source == id && p->mainloop == m);
do_something(p);
p->mainloop->enable_fixed(p->mainloop, p->mainloop_source, 0);
do_write(p);
do_read(p);
} }
struct pa_pstream *pa_pstream_new(struct pa_mainloop_api *m, struct pa_iochannel *io) { struct pa_pstream *pa_pstream_new(struct pa_mainloop_api *m, struct pa_iochannel *io) {
@ -115,15 +131,17 @@ struct pa_pstream *pa_pstream_new(struct pa_mainloop_api *m, struct pa_iochannel
p->read.packet = NULL; p->read.packet = NULL;
p->read.index = 0; p->read.index = 0;
p->send_callback = NULL;
p->send_callback_userdata = NULL;
p->recieve_packet_callback = NULL; p->recieve_packet_callback = NULL;
p->recieve_packet_callback_userdata = NULL; p->recieve_packet_callback_userdata = NULL;
p->recieve_memblock_callback = NULL; p->recieve_memblock_callback = NULL;
p->recieve_memblock_callback_userdata = NULL; p->recieve_memblock_callback_userdata = NULL;
p->drain_callback = NULL;
p->drain_userdata = NULL;
p->in_use = p->shall_free = 0;
return p; return p;
} }
@ -146,6 +164,12 @@ static void item_free(void *item, void *p) {
void pa_pstream_free(struct pa_pstream *p) { void pa_pstream_free(struct pa_pstream *p) {
assert(p); assert(p);
if (p->in_use) {
/* If this pstream object is used by someone else on the call stack, we have to postpone the freeing */
p->dead = p->shall_free = 1;
return;
}
pa_iochannel_free(p->io); pa_iochannel_free(p->io);
pa_queue_free(p->send_queue, item_free, NULL); pa_queue_free(p->send_queue, item_free, NULL);
@ -162,13 +186,6 @@ void pa_pstream_free(struct pa_pstream *p) {
free(p); free(p);
} }
void pa_pstream_set_send_callback(struct pa_pstream*p, void (*callback) (struct pa_pstream *p, void *userdata), void *userdata) {
assert(p && callback);
p->send_callback = callback;
p->send_callback_userdata = userdata;
}
void pa_pstream_send_packet(struct pa_pstream*p, struct pa_packet *packet) { void pa_pstream_send_packet(struct pa_pstream*p, struct pa_packet *packet) {
struct item_info *i; struct item_info *i;
assert(p && packet); assert(p && packet);
@ -199,14 +216,14 @@ void pa_pstream_send_memblock(struct pa_pstream*p, uint32_t channel, int32_t del
p->mainloop->enable_fixed(p->mainloop, p->mainloop_source, 1); p->mainloop->enable_fixed(p->mainloop, p->mainloop_source, 1);
} }
void pa_pstream_set_recieve_packet_callback(struct pa_pstream *p, int (*callback) (struct pa_pstream *p, struct pa_packet *packet, void *userdata), void *userdata) { void pa_pstream_set_recieve_packet_callback(struct pa_pstream *p, void (*callback) (struct pa_pstream *p, struct pa_packet *packet, void *userdata), void *userdata) {
assert(p && callback); assert(p && callback);
p->recieve_packet_callback = callback; p->recieve_packet_callback = callback;
p->recieve_packet_callback_userdata = userdata; p->recieve_packet_callback_userdata = userdata;
} }
void pa_pstream_set_recieve_memblock_callback(struct pa_pstream *p, int (*callback) (struct pa_pstream *p, uint32_t channel, int32_t delta, struct pa_memchunk *chunk, void *userdata), void *userdata) { void pa_pstream_set_recieve_memblock_callback(struct pa_pstream *p, void (*callback) (struct pa_pstream *p, uint32_t channel, int32_t delta, struct pa_memchunk *chunk, void *userdata), void *userdata) {
assert(p && callback); assert(p && callback);
p->recieve_memblock_callback = callback; p->recieve_memblock_callback = callback;
@ -261,7 +278,7 @@ static void do_write(struct pa_pstream *p) {
l = ntohl(p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) - (p->write.index - PA_PSTREAM_DESCRIPTOR_SIZE); l = ntohl(p->write.descriptor[PA_PSTREAM_DESCRIPTOR_LENGTH]) - (p->write.index - PA_PSTREAM_DESCRIPTOR_SIZE);
} }
if ((r = pa_iochannel_write(p->io, d, l)) < 0) if ((r = pa_iochannel_write(p->io, d, l)) < 0)
goto die; goto die;
p->write.index += r; p->write.index += r;
@ -271,8 +288,8 @@ static void do_write(struct pa_pstream *p) {
item_free(p->write.current, (void *) 1); item_free(p->write.current, (void *) 1);
p->write.current = NULL; p->write.current = NULL;
if (p->send_callback && pa_queue_is_empty(p->send_queue)) if (p->drain_callback && !pa_pstream_is_pending(p))
p->send_callback(p, p->send_callback_userdata); p->drain_callback(p, p->drain_userdata);
} }
return; return;
@ -341,13 +358,14 @@ static void do_read(struct pa_pstream *p) {
chunk.memblock = p->read.memblock; chunk.memblock = p->read.memblock;
chunk.index = p->read.index - PA_PSTREAM_DESCRIPTOR_SIZE - l; chunk.index = p->read.index - PA_PSTREAM_DESCRIPTOR_SIZE - l;
chunk.length = l; chunk.length = l;
if (p->recieve_memblock_callback(p, if (p->recieve_memblock_callback)
ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]), p->recieve_memblock_callback(
(int32_t) ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_DELTA]), p,
&chunk, ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_CHANNEL]),
p->recieve_memblock_callback_userdata) < 0) (int32_t) ntohl(p->read.descriptor[PA_PSTREAM_DESCRIPTOR_DELTA]),
goto die; &chunk,
p->recieve_memblock_callback_userdata);
} }
} }
@ -359,17 +377,13 @@ static void do_read(struct pa_pstream *p) {
pa_memblock_unref(p->read.memblock); pa_memblock_unref(p->read.memblock);
p->read.memblock = NULL; p->read.memblock = NULL;
} else { } else {
int r = 0;
assert(p->read.packet); assert(p->read.packet);
if (p->recieve_packet_callback) if (p->recieve_packet_callback)
r = p->recieve_packet_callback(p, p->read.packet, p->recieve_packet_callback_userdata); p->recieve_packet_callback(p, p->read.packet, p->recieve_packet_callback_userdata);
pa_packet_unref(p->read.packet); pa_packet_unref(p->read.packet);
p->read.packet = NULL; p->read.packet = NULL;
if (r < 0)
goto die;
} }
p->read.index = 0; p->read.index = 0;
@ -390,3 +404,21 @@ void pa_pstream_set_die_callback(struct pa_pstream *p, void (*callback)(struct p
p->die_callback = callback; p->die_callback = callback;
p->die_callback_userdata = userdata; p->die_callback_userdata = userdata;
} }
int pa_pstream_is_pending(struct pa_pstream *p) {
assert(p);
if (p->dead)
return 0;
return p->write.current || !pa_queue_is_empty(p->send_queue);
}
void pa_pstream_set_drain_callback(struct pa_pstream *p, void (*cb)(struct pa_pstream *p, void *userdata), void *userdata) {
assert(p);
assert(!cb || pa_pstream_is_pending(p));
p->drain_callback = cb;
p->drain_userdata = userdata;
}

View file

@ -9,18 +9,22 @@
#include "mainloop-api.h" #include "mainloop-api.h"
#include "memchunk.h" #include "memchunk.h"
/* It is safe to destroy the calling pstream object from all callbacks */
struct pa_pstream; struct pa_pstream;
struct pa_pstream* pa_pstream_new(struct pa_mainloop_api *m, struct pa_iochannel *io); struct pa_pstream* pa_pstream_new(struct pa_mainloop_api *m, struct pa_iochannel *io);
void pa_pstream_free(struct pa_pstream*p); void pa_pstream_free(struct pa_pstream*p);
void pa_pstream_set_send_callback(struct pa_pstream*p, void (*callback) (struct pa_pstream *p, void *userdata), void *userdata);
void pa_pstream_send_packet(struct pa_pstream*p, struct pa_packet *packet); void pa_pstream_send_packet(struct pa_pstream*p, struct pa_packet *packet);
void pa_pstream_send_memblock(struct pa_pstream*p, uint32_t channel, int32_t delta, struct pa_memchunk *chunk); void pa_pstream_send_memblock(struct pa_pstream*p, uint32_t channel, int32_t delta, struct pa_memchunk *chunk);
void pa_pstream_set_recieve_packet_callback(struct pa_pstream *p, int (*callback) (struct pa_pstream *p, struct pa_packet *packet, void *userdata), void *userdata); void pa_pstream_set_recieve_packet_callback(struct pa_pstream *p, void (*callback) (struct pa_pstream *p, struct pa_packet *packet, void *userdata), void *userdata);
void pa_pstream_set_recieve_memblock_callback(struct pa_pstream *p, int (*callback) (struct pa_pstream *p, uint32_t channel, int32_t delta, struct pa_memchunk *chunk, void *userdata), void *userdata); void pa_pstream_set_recieve_memblock_callback(struct pa_pstream *p, void (*callback) (struct pa_pstream *p, uint32_t channel, int32_t delta, struct pa_memchunk *chunk, void *userdata), void *userdata);
void pa_pstream_set_drain_callback(struct pa_pstream *p, void (*cb)(struct pa_pstream *p, void *userdata), void *userdata);
void pa_pstream_set_die_callback(struct pa_pstream *p, void (*callback)(struct pa_pstream *p, void *userdata), void *userdata); void pa_pstream_set_die_callback(struct pa_pstream *p, void (*callback)(struct pa_pstream *p, void *userdata), void *userdata);
int pa_pstream_is_pending(struct pa_pstream *p);
#endif #endif

View file

@ -32,6 +32,8 @@ uint32_t pa_samples_usec(size_t length, const struct pa_sample_spec *spec);
int pa_sample_spec_valid(const struct pa_sample_spec *spec); int pa_sample_spec_valid(const struct pa_sample_spec *spec);
int pa_sample_spec_equal(const struct pa_sample_spec*a, const struct pa_sample_spec*b); int pa_sample_spec_equal(const struct pa_sample_spec*a, const struct pa_sample_spec*b);
#define PA_SAMPLE_SNPRINT_MAX_LENGTH 32
void pa_sample_snprint(char *s, size_t l, const struct pa_sample_spec *spec); void pa_sample_snprint(char *s, size_t l, const struct pa_sample_spec *spec);
#endif #endif

View file

@ -14,6 +14,27 @@ struct pa_simple {
int dead; int dead;
}; };
static int iterate(struct pa_simple *p, int block, int *perror) {
assert(p && p->context && p->mainloop && perror);
if (!block && !pa_context_is_pending(p->context))
return 0;
do {
if (pa_context_is_dead(p->context) || (p->stream && pa_stream_is_dead(p->stream))) {
*perror = pa_context_errno(p->context);
return -1;
}
if (pa_mainloop_iterate(p->mainloop, 1, NULL) < 0) {
*perror = PA_ERROR_INTERNAL;
return -1;
}
} while (pa_context_is_pending(p->context));
return 0;
}
struct pa_simple* pa_simple_new( struct pa_simple* pa_simple_new(
const char *server, const char *server,
const char *name, const char *name,
@ -44,26 +65,18 @@ struct pa_simple* pa_simple_new(
goto fail; goto fail;
} }
/* Wait until the context is ready */
while (!pa_context_is_ready(p->context)) { while (!pa_context_is_ready(p->context)) {
if (pa_context_is_dead(p->context)) { if (iterate(p, 1, &error) < 0)
error = pa_context_errno(p->context);
goto fail;
}
if (pa_mainloop_iterate(p->mainloop, 1, NULL) < 0)
goto fail; goto fail;
} }
if (!(p->stream = pa_stream_new(p->context, dir, dev, stream_name, ss, attr, NULL, NULL))) if (!(p->stream = pa_stream_new(p->context, dir, dev, stream_name, ss, attr, NULL, NULL)))
goto fail; goto fail;
/* Wait until the stream is ready */
while (!pa_stream_is_ready(p->stream)) { while (!pa_stream_is_ready(p->stream)) {
if (pa_stream_is_dead(p->stream)) { if (iterate(p, 1, &error) < 0)
error = pa_context_errno(p->context);
goto fail;
}
if (pa_mainloop_iterate(p->mainloop, 1, NULL) < 0)
goto fail; goto fail;
} }
@ -96,17 +109,9 @@ int pa_simple_write(struct pa_simple *p, const void*data, size_t length, int *pe
while (length > 0) { while (length > 0) {
size_t l; size_t l;
while (!(l = pa_stream_writable_size(p->stream))) { while (!(l = pa_stream_writable_size(p->stream)))
if (pa_context_is_dead(p->context)) { if (iterate(p, 1, perror) < 0)
*perror = pa_context_errno(p->context);
return -1; return -1;
}
if (pa_mainloop_iterate(p->mainloop, 1, NULL) < 0) {
*perror = PA_ERROR_INTERNAL;
return -1;
}
}
if (l > length) if (l > length)
l = length; l = length;
@ -116,9 +121,14 @@ int pa_simple_write(struct pa_simple *p, const void*data, size_t length, int *pe
length -= l; length -= l;
} }
/* Make sure that no data is pending for write */
if (iterate(p, 0, perror) < 0)
return -1;
return 0; return 0;
} }
int pa_simple_read(struct pa_simple *s, void*data, size_t length, int *perror) { int pa_simple_read(struct pa_simple *s, void*data, size_t length, int *perror) {
assert(0); assert(0);
} }

View file

@ -274,8 +274,18 @@ char *pa_sink_list_to_string(struct pa_core *c) {
default_sink = pa_sink_get_default(c); default_sink = pa_sink_get_default(c);
for (sink = pa_idxset_first(c->sinks, &index); sink; sink = pa_idxset_next(c->sinks, &index)) { for (sink = pa_idxset_first(c->sinks, &index); sink; sink = pa_idxset_next(c->sinks, &index)) {
char ss[PA_SAMPLE_SNPRINT_MAX_LENGTH];
pa_sample_snprint(ss, sizeof(ss), &sink->sample_spec);
assert(sink->monitor_source); assert(sink->monitor_source);
pa_strbuf_printf(s, " %c index: %u, name: <%s>, volume: <0x%04x>, latency: <%u usec>, monitor_source: <%u>\n", sink == default_sink ? '*' : ' ', sink->index, sink->name, (unsigned) sink->volume, pa_sink_get_latency(sink), sink->monitor_source->index); pa_strbuf_printf(
s,
" %c index: %u\n\tname: <%s>\n\tvolume: <0x%04x>\n\tlatency: <%u usec>\n\tmonitor_source: <%u>\n\tsample_spec: <%s>\n",
sink == default_sink ? '*' : ' ',
sink->index, sink->name,
(unsigned) sink->volume,
pa_sink_get_latency(sink),
sink->monitor_source->index,
ss);
} }
return pa_strbuf_tostring_free(s); return pa_strbuf_tostring_free(s);

View file

@ -85,13 +85,18 @@ char *pa_sink_input_list_to_string(struct pa_core *c) {
pa_strbuf_printf(s, "%u sink input(s) available.\n", pa_idxset_ncontents(c->sink_inputs)); pa_strbuf_printf(s, "%u sink input(s) available.\n", pa_idxset_ncontents(c->sink_inputs));
for (i = pa_idxset_first(c->sink_inputs, &index); i; i = pa_idxset_next(c->sink_inputs, &index)) { for (i = pa_idxset_first(c->sink_inputs, &index); i; i = pa_idxset_next(c->sink_inputs, &index)) {
char ss[PA_SAMPLE_SNPRINT_MAX_LENGTH];
pa_sample_snprint(ss, sizeof(ss), &i->sample_spec);
assert(i->sink); assert(i->sink);
pa_strbuf_printf(s, " index: %u, name: <%s>, sink: <%u>; volume: <0x%04x>, latency: <%u usec>\n", pa_strbuf_printf(
i->index, s,
i->name, " index: %u\n\tname: <%s>\n\tsink: <%u>\n\tvolume: <0x%04x>\n\tlatency: <%u usec>\n\tsample_spec: <%s>\n",
i->sink->index, i->index,
(unsigned) i->volume, i->name,
pa_sink_input_get_latency(i)); i->sink->index,
(unsigned) i->volume,
pa_sink_input_get_latency(i),
ss);
} }
return pa_strbuf_tostring_free(s); return pa_strbuf_tostring_free(s);

View file

@ -5,6 +5,8 @@
#include "mainloop-api.h" #include "mainloop-api.h"
#include "iochannel.h" #include "iochannel.h"
/* It is safe to destroy the calling socket_client object from the callback */
struct pa_socket_client; struct pa_socket_client;
struct pa_socket_client* pa_socket_client_new_ipv4(struct pa_mainloop_api *m, uint32_t address, uint16_t port); struct pa_socket_client* pa_socket_client_new_ipv4(struct pa_mainloop_api *m, uint32_t address, uint16_t port);

View file

@ -5,6 +5,8 @@
#include "mainloop-api.h" #include "mainloop-api.h"
#include "iochannel.h" #include "iochannel.h"
/* It is safe to destroy the calling socket_server object from the callback */
struct pa_socket_server; struct pa_socket_server;
struct pa_socket_server* pa_socket_server_new(struct pa_mainloop_api *m, int fd); struct pa_socket_server* pa_socket_server_new(struct pa_mainloop_api *m, int fd);

View file

@ -111,10 +111,12 @@ char *pa_source_list_to_string(struct pa_core *c) {
default_source = pa_source_get_default(c); default_source = pa_source_get_default(c);
for (source = pa_idxset_first(c->sources, &index); source; source = pa_idxset_next(c->sources, &index)) { for (source = pa_idxset_first(c->sources, &index); source; source = pa_idxset_next(c->sources, &index)) {
char ss[PA_SAMPLE_SNPRINT_MAX_LENGTH];
char mo[256] = ""; char mo[256] = "";
if (source->monitor_of) if (source->monitor_of)
snprintf(mo, sizeof(mo), ", monitor_of: <%u>", source->monitor_of->index); snprintf(mo, sizeof(mo), "\n\tmonitor_of: <%u>", source->monitor_of->index);
pa_strbuf_printf(s, " %c index: %u, name: <%s>%s\n", source == default_source ? '*' : ' ', source->index, source->name, mo); pa_sample_snprint(ss, sizeof(ss), &source->sample_spec);
pa_strbuf_printf(s, " %c index: %u\n\tname: <%s>\n\tsample_spec: <%s>%s\n", source == default_source ? '*' : ' ', source->index, source->name, ss, mo);
} }
return pa_strbuf_tostring_free(s); return pa_strbuf_tostring_free(s);

View file

@ -68,11 +68,16 @@ char *pa_source_output_list_to_string(struct pa_core *c) {
pa_strbuf_printf(s, "%u source outputs(s) available.\n", pa_idxset_ncontents(c->source_outputs)); pa_strbuf_printf(s, "%u source outputs(s) available.\n", pa_idxset_ncontents(c->source_outputs));
for (o = pa_idxset_first(c->source_outputs, &index); o; o = pa_idxset_next(c->source_outputs, &index)) { for (o = pa_idxset_first(c->source_outputs, &index); o; o = pa_idxset_next(c->source_outputs, &index)) {
char ss[PA_SAMPLE_SNPRINT_MAX_LENGTH];
pa_sample_snprint(ss, sizeof(ss), &o->sample_spec);
assert(o->source); assert(o->source);
pa_strbuf_printf(s, " %c index: %u, name: <%s>, source: <%u>\n", pa_strbuf_printf(
o->index, s, " %c index: %u\n\tname: <%s>\n\tsource: <%u>\n\tsample_spec: <%u>\n",
o->name, o->index,
o->source->index); o->name,
o->source->index,
ss,
ss);
} }
return pa_strbuf_tostring_free(s); return pa_strbuf_tostring_free(s);

View file

@ -1,7 +1,7 @@
- recording (general, simple, esound, native) - recording (general, simple, esound, native)
- native library/protocol: - native library/protocol:
sync() function sync() function
more functions more functions (esp. latency)
- simple library - simple library
- config parser/cmdline - config parser/cmdline
@ -9,18 +9,20 @@
- move more stuff from module-oss[-dma] to liboss-util - move more stuff from module-oss[-dma] to liboss-util
- in module-oss: create source first, than sink - in module-oss: create source first, than sink
- client field for sinkinput/sourceoutput
- xmms+esound latency testing
- rename files - rename files
- svn-id and license in every file - svn-id and license in every file
- documentation - documentation
-- post 0.1 -- post 0.1
- future cancellation - future cancellation
- client-ui - client-ui
- clip cache - clip cache
- autoloading/autounloading - autoloading/autounloading
- ldap/rendezvous - slp/rendezvous
- doxygen - doxygen
drivers: drivers:

View file

@ -1,3 +1,4 @@
#include <signal.h>
#include <errno.h> #include <errno.h>
#include <assert.h> #include <assert.h>
#include <string.h> #include <string.h>
@ -51,7 +52,7 @@ void pa_peer_to_string(char *c, size_t l, int fd) {
ntohs(sa.in.sin_port)); ntohs(sa.in.sin_port));
return; return;
} else if (sa.sa.sa_family == AF_LOCAL) { } else if (sa.sa.sa_family == AF_LOCAL) {
snprintf(c, l, "UNIX client for %s", sa.un.sun_path); snprintf(c, l, "UNIX socket client");
return; return;
} }
@ -208,3 +209,15 @@ int pa_unix_socket_remove_stale(const char *fn) {
return 0; return 0;
} }
void pa_check_for_sigpipe(void) {
struct sigaction sa;
if (sigaction(SIGPIPE, NULL, &sa) < 0) {
fprintf(stderr, __FILE__": sigaction() failed: %s\n", strerror(errno));
return;
}
if (sa.sa_handler == SIG_DFL)
fprintf(stderr, "polypaudio: WARNING: SIGPIPE is not trapped. This might cause malfunction!\n");
}

View file

@ -18,4 +18,6 @@ ssize_t pa_loop_write(int fd, const void*data, size_t size);
int pa_unix_socket_is_stale(const char *fn); int pa_unix_socket_is_stale(const char *fn);
int pa_unix_socket_remove_stale(const char *fn); int pa_unix_socket_remove_stale(const char *fn);
void pa_check_for_sigpipe(void);
#endif #endif