main part of the native protocol

git-svn-id: file:///home/lennart/svn/public/pulseaudio/trunk@31 fefdeb5f-60dc-0310-8127-8f9354f1896f
This commit is contained in:
Lennart Poettering 2004-06-23 23:17:30 +00:00
parent eecf602476
commit acb25b3510
62 changed files with 2473 additions and 755 deletions

View file

@ -33,7 +33,7 @@ else
automake -a -c
autoconf -Wall
./configure --sysconfdir=/etc "$@"
CFLAGS="-g -O0" ./configure --sysconfdir=/etc "$@"
make clean
fi

View file

@ -42,7 +42,7 @@ AC_PROG_LIBTOOL
# If using GCC specifiy some additional parameters
if test "x$GCC" = "xyes" ; then
CFLAGS="$CFLAGS -pipe -Wall -W"
CFLAGS="$CFLAGS -pipe -Wall -W -Wno-unused-parameter"
fi
AC_CONFIG_FILES([Makefile src/Makefile])

View file

@ -18,31 +18,39 @@
AM_CFLAGS=-ansi -D_GNU_SOURCE
bin_PROGRAMS = polypaudio
bin_PROGRAMS = polypaudio pacat
pkglib_LTLIBRARIES=libprotocol-simple.la module-simple-protocol-tcp.la \
libsocket-server.la module-pipe-sink.la libpstream.la libiochannel.la \
pkglib_LTLIBRARIES=libiochannel.la libsocket-server.la libsocket-client.la \
libprotocol-simple.la module-simple-protocol-tcp.la \
module-pipe-sink.la libpstream.la \
libpacket.la module-oss.la module-oss-mmap.la liboss.la libioline.la \
libcli.la module-cli.la libtokenizer.la libdynarray.la \
module-simple-protocol-unix.la module-cli-protocol-tcp.la \
libprotocol-cli.la libprotocol-native.la module-native-protocol-tcp.la \
module-native-protocol-unix.la module-cli-protocol-unix.la libtagstruct.la
libprotocol-cli.la module-cli-protocol-unix.la libtagstruct.la \
libpdispatch.la libprotocol-native.la libpstream-util.la \
module-native-protocol-tcp.la module-native-protocol-unix.la \
libpolyp.la
polypaudio_SOURCES = idxset.c idxset.h \
queue.c queue.h \
strbuf.c strbuf.h \
main.c main.h \
mainloop.c mainloop.h \
memblock.c memblock.h \
sample.c sample.h \
sample-util.c sample-util.h \
memblockq.c memblockq.h \
client.c client.h \
core.c core.h \
main.c main.h \
sourceoutput.c sourceoutput.h \
sinkinput.c sinkinput.h \
source.c source.h \
sink.c sink.h \
module.c module.h
module.c module.h \
mainloop-signal.c mainloop-signal.h \
mainloop-api.c mainloop-api.h \
util.c util.h
polypaudio_CFLAGS = $(AM_CFLAGS)
polypaudio_INCLUDES = $(INCLTDL)
polypaudio_LDADD = $(LIBLTDL)
@ -56,10 +64,22 @@ libsocket_server_la_SOURCES = socket-server.c socket-server.h
libsocket_server_la_LDFLAGS = -avoid-version
libsocket_server_la_LIBADD = libiochannel.la
libsocket_client_la_SOURCES = socket-client.c socket-client.h
libsocket_client_la_LDFLAGS = -avoid-version
libsocket_client_la_LIBADD = libiochannel.la
libpstream_la_SOURCES = pstream.c pstream.h
libpstream_la_LDFLAGS = -avoid-version
libpstream_la_LIBADD = libpacket.la
libpstream_util_la_SOURCES = pstream-util.c pstream-util.h
libpstream_util_la_LDFLAGS = -avoid-version
libpstream_util_la_LIBADD = libpstream.la libtagstruct.la
libpdispatch_la_SOURCES = pdispatch.c pdispatch.h
libpdispatch_la_LDFLAGS = -avoid-version
libpdispatch_la_LIBADD = libpacket.la libtagstruct.la
libiochannel_la_SOURCES = iochannel.c iochannel.h
libiochannel_la_LDFLAGS = -avoid-version
@ -90,7 +110,7 @@ libprotocol_cli_la_LIBADD = libsocket-server.la libiochannel.la libcli.la
libprotocol_native_la_SOURCES = protocol-native.c protocol-native.h
libprotocol_native_la_LDFLAGS = -avoid-version
libprotocol_native_la_LIBADD = libsocket-server.la libiochannel.la libpacket.la libpstream.la
libprotocol_native_la_LIBADD = libsocket-server.la libiochannel.la libpacket.la libpstream.la libpstream-util.la
libtagstruct_la_SOURCES = tagstruct.c tagstruct.h
libtagstruct_la_LDFLAGS = -avoid-version
@ -140,3 +160,24 @@ module_oss_mmap_la_LIBADD = libiochannel.la liboss.la
module_cli_la_SOURCES = module-cli.c
module_cli_la_LDFLAGS = -module -avoid-version
module_cli_la_LIBADD = libcli.la libiochannel.la libtokenizer.la
libpolyp_la_SOURCES = polyp.c polyp.h \
polypdef.h \
tagstruct.c tagstruct.h \
iochannel.c iochannel.h \
pstream.c pstream.h \
pstream-util.c pstream-util.h \
pdispatch.c pdispatch.h \
protocol-native-spec.h \
mainloop-api.c mainloop-api.h \
mainloop.c mainloop.h \
idxset.c idxset.h \
util.c util.h \
memblock.c memblock.h \
socket-client.c socket-client.h \
packet.c packet.h \
queue.c queue.h \
dynarray.c dynarray.h
pacat_SOURCES = pacat.c
pacat_LDADD = libpolyp.la

View file

@ -20,6 +20,8 @@ struct cli {
void (*eof_callback)(struct cli *c, void *userdata);
void *userdata;
struct client *client;
};
struct command {
@ -63,6 +65,7 @@ static const struct command commands[] = {
static const char prompt[] = ">>> ";
struct cli* cli_new(struct core *core, struct iochannel *io) {
char cname[256];
struct cli *c;
assert(io);
@ -75,16 +78,21 @@ struct cli* cli_new(struct core *core, struct iochannel *io) {
c->userdata = NULL;
c->eof_callback = NULL;
iochannel_peer_to_string(io, cname, sizeof(cname));
c->client = client_new(core, "CLI", cname);
assert(c->client);
ioline_set_callback(c->line, line_callback, c);
ioline_puts(c->line, "Welcome to polypaudio! Use \"help\" for usage information.\n");
ioline_puts(c->line, prompt);
return c;
}
void cli_free(struct cli *c) {
assert(c);
ioline_free(c->line);
client_free(c->client);
free(c);
}
@ -135,7 +143,7 @@ void cli_set_eof_callback(struct cli *c, void (*cb)(struct cli*c, void *userdata
static void cli_command_exit(struct cli *c, struct tokenizer *t) {
assert(c && c->core && c->core->mainloop && t);
mainloop_quit(c->core->mainloop, -1);
c->core->mainloop->quit(c->core->mainloop, 0);
}
static void cli_command_help(struct cli *c, struct tokenizer *t) {

View file

@ -7,7 +7,7 @@
#include "sink.h"
#include "source.h"
struct core* core_new(struct mainloop *m) {
struct core* core_new(struct pa_mainloop_api *m) {
struct core* c;
c = malloc(sizeof(struct core));
assert(c);

View file

@ -2,17 +2,17 @@
#define foocorehfoo
#include "idxset.h"
#include "mainloop.h"
#include "mainloop-api.h"
struct core {
struct mainloop *mainloop;
struct pa_mainloop_api *mainloop;
struct idxset *clients, *sinks, *sources, *sink_inputs, *source_outputs, *modules;
uint32_t default_source_index, default_sink_index;
};
struct core* core_new(struct mainloop *m);
struct core* core_new(struct pa_mainloop_api *m);
void core_free(struct core*c);
#endif

View file

@ -4,10 +4,11 @@
#include <unistd.h>
#include "iochannel.h"
#include "util.h"
struct iochannel {
int ifd, ofd;
struct mainloop* mainloop;
struct pa_mainloop_api* mainloop;
void (*callback)(struct iochannel*io, void *userdata);
void*userdata;
@ -17,43 +18,45 @@ struct iochannel {
int no_close;
struct mainloop_source* input_source, *output_source;
void* input_source, *output_source;
};
static void enable_mainloop_sources(struct iochannel *io) {
assert(io);
if (io->input_source == io->output_source) {
enum mainloop_io_event e = MAINLOOP_IO_EVENT_NULL;
enum pa_mainloop_api_io_events e = PA_MAINLOOP_API_IO_EVENT_NULL;
assert(io->input_source);
if (!io->readable)
e |= MAINLOOP_IO_EVENT_IN;
e |= PA_MAINLOOP_API_IO_EVENT_INPUT;
if (!io->writable)
e |= MAINLOOP_IO_EVENT_OUT;
e |= PA_MAINLOOP_API_IO_EVENT_OUTPUT;
mainloop_source_io_set_events(io->input_source, e);
io->mainloop->enable_io(io->mainloop, io->input_source, e);
} else {
if (io->input_source)
mainloop_source_io_set_events(io->input_source, io->readable ? MAINLOOP_IO_EVENT_NULL : MAINLOOP_IO_EVENT_IN);
io->mainloop->enable_io(io->mainloop, io->input_source, io->readable ? PA_MAINLOOP_API_IO_EVENT_NULL : PA_MAINLOOP_API_IO_EVENT_INPUT);
if (io->output_source)
mainloop_source_io_set_events(io->output_source, io->writable ? MAINLOOP_IO_EVENT_NULL : MAINLOOP_IO_EVENT_OUT);
io->mainloop->enable_io(io->mainloop, io->output_source, io->writable ? PA_MAINLOOP_API_IO_EVENT_NULL : PA_MAINLOOP_API_IO_EVENT_OUTPUT);
}
}
static void callback(struct mainloop_source*s, int fd, enum mainloop_io_event events, void *userdata) {
static void callback(struct pa_mainloop_api* m, void *id, int fd, enum pa_mainloop_api_io_events events, void *userdata) {
struct iochannel *io = userdata;
int changed = 0;
assert(s && fd >= 0 && userdata);
assert(m && fd >= 0 && events && userdata);
if ((events & MAINLOOP_IO_EVENT_IN) && !io->readable) {
if ((events & PA_MAINLOOP_API_IO_EVENT_INPUT) && !io->readable) {
io->readable = 1;
changed = 1;
assert(id == io->input_source);
}
if ((events & MAINLOOP_IO_EVENT_OUT) && !io->writable) {
if ((events & PA_MAINLOOP_API_IO_EVENT_OUTPUT) && !io->writable) {
io->writable = 1;
changed = 1;
assert(id == io->output_source);
}
if (changed) {
@ -64,15 +67,7 @@ static void callback(struct mainloop_source*s, int fd, enum mainloop_io_event ev
}
}
static void make_nonblock_fd(int fd) {
int v;
if ((v = fcntl(fd, F_GETFL)) >= 0)
if (!(v & O_NONBLOCK))
fcntl(fd, F_SETFL, v|O_NONBLOCK);
}
struct iochannel* iochannel_new(struct mainloop*m, int ifd, int ofd) {
struct iochannel* iochannel_new(struct pa_mainloop_api*m, int ifd, int ofd) {
struct iochannel *io;
assert(m && (ifd >= 0 || ofd >= 0));
@ -90,18 +85,18 @@ struct iochannel* iochannel_new(struct mainloop*m, int ifd, int ofd) {
if (ifd == ofd) {
assert(ifd >= 0);
make_nonblock_fd(io->ifd);
io->input_source = io->output_source = mainloop_source_new_io(m, ifd, MAINLOOP_IO_EVENT_IN|MAINLOOP_IO_EVENT_OUT, callback, io);
io->input_source = io->output_source = m->source_io(m, ifd, PA_MAINLOOP_API_IO_EVENT_BOTH, callback, io);
} else {
if (ifd >= 0) {
make_nonblock_fd(io->ifd);
io->input_source = mainloop_source_new_io(m, ifd, MAINLOOP_IO_EVENT_IN, callback, io);
io->input_source = m->source_io(m, ifd, PA_MAINLOOP_API_IO_EVENT_INPUT, callback, io);
} else
io->input_source = NULL;
if (ofd >= 0) {
make_nonblock_fd(io->ofd);
io->output_source = mainloop_source_new_io(m, ofd, MAINLOOP_IO_EVENT_OUT, callback, io);
io->output_source = m->source_io(m, ofd, PA_MAINLOOP_API_IO_EVENT_OUTPUT, callback, io);
} else
io->output_source = NULL;
}
@ -120,9 +115,9 @@ void iochannel_free(struct iochannel*io) {
}
if (io->input_source)
mainloop_source_free(io->input_source);
io->mainloop->cancel_io(io->mainloop, io->input_source);
if (io->output_source && io->output_source != io->input_source)
mainloop_source_free(io->output_source);
io->mainloop->cancel_io(io->mainloop, io->output_source);
free(io);
}
@ -172,3 +167,8 @@ void iochannel_set_noclose(struct iochannel*io, int b) {
assert(io);
io->no_close = b;
}
void iochannel_peer_to_string(struct iochannel*io, char*s, size_t l) {
assert(io && s && l);
peer_to_string(s, l, io->ifd);
}

View file

@ -2,11 +2,11 @@
#define fooiochannelhfoo
#include <sys/types.h>
#include "mainloop.h"
#include "mainloop-api.h"
struct iochannel;
struct iochannel* iochannel_new(struct mainloop*m, int ifd, int ofd);
struct iochannel* iochannel_new(struct pa_mainloop_api*m, int ifd, int ofd);
void iochannel_free(struct iochannel*io);
ssize_t iochannel_write(struct iochannel*io, const void*data, size_t l);
@ -19,4 +19,6 @@ void iochannel_set_noclose(struct iochannel*io, int b);
void iochannel_set_callback(struct iochannel*io, void (*callback)(struct iochannel*io, void *userdata), void *userdata);
void iochannel_peer_to_string(struct iochannel*io, char*s, size_t l);
#endif

View file

@ -8,46 +8,52 @@
#include "core.h"
#include "mainloop.h"
#include "module.h"
#include "mainloop-signal.h"
int stdin_inuse = 0, stdout_inuse = 0;
static void signal_callback(struct mainloop_source *m, int sig, void *userdata) {
mainloop_quit(mainloop_source_get_mainloop(m), -1);
static struct pa_mainloop *mainloop;
static void signal_callback(void *id, int sig, void *userdata) {
struct pa_mainloop_api* m = pa_mainloop_get_api(mainloop);
m->quit(m, 1);
fprintf(stderr, "main: got signal.\n");
}
int main(int argc, char *argv[]) {
struct mainloop *m;
struct core *c;
int r;
int r, retval = 0;
r = lt_dlinit();
assert(r == 0);
m = mainloop_new();
assert(m);
c = core_new(m);
assert(c);
mainloop = pa_mainloop_new();
assert(mainloop);
mainloop_source_new_signal(m, SIGINT, signal_callback, NULL);
r = pa_signal_init(pa_mainloop_get_api(mainloop));
assert(r == 0);
pa_signal_register(SIGINT, signal_callback, NULL);
signal(SIGPIPE, SIG_IGN);
c = core_new(pa_mainloop_get_api(mainloop));
assert(c);
module_load(c, "module-oss-mmap", "/dev/dsp1");
module_load(c, "module-pipe-sink", NULL);
module_load(c, "module-simple-protocol-tcp", NULL);
module_load(c, "module-cli", NULL);
fprintf(stderr, "main: mainloop entry.\n");
while (mainloop_iterate(m, 1) == 0);
/* fprintf(stderr, "main: %u blocks\n", n_blocks);*/
if (pa_mainloop_run(mainloop, &retval) < 0)
retval = 1;
fprintf(stderr, "main: mainloop exit.\n");
mainloop_run(m);
core_free(c);
mainloop_free(m);
pa_signal_done();
pa_mainloop_free(mainloop);
lt_dlexit();
return 0;
return retval;
}

35
src/mainloop-api.c Normal file
View file

@ -0,0 +1,35 @@
#include <assert.h>
#include <stdlib.h>
#include "mainloop-api.h"
struct once_info {
void (*callback)(void *userdata);
void *userdata;
};
static void once_callback(struct pa_mainloop_api *api, void *id, void *userdata) {
struct once_info *i = userdata;
assert(api && i && i->callback);
i->callback(i->userdata);
assert(api->cancel_fixed);
api->cancel_fixed(api, id);
free(i);
}
void pa_mainloop_api_once(struct pa_mainloop_api* api, void (*callback)(void *userdata), void *userdata) {
struct once_info *i;
void *id;
assert(api && callback);
i = malloc(sizeof(struct once_info));
assert(i);
i->callback = callback;
i->userdata = userdata;
assert(api->source_fixed);
id = api->source_fixed(api, once_callback, i);
assert(id);
/* Note: if the mainloop is destroyed before once_callback() was called, some memory is leaked. */
}

43
src/mainloop-api.h Normal file
View file

@ -0,0 +1,43 @@
#ifndef foomainloopapihfoo
#define foomainloopapihfoo
#include <time.h>
#include <sys/time.h>
enum pa_mainloop_api_io_events {
PA_MAINLOOP_API_IO_EVENT_NULL = 0,
PA_MAINLOOP_API_IO_EVENT_INPUT = 1,
PA_MAINLOOP_API_IO_EVENT_OUTPUT = 2,
PA_MAINLOOP_API_IO_EVENT_BOTH = 3
};
struct pa_mainloop_api {
void *userdata;
/* IO sources */
void* (*source_io)(struct pa_mainloop_api*a, int fd, enum pa_mainloop_api_io_events events, void (*callback) (struct pa_mainloop_api*a, void *id, int fd, enum pa_mainloop_api_io_events events, void *userdata), void *userdata);
void (*enable_io)(struct pa_mainloop_api*a, void* id, enum pa_mainloop_api_io_events events);
void (*cancel_io)(struct pa_mainloop_api*a, void* id);
/* Fixed sources */
void* (*source_fixed)(struct pa_mainloop_api*a, void (*callback) (struct pa_mainloop_api*a, void *id, void *userdata), void *userdata);
void (*enable_fixed)(struct pa_mainloop_api*a, void* id, int b);
void (*cancel_fixed)(struct pa_mainloop_api*a, void* id);
/* Idle sources */
void* (*source_idle)(struct pa_mainloop_api*a, void (*callback) (struct pa_mainloop_api*a, void *id, void *userdata), void *userdata);
void (*enable_idle)(struct pa_mainloop_api*a, void* id, int b);
void (*cancel_idle)(struct pa_mainloop_api*a, void* id);
/* Time sources */
void* (*source_time)(struct pa_mainloop_api*a, const struct timeval *tv, void (*callback) (struct pa_mainloop_api*a, void *id, const struct timeval *tv, void *userdata), void *userdata);
void (*enable_time)(struct pa_mainloop_api*a, void *id, const struct timeval *tv);
void (*cancel_time)(struct pa_mainloop_api*a, void* id);
/* Exit mainloop */
void (*quit)(struct pa_mainloop_api*a, int retval);
};
void pa_mainloop_api_once(struct pa_mainloop_api*m, void (*callback)(void *userdata), void *userdata);
#endif

138
src/mainloop-signal.c Normal file
View file

@ -0,0 +1,138 @@
#include <stdio.h>
#include <assert.h>
#include <signal.h>
#include <errno.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include "mainloop-signal.h"
#include "util.h"
struct signal_info {
int sig;
struct sigaction saved_sigaction;
void (*callback) (void *id, int signal, void *userdata);
void *userdata;
struct signal_info *previous, *next;
};
static struct pa_mainloop_api *api = NULL;
static int signal_pipe[2] = { -1, -1 };
static void* mainloop_source = NULL;
static struct signal_info *signals = NULL;
static void signal_handler(int sig) {
write(signal_pipe[1], &sig, sizeof(sig));
}
static void callback(struct pa_mainloop_api*a, void *id, int fd, enum pa_mainloop_api_io_events events, void *userdata) {
assert(a && id && events == PA_MAINLOOP_API_IO_EVENT_INPUT && id == mainloop_source && fd == signal_pipe[0]);
for (;;) {
ssize_t r;
int sig;
struct signal_info*s;
if ((r = read(signal_pipe[0], &sig, sizeof(sig))) < 0) {
if (errno == EAGAIN)
return;
fprintf(stderr, "signal.c: read(): %s\n", strerror(errno));
return;
}
if (r != sizeof(sig)) {
fprintf(stderr, "signal.c: short read()\n");
return;
}
for (s = signals; s; s = s->next)
if (s->sig == sig) {
assert(s->callback);
s->callback(s, sig, s->userdata);
break;
}
}
}
int pa_signal_init(struct pa_mainloop_api *a) {
assert(a);
if (pipe(signal_pipe) < 0) {
fprintf(stderr, "pipe() failed: %s\n", strerror(errno));
return -1;
}
make_nonblock_fd(signal_pipe[0]);
make_nonblock_fd(signal_pipe[1]);
api = a;
mainloop_source = api->source_io(api, signal_pipe[0], PA_MAINLOOP_API_IO_EVENT_INPUT, callback, NULL);
assert(mainloop_source);
return 0;
}
void pa_signal_done(void) {
assert(api && signal_pipe[0] >= 0 && signal_pipe[1] >= 0 && mainloop_source);
api->cancel_io(api, mainloop_source);
mainloop_source = NULL;
close(signal_pipe[0]);
close(signal_pipe[1]);
signal_pipe[0] = signal_pipe[1] = -1;
while (signals)
pa_signal_unregister(signals);
api = NULL;
}
void* pa_signal_register(int sig, void (*callback) (void *id, int signal, void *userdata), void *userdata) {
struct signal_info *s = NULL;
struct sigaction sa;
assert(sig > 0 && callback);
for (s = signals; s; s = s->next)
if (s->sig == sig)
goto fail;
s = malloc(sizeof(struct signal_info));
assert(s);
s->sig = sig;
s->callback = callback;
s->userdata = userdata;
memset(&sa, 0, sizeof(sa));
sa.sa_handler = signal_handler;
sigemptyset(&sa.sa_mask);
sa.sa_flags = SA_RESTART;
if (sigaction(sig, &sa, &s->saved_sigaction) < 0)
goto fail;
s->previous = NULL;
s->next = signals;
signals = s;
return s;
fail:
if (s)
free(s);
return NULL;
}
void pa_signal_unregister(void *id) {
struct signal_info *s = id;
assert(s);
if (s->next)
s->next->previous = s->previous;
if (s->previous)
s->previous->next = s->next;
else
signals = s->next;
sigaction(s->sig, &s->saved_sigaction, NULL);
free(s);
}

12
src/mainloop-signal.h Normal file
View file

@ -0,0 +1,12 @@
#ifndef foomainloopsignalhfoo
#define foomainloopsignalhfoo
#include "mainloop-api.h"
int pa_signal_init(struct pa_mainloop_api *api);
void pa_signal_done(void);
void* pa_signal_register(int signal, void (*callback) (void *id, int signal, void *userdata), void *userdata);
void pa_signal_unregister(void *id);
#endif

View file

@ -1,3 +1,4 @@
#include <stdio.h>
#include <signal.h>
#include <unistd.h>
#include <sys/poll.h>
@ -8,468 +9,515 @@
#include <errno.h>
#include "mainloop.h"
#include "util.h"
#include "idxset.h"
struct mainloop_source {
struct mainloop_source *next;
struct mainloop *mainloop;
enum mainloop_source_type type;
int enabled;
struct mainloop_source_header {
struct pa_mainloop *mainloop;
int dead;
};
struct mainloop_source_io {
struct mainloop_source_header header;
int fd;
enum pa_mainloop_api_io_events events;
void (*callback) (struct pa_mainloop_api*a, void *id, int fd, enum pa_mainloop_api_io_events events, void *userdata);
void *userdata;
struct {
int fd;
enum mainloop_io_event events;
void (*callback)(struct mainloop_source*s, int fd, enum mainloop_io_event event, void *userdata);
struct pollfd pollfd;
} io;
struct {
void (*callback)(struct mainloop_source*s, void *userdata);
} fixed;
struct {
void (*callback)(struct mainloop_source*s, void *userdata);
} idle;
struct {
int sig;
struct sigaction sigaction;
void (*callback)(struct mainloop_source*s, int sig, void *userdata);
} signal;
struct pollfd *pollfd;
};
struct mainloop_source_list {
struct mainloop_source *sources;
int n_sources;
int dead_sources;
struct mainloop_source_fixed_or_idle {
struct mainloop_source_header header;
int enabled;
void (*callback)(struct pa_mainloop_api*a, void *id, void *userdata);
void *userdata;
};
struct mainloop {
struct mainloop_source_list io_sources, fixed_sources, idle_sources, signal_sources;
struct mainloop_source_time {
struct mainloop_source_header header;
int enabled;
struct timeval timeval;
void (*callback)(struct pa_mainloop_api*a, void *id, const struct timeval*tv, void *userdata);
void *userdata;
};
struct pa_mainloop {
struct idxset *io_sources, *fixed_sources, *idle_sources, *time_sources;
int io_sources_scan_dead, fixed_sources_scan_dead, idle_sources_scan_dead, time_sources_scan_dead;
struct pollfd *pollfds;
int max_pollfds, n_pollfds;
unsigned max_pollfds, n_pollfds;
int rebuild_pollfds;
int quit;
int running;
int signal_pipe[2];
struct pollfd signal_pollfd;
int quit, running, retval;
struct pa_mainloop_api api;
};
static int signal_pipe = -1;
static void setup_api(struct pa_mainloop *m);
static void signal_func(int sig) {
if (signal_pipe >= 0)
write(signal_pipe, &sig, sizeof(sig));
}
struct pa_mainloop *pa_mainloop_new(void) {
struct pa_mainloop *m;
static void make_nonblock(int fd) {
int v;
if ((v = fcntl(fd, F_GETFL)) >= 0)
fcntl(fd, F_SETFL, v|O_NONBLOCK);
}
struct mainloop *mainloop_new(void) {
int r;
struct mainloop *m;
m = malloc(sizeof(struct mainloop));
m = malloc(sizeof(struct pa_mainloop));
assert(m);
memset(m, 0, sizeof(struct mainloop));
r = pipe(m->signal_pipe);
assert(r >= 0 && m->signal_pipe[0] >= 0 && m->signal_pipe[1] >= 0);
m->io_sources = idxset_new(NULL, NULL);
m->fixed_sources = idxset_new(NULL, NULL);
m->idle_sources = idxset_new(NULL, NULL);
m->time_sources = idxset_new(NULL, NULL);
make_nonblock(m->signal_pipe[0]);
make_nonblock(m->signal_pipe[1]);
assert(m->io_sources && m->fixed_sources && m->idle_sources && m->time_sources);
m->io_sources_scan_dead = m->fixed_sources_scan_dead = m->idle_sources_scan_dead = m->time_sources_scan_dead = 0;
signal_pipe = m->signal_pipe[1];
m->signal_pollfd.fd = m->signal_pipe[0];
m->signal_pollfd.events = POLLIN;
m->signal_pollfd.revents = 0;
m->pollfds = NULL;
m->max_pollfds = m->n_pollfds = m->rebuild_pollfds = 0;
m->quit = m->running = m->retval = 0;
setup_api(m);
return m;
}
static void free_sources(struct mainloop_source_list *l, int all) {
struct mainloop_source *s, *p;
assert(l);
static int foreach(void *p, uint32_t index, int *del, void*userdata) {
struct mainloop_source_header *h = p;
int *all = userdata;
assert(p && del && all);
if (!all && !l->dead_sources)
return;
p = NULL;
s = l->sources;
while (s) {
if (all || s->dead) {
struct mainloop_source *t = s;
s = s->next;
if (p)
p->next = s;
else
l->sources = s;
free(t);
} else {
p = s;
s = s->next;
}
if (*all || h->dead) {
free(h);
*del = 1;
}
l->dead_sources = 0;
return 0;
};
if (all) {
assert(!l->sources);
l->n_sources = 0;
}
}
void mainloop_free(struct mainloop* m) {
void pa_mainloop_free(struct pa_mainloop* m) {
int all = 1;
assert(m);
free_sources(&m->io_sources, 1);
free_sources(&m->fixed_sources, 1);
free_sources(&m->idle_sources, 1);
free_sources(&m->signal_sources, 1);
idxset_foreach(m->io_sources, foreach, &all);
idxset_foreach(m->fixed_sources, foreach, &all);
idxset_foreach(m->idle_sources, foreach, &all);
idxset_foreach(m->time_sources, foreach, &all);
idxset_free(m->io_sources, NULL, NULL);
idxset_free(m->fixed_sources, NULL, NULL);
idxset_free(m->idle_sources, NULL, NULL);
idxset_free(m->time_sources, NULL, NULL);
if (signal_pipe == m->signal_pipe[1])
signal_pipe = -1;
close(m->signal_pipe[0]);
close(m->signal_pipe[1]);
free(m->pollfds);
free(m);
}
static void rebuild_pollfds(struct mainloop *m) {
struct mainloop_source*s;
static void scan_dead(struct pa_mainloop *m) {
int all = 0;
assert(m);
if (m->io_sources_scan_dead)
idxset_foreach(m->io_sources, foreach, &all);
if (m->fixed_sources_scan_dead)
idxset_foreach(m->fixed_sources, foreach, &all);
if (m->idle_sources_scan_dead)
idxset_foreach(m->idle_sources, foreach, &all);
if (m->time_sources_scan_dead)
idxset_foreach(m->time_sources, foreach, &all);
}
static void rebuild_pollfds(struct pa_mainloop *m) {
struct mainloop_source_io*s;
struct pollfd *p;
if (m->max_pollfds < m->io_sources.n_sources+1) {
m->max_pollfds = (m->io_sources.n_sources+1)*2;
m->pollfds = realloc(m->pollfds, sizeof(struct pollfd)*m->max_pollfds);
uint32_t index = IDXSET_INVALID;
unsigned l;
l = idxset_ncontents(m->io_sources);
if (m->max_pollfds < l) {
m->pollfds = realloc(m->pollfds, sizeof(struct pollfd)*l);
m->max_pollfds = l;
}
m->n_pollfds = 0;
p = m->pollfds;
for (s = m->io_sources.sources; s; s = s->next) {
assert(s->type == MAINLOOP_SOURCE_TYPE_IO);
if (!s->dead && s->enabled && s->io.events != MAINLOOP_IO_EVENT_NULL) {
*(p++) = s->io.pollfd;
m->n_pollfds++;
for (s = idxset_first(m->io_sources, &index); s; s = idxset_next(m->io_sources, &index)) {
if (s->header.dead) {
s->pollfd = NULL;
continue;
}
}
*(p++) = m->signal_pollfd;
m->n_pollfds++;
s->pollfd = p;
p->fd = s->fd;
p->events = ((s->events & PA_MAINLOOP_API_IO_EVENT_INPUT) ? POLLIN : 0) | ((s->events & PA_MAINLOOP_API_IO_EVENT_OUTPUT) ? POLLOUT : 0);
p->revents = 0;
p++;
m->n_pollfds++;
}
}
static void dispatch_pollfds(struct mainloop *m) {
int i;
struct pollfd *p;
struct mainloop_source *s;
/* This loop assumes that m->sources and m->pollfds have the same
* order and that m->pollfds is a subset of m->sources! */
static void dispatch_pollfds(struct pa_mainloop *m) {
uint32_t index = IDXSET_INVALID;
struct mainloop_source_io *s;
s = m->io_sources.sources;
for (p = m->pollfds, i = 0; i < m->n_pollfds; p++, i++) {
if (!p->revents)
for (s = idxset_first(m->io_sources, &index); s; s = idxset_next(m->io_sources, &index)) {
if (s->header.dead || !s->events || !s->pollfd || !s->pollfd->revents)
continue;
assert(s->pollfd->revents <= s->pollfd->events && s->pollfd->fd == s->fd && s->callback);
s->callback(&m->api, s, s->fd, ((s->pollfd->revents & POLLIN) ? PA_MAINLOOP_API_IO_EVENT_INPUT : 0) | ((s->pollfd->revents & POLLOUT) ? PA_MAINLOOP_API_IO_EVENT_OUTPUT : 0), s->userdata);
}
}
static void run_fixed_or_idle(struct pa_mainloop *m, struct idxset *i) {
uint32_t index = IDXSET_INVALID;
struct mainloop_source_fixed_or_idle *s;
for (s = idxset_first(i, &index); s; s = idxset_next(i, &index)) {
if (s->header.dead || !s->enabled)
continue;
if (p->fd == m->signal_pipe[0]) {
/* Event from signal pipe */
assert(s->callback);
s->callback(&m->api, s, s->userdata);
}
}
if (p->revents & POLLIN) {
int sig;
ssize_t r;
r = read(m->signal_pipe[0], &sig, sizeof(sig));
assert((r < 0 && errno == EAGAIN) || r == sizeof(sig));
static int calc_next_timeout(struct pa_mainloop *m) {
uint32_t index = IDXSET_INVALID;
struct mainloop_source_time *s;
struct timeval now;
int t = -1;
if (idxset_isempty(m->time_sources))
return -1;
gettimeofday(&now, NULL);
for (s = idxset_first(m->time_sources, &index); s; s = idxset_next(m->time_sources, &index)) {
int tmp;
if (s->header.dead || !s->enabled)
continue;
if (s->timeval.tv_sec < now.tv_sec || (s->timeval.tv_sec == now.tv_sec && s->timeval.tv_usec <= now.tv_usec))
return 0;
tmp = (s->timeval.tv_sec - now.tv_sec)*1000;
if (r == sizeof(sig)) {
struct mainloop_source *l = m->signal_sources.sources;
while (l) {
assert(l->type == MAINLOOP_SOURCE_TYPE_SIGNAL);
if (l->signal.sig == sig && l->enabled && !l->dead) {
assert(l->signal.callback);
l->signal.callback(l, sig, l->userdata);
}
l = l->next;
}
}
}
if (s->timeval.tv_usec > now.tv_usec)
tmp += (s->timeval.tv_usec - now.tv_usec)/1000;
else
tmp -= (now.tv_usec - s->timeval.tv_usec)/1000;
} else {
/* Event from I/O source */
if (tmp == 0)
return 0;
else if (tmp < t)
t = tmp;
}
for (; s; s = s->next) {
if (p->fd != s->io.fd)
continue;
assert(s->type == MAINLOOP_SOURCE_TYPE_IO);
return t;
}
if (!s->dead && s->enabled) {
enum mainloop_io_event e = (p->revents & POLLIN ? MAINLOOP_IO_EVENT_IN : 0) | (p->revents & POLLOUT ? MAINLOOP_IO_EVENT_OUT : 0);
if (e) {
assert(s->io.callback);
s->io.callback(s, s->io.fd, e, s->userdata);
}
}
static void dispatch_timeout(struct pa_mainloop *m) {
uint32_t index = IDXSET_INVALID;
struct mainloop_source_time *s;
struct timeval now;
assert(m);
break;
}
if (idxset_isempty(m->time_sources))
return;
gettimeofday(&now, NULL);
for (s = idxset_first(m->time_sources, &index); s; s = idxset_next(m->time_sources, &index)) {
if (s->header.dead || !s->enabled)
continue;
if (s->timeval.tv_sec < now.tv_sec || (s->timeval.tv_sec == now.tv_sec && s->timeval.tv_usec <= now.tv_usec)) {
assert(s->callback);
s->enabled = 0;
s->callback(&m->api, s, &s->timeval, s->userdata);
}
}
}
int mainloop_iterate(struct mainloop *m, int block) {
struct mainloop_source *s;
int c;
static int any_idle_sources(struct pa_mainloop *m) {
struct mainloop_source_fixed_or_idle *s;
uint32_t index;
assert(m);
for (s = idxset_first(m->idle_sources, &index); s; s = idxset_next(m->idle_sources, &index))
if (!s->header.dead && s->enabled)
return 1;
return 0;
}
int pa_mainloop_iterate(struct pa_mainloop *m, int block, int *retval) {
int r, idle;
assert(m && !m->running);
if(m->quit)
return m->quit;
free_sources(&m->io_sources, 0);
free_sources(&m->fixed_sources, 0);
free_sources(&m->idle_sources, 0);
free_sources(&m->signal_sources, 0);
for (s = m->fixed_sources.sources; s; s = s->next) {
assert(s->type == MAINLOOP_SOURCE_TYPE_FIXED);
if (!s->dead && s->enabled) {
assert(s->fixed.callback);
s->fixed.callback(s, s->userdata);
}
if(m->quit) {
if (retval)
*retval = m->retval;
return 1;
}
m->running = 1;
scan_dead(m);
run_fixed_or_idle(m, m->fixed_sources);
if (m->rebuild_pollfds) {
rebuild_pollfds(m);
m->rebuild_pollfds = 0;
}
m->running = 1;
idle = any_idle_sources(m);
do {
c = poll(m->pollfds, m->n_pollfds, (block && !m->idle_sources.n_sources) ? -1 : 0);
} while (c < 0 && errno == EINTR);
if (c > 0)
int t;
if (!block || idle)
t = 0;
else
t = calc_next_timeout(m);
r = poll(m->pollfds, m->n_pollfds, t);
} while (r < 0 && errno == EINTR);
dispatch_timeout(m);
if (r > 0)
dispatch_pollfds(m);
else if (c == 0) {
for (s = m->idle_sources.sources; s; s = s->next) {
assert(s->type == MAINLOOP_SOURCE_TYPE_IDLE);
if (!s->dead && s->enabled) {
assert(s->idle.callback);
s->idle.callback(s, s->userdata);
}
}
}
else if (r == 0 && idle)
run_fixed_or_idle(m, m->idle_sources);
else if (r < 0)
fprintf(stderr, "select(): %s\n", strerror(errno));
m->running = 0;
return c < 0 ? -1 : 0;
return r < 0 ? -1 : 0;
}
int mainloop_run(struct mainloop *m) {
int pa_mainloop_run(struct pa_mainloop *m, int *retval) {
int r;
while (!(r = mainloop_iterate(m, 1)));
while ((r = pa_mainloop_iterate(m, 1, retval)) == 0);
return r;
}
void mainloop_quit(struct mainloop *m, int r) {
void pa_mainloop_quit(struct pa_mainloop *m, int r) {
assert(m);
m->quit = r;
}
static struct mainloop_source_list* get_source_list(struct mainloop *m, enum mainloop_source_type type) {
struct mainloop_source_list *l;
switch(type) {
case MAINLOOP_SOURCE_TYPE_IO:
l = &m->io_sources;
break;
case MAINLOOP_SOURCE_TYPE_FIXED:
l = &m->fixed_sources;
break;
case MAINLOOP_SOURCE_TYPE_IDLE:
l = &m->idle_sources;
break;
case MAINLOOP_SOURCE_TYPE_SIGNAL:
l = &m->signal_sources;
break;
default:
l = NULL;
break;
}
return l;
}
/* IO sources */
static void* mainloop_source_io(struct pa_mainloop_api*a, int fd, enum pa_mainloop_api_io_events events, void (*callback) (struct pa_mainloop_api*a, void *id, int fd, enum pa_mainloop_api_io_events events, void *userdata), void *userdata) {
struct pa_mainloop *m;
struct mainloop_source_io *s;
assert(a && a->userdata && fd >= 0 && callback);
m = a->userdata;
assert(a == &m->api);
static struct mainloop_source *source_new(struct mainloop*m, enum mainloop_source_type type) {
struct mainloop_source_list *l;
struct mainloop_source* s;
assert(m);
s = malloc(sizeof(struct mainloop_source));
s = malloc(sizeof(struct mainloop_source_io));
assert(s);
memset(s, 0, sizeof(struct mainloop_source));
s->header.mainloop = m;
s->header.dead = 0;
s->type = type;
s->mainloop = m;
l = get_source_list(m, type);
assert(l);
s->next = l->sources;
l->sources = s;
l->n_sources++;
return s;
}
struct mainloop_source* mainloop_source_new_io(struct mainloop*m, int fd, enum mainloop_io_event event, void (*callback)(struct mainloop_source*s, int fd, enum mainloop_io_event event, void *userdata), void *userdata) {
struct mainloop_source* s;
assert(m && fd>=0 && callback);
s = source_new(m, MAINLOOP_SOURCE_TYPE_IO);
s->io.fd = fd;
s->io.events = event;
s->io.callback = callback;
s->fd = fd;
s->events = events;
s->callback = callback;
s->userdata = userdata;
s->io.pollfd.fd = fd;
s->io.pollfd.events = (event & MAINLOOP_IO_EVENT_IN ? POLLIN : 0) | (event & MAINLOOP_IO_EVENT_OUT ? POLLOUT : 0);
s->io.pollfd.revents = 0;
s->enabled = 1;
s->pollfd = NULL;
idxset_put(m->io_sources, s, NULL);
m->rebuild_pollfds = 1;
return s;
}
struct mainloop_source* mainloop_source_new_fixed(struct mainloop*m, void (*callback)(struct mainloop_source *s, void*userdata), void*userdata) {
struct mainloop_source* s;
assert(m && callback);
static void mainloop_enable_io(struct pa_mainloop_api*a, void* id, enum pa_mainloop_api_io_events events) {
struct pa_mainloop *m;
struct mainloop_source_io *s = id;
assert(a && a->userdata && s && !s->header.dead);
m = a->userdata;
assert(a == &m->api && s->header.mainloop == m);
s = source_new(m, MAINLOOP_SOURCE_TYPE_FIXED);
s->events = events;
if (s->pollfd)
s->pollfd->events = ((s->events & PA_MAINLOOP_API_IO_EVENT_INPUT) ? POLLIN : 0) | ((s->events & PA_MAINLOOP_API_IO_EVENT_OUTPUT) ? POLLOUT : 0);
}
static void mainloop_cancel_io(struct pa_mainloop_api*a, void* id) {
struct pa_mainloop *m;
struct mainloop_source_io *s = id;
assert(a && a->userdata && s && !s->header.dead);
m = a->userdata;
assert(a == &m->api && s->header.mainloop == m);
s->header.dead = 1;
m->io_sources_scan_dead = 1;
}
/* Fixed sources */
static void* mainloop_source_fixed(struct pa_mainloop_api*a, void (*callback) (struct pa_mainloop_api*a, void *id, void *userdata), void *userdata) {
struct pa_mainloop *m;
struct mainloop_source_fixed_or_idle *s;
assert(a && a->userdata && callback);
m = a->userdata;
assert(a == &m->api);
s = malloc(sizeof(struct mainloop_source_fixed_or_idle));
assert(s);
s->header.mainloop = m;
s->header.dead = 0;
s->fixed.callback = callback;
s->userdata = userdata;
s->enabled = 1;
s->callback = callback;
s->userdata = userdata;
idxset_put(m->fixed_sources, s, NULL);
return s;
}
struct mainloop_source* mainloop_source_new_idle(struct mainloop*m, void (*callback)(struct mainloop_source *s, void*userdata), void*userdata) {
struct mainloop_source* s;
assert(m && callback);
s = source_new(m, MAINLOOP_SOURCE_TYPE_IDLE);
s->idle.callback = callback;
s->userdata = userdata;
s->enabled = 1;
return s;
}
struct mainloop_source* mainloop_source_new_signal(struct mainloop*m, int sig, void (*callback)(struct mainloop_source *s, int sig, void*userdata), void*userdata) {
struct mainloop_source* s;
struct sigaction save_sa, sa;
assert(m && callback);
memset(&sa, 0, sizeof(sa));
sa.sa_handler = signal_func;
sa.sa_flags = SA_RESTART;
sigemptyset(&sa.sa_mask);
memset(&save_sa, 0, sizeof(save_sa));
if (sigaction(sig, &sa, &save_sa) < 0)
return NULL;
s = source_new(m, MAINLOOP_SOURCE_TYPE_SIGNAL);
s->signal.sig = sig;
s->signal.sigaction = save_sa;
s->signal.callback = callback;
s->userdata = userdata;
s->enabled = 1;
return s;
}
void mainloop_source_free(struct mainloop_source*s) {
struct mainloop_source_list *l;
assert(s && !s->dead);
s->dead = 1;
assert(s->mainloop);
l = get_source_list(s->mainloop, s->type);
assert(l);
l->n_sources--;
l->dead_sources = 1;
if (s->type == MAINLOOP_SOURCE_TYPE_IO)
s->mainloop->rebuild_pollfds = 1;
else if (s->type == MAINLOOP_SOURCE_TYPE_SIGNAL)
sigaction(s->signal.sig, &s->signal.sigaction, NULL);
}
void mainloop_source_enable(struct mainloop_source*s, int b) {
assert(s && !s->dead);
if (s->type == MAINLOOP_SOURCE_TYPE_IO && ((s->enabled && !b) || (!s->enabled && b))) {
assert(s->mainloop);
s->mainloop->rebuild_pollfds = 1;
}
static void mainloop_enable_fixed(struct pa_mainloop_api*a, void* id, int b) {
struct pa_mainloop *m;
struct mainloop_source_fixed_or_idle *s = id;
assert(a && a->userdata && s && !s->header.dead);
m = a->userdata;
assert(a == &m->api);
s->enabled = b;
}
void mainloop_source_io_set_events(struct mainloop_source*s, enum mainloop_io_event events) {
assert(s && !s->dead && s->type == MAINLOOP_SOURCE_TYPE_IO);
static void mainloop_cancel_fixed(struct pa_mainloop_api*a, void* id) {
struct pa_mainloop *m;
struct mainloop_source_fixed_or_idle *s = id;
assert(a && a->userdata && s && !s->header.dead);
m = a->userdata;
assert(a == &m->api);
if (s->io.events != events) {
assert(s->mainloop);
s->mainloop->rebuild_pollfds = 1;
}
s->io.events = events;
s->io.pollfd.events = ((events & MAINLOOP_IO_EVENT_IN) ? POLLIN : 0) | ((events & MAINLOOP_IO_EVENT_OUT) ? POLLOUT : 0);
s->header.dead = 1;
m->fixed_sources_scan_dead = 1;
}
struct mainloop *mainloop_source_get_mainloop(struct mainloop_source *s) {
/* Idle sources */
static void* mainloop_source_idle(struct pa_mainloop_api*a, void (*callback) (struct pa_mainloop_api*a, void *id, void *userdata), void *userdata) {
struct pa_mainloop *m;
struct mainloop_source_fixed_or_idle *s;
assert(a && a->userdata && callback);
m = a->userdata;
assert(a == &m->api);
s = malloc(sizeof(struct mainloop_source_fixed_or_idle));
assert(s);
s->header.mainloop = m;
s->header.dead = 0;
return s->mainloop;
s->enabled = 1;
s->callback = callback;
s->userdata = userdata;
idxset_put(m->idle_sources, s, NULL);
return s;
}
struct once_info {
void (*callback)(void *userdata);
void *userdata;
};
static void mainloop_cancel_idle(struct pa_mainloop_api*a, void* id) {
struct pa_mainloop *m;
struct mainloop_source_fixed_or_idle *s = id;
assert(a && a->userdata && s && !s->header.dead);
m = a->userdata;
assert(a == &m->api);
static void once_callback(struct mainloop_source *s, void *userdata) {
struct once_info *i = userdata;
assert(s && i && i->callback);
i->callback(i->userdata);
mainloop_source_free(s);
free(i);
s->header.dead = 1;
m->idle_sources_scan_dead = 1;
}
void mainloop_once(struct mainloop*m, void (*callback)(void *userdata), void *userdata) {
struct once_info *i;
assert(m && callback);
/* Time sources */
static void* mainloop_source_time(struct pa_mainloop_api*a, const struct timeval *tv, void (*callback) (struct pa_mainloop_api*a, void *id, const struct timeval *tv, void *userdata), void *userdata) {
struct pa_mainloop *m;
struct mainloop_source_time *s;
assert(a && a->userdata && callback);
m = a->userdata;
assert(a == &m->api);
i = malloc(sizeof(struct once_info));
assert(i);
i->callback = callback;
i->userdata = userdata;
s = malloc(sizeof(struct mainloop_source_time));
assert(s);
s->header.mainloop = m;
s->header.dead = 0;
s->enabled = !!tv;
if (tv)
s->timeval = *tv;
s->callback = callback;
s->userdata = userdata;
idxset_put(m->time_sources, s, NULL);
return s;
}
static void mainloop_enable_time(struct pa_mainloop_api*a, void *id, const struct timeval *tv) {
struct pa_mainloop *m;
struct mainloop_source_time *s = id;
assert(a && a->userdata && s && !s->header.dead);
m = a->userdata;
assert(a == &m->api);
if (tv) {
s->enabled = 1;
s->timeval = *tv;
} else
s->enabled = 0;
}
static void mainloop_cancel_time(struct pa_mainloop_api*a, void* id) {
struct pa_mainloop *m;
struct mainloop_source_time *s = id;
assert(a && a->userdata && s && !s->header.dead);
m = a->userdata;
assert(a == &m->api);
s->header.dead = 1;
m->time_sources_scan_dead = 1;
}
static void mainloop_quit(struct pa_mainloop_api*a, int retval) {
struct pa_mainloop *m;
assert(a && a->userdata);
m = a->userdata;
assert(a == &m->api);
m->quit = 1;
m->retval = retval;
}
mainloop_source_new_fixed(m, once_callback, i);
static void setup_api(struct pa_mainloop *m) {
assert(m);
m->api.userdata = m;
m->api.source_io = mainloop_source_io;
m->api.enable_io = mainloop_enable_io;
m->api.cancel_io = mainloop_cancel_io;
m->api.source_fixed = mainloop_source_fixed;
m->api.enable_fixed = mainloop_enable_fixed;
m->api.cancel_fixed = mainloop_cancel_fixed;
m->api.source_idle = mainloop_source_idle;
m->api.enable_idle = mainloop_enable_fixed; /* (!) */
m->api.cancel_idle = mainloop_cancel_idle;
m->api.source_time = mainloop_source_time;
m->api.enable_time = mainloop_enable_time;
m->api.cancel_time = mainloop_cancel_time;
m->api.quit = mainloop_quit;
}
struct pa_mainloop_api* pa_mainloop_get_api(struct pa_mainloop*m) {
assert(m);
return &m->api;
}

View file

@ -1,42 +1,16 @@
#ifndef foomainloophfoo
#define foomainloophfoo
struct mainloop;
struct mainloop_source;
#include "mainloop-api.h"
enum mainloop_io_event {
MAINLOOP_IO_EVENT_NULL = 0,
MAINLOOP_IO_EVENT_IN = 1,
MAINLOOP_IO_EVENT_OUT = 2,
MAINLOOP_IO_EVENT_BOTH = 3
};
struct pa_mainloop;
enum mainloop_source_type {
MAINLOOP_SOURCE_TYPE_IO,
MAINLOOP_SOURCE_TYPE_FIXED,
MAINLOOP_SOURCE_TYPE_IDLE,
MAINLOOP_SOURCE_TYPE_SIGNAL
};
struct pa_mainloop *pa_mainloop_new(void);
void pa_mainloop_free(struct pa_mainloop* m);
struct mainloop *mainloop_new(void);
void mainloop_free(struct mainloop* m);
int pa_mainloop_iterate(struct pa_mainloop *m, int block, int *retval);
int pa_mainloop_run(struct pa_mainloop *m, int *retval);
int mainloop_iterate(struct mainloop *m, int block);
int mainloop_run(struct mainloop *m);
void mainloop_quit(struct mainloop *m, int r);
struct mainloop_source* mainloop_source_new_io(struct mainloop*m, int fd, enum mainloop_io_event event, void (*callback)(struct mainloop_source*s, int fd, enum mainloop_io_event event, void *userdata), void *userdata);
struct mainloop_source* mainloop_source_new_fixed(struct mainloop*m, void (*callback)(struct mainloop_source *s, void*userdata), void*userdata);
struct mainloop_source* mainloop_source_new_idle(struct mainloop*m, void (*callback)(struct mainloop_source *s, void*userdata), void*userdata);
struct mainloop_source* mainloop_source_new_signal(struct mainloop*m, int sig, void (*callback)(struct mainloop_source *s, int sig, void*userdata), void*userdata);
void mainloop_once(struct mainloop*m, void (*callback)(void *userdata), void *userdata);
void mainloop_source_free(struct mainloop_source*s);
void mainloop_source_enable(struct mainloop_source*s, int b);
void mainloop_source_io_set_events(struct mainloop_source*s, enum mainloop_io_event event);
struct mainloop *mainloop_source_get_mainloop(struct mainloop_source *s);
struct pa_mainloop_api* pa_mainloop_get_api(struct pa_mainloop*m);
#endif

View file

@ -220,3 +220,12 @@ uint32_t memblockq_get_length(struct memblockq *bq) {
assert(bq);
return bq->total_length;
}
uint32_t memblockq_missing_to(struct memblockq *bq, size_t qlen) {
assert(bq && qlen);
if (bq->total_length >= qlen)
return 0;
return qlen - bq->total_length;
}

View file

@ -25,4 +25,6 @@ int memblockq_is_writable(struct memblockq *bq, size_t length);
uint32_t memblockq_get_delay(struct memblockq *bq);
uint32_t memblockq_get_length(struct memblockq *bq);
uint32_t memblockq_missing_to(struct memblockq *bq, size_t qlen);
#endif

View file

@ -16,15 +16,15 @@
#include "source.h"
#include "module.h"
#include "oss.h"
#include "sample-util.h"
struct userdata {
struct sink *sink;
struct source *source;
struct core *core;
struct sample_spec sample_spec;
struct pa_sample_spec sample_spec;
size_t in_fragment_size, out_fragment_size, in_fragments, out_fragments, sample_size, out_fill;
uint32_t sample_usec;
size_t in_fragment_size, out_fragment_size, in_fragments, out_fragments, out_fill;
int fd;
@ -161,12 +161,14 @@ static void do_read(struct userdata *u) {
in_clear_memblocks(u, u->in_fragments/2);
};
static void io_callback(struct mainloop_source*s, int fd, enum mainloop_io_event event, void *userdata) {
static void io_callback(struct pa_mainloop_api *m, void *id, int fd, enum pa_mainloop_api_io_events events, void *userdata) {
struct userdata *u = userdata;
if (event & MAINLOOP_IO_EVENT_IN)
assert (u && u->core->mainloop == m && u->mainloop_source == id);
if (events & PA_MAINLOOP_API_IO_EVENT_INPUT)
do_read(u);
if (event & MAINLOOP_IO_EVENT_OUT)
if (events & PA_MAINLOOP_API_IO_EVENT_OUTPUT)
do_write(u);
}
@ -175,7 +177,7 @@ static uint32_t sink_get_latency_cb(struct sink *s) {
assert(s && u);
do_write(u);
return u->out_fill/u->sample_size*u->sample_usec;
return pa_samples_usec(u->out_fill, &s->sample_spec);
}
int module_init(struct core *c, struct module*m) {
@ -316,10 +318,7 @@ int module_init(struct core *c, struct module*m) {
assert(u->source || u->sink);
u->sample_size = sample_size(&u->sample_spec);
u->sample_usec = 1000000/u->sample_spec.rate;
u->mainloop_source = mainloop_source_new_io(c->mainloop, u->fd, (u->source ? MAINLOOP_IO_EVENT_IN : 0) | (u->sink ? MAINLOOP_IO_EVENT_OUT : 0), io_callback, u);
u->mainloop_source = c->mainloop->source_io(c->mainloop, u->fd, (u->source ? PA_MAINLOOP_API_IO_EVENT_INPUT : 0) | (u->sink ? PA_MAINLOOP_API_IO_EVENT_OUTPUT : 0), io_callback, u);
assert(u->mainloop_source);
return 0;
@ -360,7 +359,7 @@ void module_done(struct core *c, struct module*m) {
source_free(u->source);
if (u->mainloop_source)
mainloop_source_free(u->mainloop_source);
u->core->mainloop->cancel_io(u->core->mainloop, u->mainloop_source);
if (u->fd >= 0)
close(u->fd);

View file

@ -15,7 +15,7 @@
#include "source.h"
#include "module.h"
#include "oss.h"
#include "sample.h"
#include "sample-util.h"
struct userdata {
struct sink *sink;
@ -81,7 +81,7 @@ static void do_read(struct userdata *u) {
return;
}
assert(r <= memchunk.memblock->length);
assert(r <= (ssize_t) memchunk.memblock->length);
memchunk.length = memchunk.memblock->length = r;
memchunk.index = 0;
@ -107,7 +107,7 @@ static uint32_t sink_get_latency_cb(struct sink *s) {
return 0;
}
return samples_usec(arg, &s->sample_spec);
return pa_samples_usec(arg, &s->sample_spec);
}
int module_init(struct core *c, struct module*m) {
@ -117,7 +117,7 @@ int module_init(struct core *c, struct module*m) {
int fd = -1;
int frag_size, in_frag_size, out_frag_size;
int mode;
struct sample_spec ss;
struct pa_sample_spec ss;
assert(c && m);
p = m->argument ? m->argument : "/dev/dsp";
@ -203,7 +203,7 @@ int module_init(struct core *c, struct module*m) {
u->memchunk.memblock = NULL;
u->memchunk.length = 0;
u->sample_size = sample_size(&ss);
u->sample_size = pa_sample_size(&ss);
u->out_fragment_size = out_frag_size;
u->in_fragment_size = in_frag_size;

View file

@ -18,7 +18,8 @@ struct userdata {
struct sink *sink;
struct iochannel *io;
struct core *core;
struct mainloop_source *mainloop_source;
void *mainloop_source;
struct pa_mainloop_api *mainloop;
struct memchunk memchunk;
};
@ -27,7 +28,7 @@ static void do_write(struct userdata *u) {
ssize_t r;
assert(u);
mainloop_source_enable(u->mainloop_source, 0);
u->mainloop->enable_fixed(u->mainloop, u->mainloop_source, 0);
if (!iochannel_is_writable(u->io))
return;
@ -57,10 +58,10 @@ static void notify_cb(struct sink*s) {
assert(s && u);
if (iochannel_is_writable(u->io))
mainloop_source_enable(u->mainloop_source, 1);
u->mainloop->enable_fixed(u->mainloop, u->mainloop_source, 1);
}
static void prepare_callback(struct mainloop_source *src, void *userdata) {
static void fixed_callback(struct pa_mainloop_api *m, void *id, void *userdata) {
struct userdata *u = userdata;
assert(u);
do_write(u);
@ -77,7 +78,7 @@ int module_init(struct core *c, struct module*m) {
struct stat st;
char *p;
int fd = -1;
static const struct sample_spec ss = {
static const struct pa_sample_spec ss = {
.format = SAMPLE_S16NE,
.rate = 44100,
.channels = 2,
@ -120,10 +121,11 @@ int module_init(struct core *c, struct module*m) {
u->memchunk.memblock = NULL;
u->memchunk.length = 0;
u->mainloop_source = mainloop_source_new_fixed(c->mainloop, prepare_callback, u);
u->mainloop = c->mainloop;
u->mainloop_source = u->mainloop->source_fixed(u->mainloop, fixed_callback, u);
assert(u->mainloop_source);
mainloop_source_enable(u->mainloop_source, 0);
u->mainloop->enable_fixed(u->mainloop, u->mainloop_source, 0);
m->userdata = u;
return 0;
@ -147,7 +149,7 @@ void module_done(struct core *c, struct module*m) {
sink_free(u->sink);
iochannel_free(u->io);
mainloop_source_free(u->mainloop_source);
u->mainloop->cancel_fixed(u->mainloop, u->mainloop_source);
assert(u->filename);
unlink(u->filename);

View file

@ -151,5 +151,5 @@ void module_unload_request(struct core *c, struct module *m) {
assert(i);
i->core = c;
i->index = m->index;
mainloop_once(c->mainloop, module_unload_once_callback, i);
pa_mainloop_api_once(c->mainloop, module_unload_once_callback, i);
}

View file

@ -7,7 +7,7 @@
#include "oss.h"
int oss_auto_format(int fd, struct sample_spec *ss) {
int oss_auto_format(int fd, struct pa_sample_spec *ss) {
int format, channels, speed;
assert(fd >= 0 && ss);

View file

@ -3,6 +3,6 @@
#include "sample.h"
int oss_auto_format(int fd, struct sample_spec *ss);
int oss_auto_format(int fd, struct pa_sample_spec *ss);
#endif

169
src/pacat.c Normal file
View file

@ -0,0 +1,169 @@
#include <string.h>
#include <errno.h>
#include <unistd.h>
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include "polyp.h"
#include "mainloop.h"
static struct pa_context *context = NULL;
static struct pa_stream *stream = NULL;
static struct pa_mainloop_api *mainloop_api = NULL;
static void *buffer = NULL;
static size_t buffer_length = 0, buffer_index = 0;
static void* stdin_source = NULL;
static void context_die_callback(struct pa_context *c, void *userdata) {
assert(c);
fprintf(stderr, "Connection to server shut down, exiting.\n");
mainloop_api->quit(mainloop_api, 1);
}
static void stream_die_callback(struct pa_stream *s, void *userdata) {
assert(s);
fprintf(stderr, "Stream deleted, exiting.\n");
mainloop_api->quit(mainloop_api, 1);
}
static void stream_write_callback(struct pa_stream *s, size_t length, void *userdata) {
size_t l;
assert(s && length);
mainloop_api->enable_io(mainloop_api, stdin_source, PA_STREAM_PLAYBACK);
if (!buffer)
return;
assert(buffer_length);
l = length;
if (l > buffer_length)
l = buffer_length;
pa_stream_write(s, buffer+buffer_index, l);
buffer_length -= l;
buffer_index += l;
if (!buffer_length) {
free(buffer);
buffer = NULL;
buffer_index = buffer_length = 0;
}
}
static void stream_complete_callback(struct pa_context*c, struct pa_stream *s, void *userdata) {
assert(c);
if (!s) {
fprintf(stderr, "Stream creation failed.\n");
mainloop_api->quit(mainloop_api, 1);
}
stream = s;
pa_stream_set_die_callback(stream, stream_die_callback, NULL);
pa_stream_set_write_callback(stream, stream_write_callback, NULL);
}
static void context_complete_callback(struct pa_context *c, int success, void *userdata) {
static const struct pa_sample_spec ss = {
.format = SAMPLE_S16NE,
.rate = 44100,
.channels = 2
};
assert(c && !stream);
if (!success) {
fprintf(stderr, "Connection failed\n");
goto fail;
}
if (pa_stream_new(c, PA_STREAM_PLAYBACK, NULL, "pacat", &ss, NULL, stream_complete_callback, NULL) < 0) {
fprintf(stderr, "pa_stream_new() failed.\n");
goto fail;
}
return;
fail:
mainloop_api->quit(mainloop_api, 1);
}
static void stdin_callback(struct pa_mainloop_api*a, void *id, int fd, enum pa_mainloop_api_io_events events, void *userdata) {
size_t l;
ssize_t r;
assert(a == mainloop_api && id && fd == STDIN_FILENO && events == PA_MAINLOOP_API_IO_EVENT_INPUT);
if (buffer) {
mainloop_api->enable_io(mainloop_api, stdin_source, PA_MAINLOOP_API_IO_EVENT_NULL);
return;
}
if (!(l = pa_stream_writable_size(stream)))
l = 4096;
buffer = malloc(l);
assert(buffer);
if ((r = read(fd, buffer, l)) <= 0) {
if (r == 0)
mainloop_api->quit(mainloop_api, 0);
else {
fprintf(stderr, "read() failed: %s\n", strerror(errno));
mainloop_api->quit(mainloop_api, 1);
}
return;
}
buffer_length = r;
buffer_index = 0;
}
int main(int argc, char *argv[]) {
struct pa_mainloop* m;
int ret = 1;
if (!(m = pa_mainloop_new())) {
fprintf(stderr, "pa_mainloop_new() failed.\n");
goto quit;
}
mainloop_api = pa_mainloop_get_api(m);
if (!(stdin_source = mainloop_api->source_io(mainloop_api, STDIN_FILENO, PA_MAINLOOP_API_IO_EVENT_INPUT, stdin_callback, NULL))) {
fprintf(stderr, "source_io() failed.\n");
goto quit;
}
if (!(context = pa_context_new(mainloop_api, argv[0]))) {
fprintf(stderr, "pa_context_new() failed.\n");
goto quit;
}
if (pa_context_connect(context, NULL, context_complete_callback, NULL) < 0) {
fprintf(stderr, "pa_context_connext() failed.\n");
goto quit;
}
pa_context_set_die_callback(context, context_die_callback, NULL);
if (pa_mainloop_run(m, &ret) < 0) {
fprintf(stderr, "pa_mainloop_run() failed.\n");
goto quit;
}
quit:
if (stream)
pa_stream_free(stream);
if (context)
pa_context_free(context);
if (m)
pa_mainloop_free(m);
if (buffer)
free(buffer);
return ret;
}

View file

@ -16,7 +16,7 @@ struct packet* packet_new(size_t length) {
return p;
}
struct packet* packet_dynamic(uint8_t* data, size_t length) {
struct packet* packet_new_dynamic(uint8_t* data, size_t length) {
struct packet *p;
assert(data && length);
p = malloc(sizeof(struct packet));
@ -26,6 +26,7 @@ struct packet* packet_dynamic(uint8_t* data, size_t length) {
p->length = length;
p->data = data;
p->type = PACKET_DYNAMIC;
return p;
}
struct packet* packet_ref(struct packet *p) {

149
src/pdispatch.c Normal file
View file

@ -0,0 +1,149 @@
#include <stdio.h>
#include <stdlib.h>
#include <assert.h>
#include "pdispatch.h"
#include "protocol-native-spec.h"
struct reply_info {
struct pdispatch *pdispatch;
struct reply_info *next, *previous;
int (*callback)(struct pdispatch *pd, uint32_t command, uint32_t tag, struct tagstruct *t, void *userdata);
void *userdata;
uint32_t tag;
void *mainloop_timeout;
};
struct pdispatch {
struct pa_mainloop_api *mainloop;
const struct pdispatch_command *command_table;
unsigned n_commands;
struct reply_info *replies;
};
static void reply_info_free(struct reply_info *r) {
assert(r && r->pdispatch && r->pdispatch->mainloop);
r->pdispatch->mainloop->cancel_time(r->pdispatch->mainloop, r->mainloop_timeout);
if (r->previous)
r->previous->next = r->next;
else
r->pdispatch->replies = r->next;
if (r->next)
r->next->previous = r->previous;
free(r);
}
struct pdispatch* pdispatch_new(struct pa_mainloop_api *mainloop, const struct pdispatch_command*table, unsigned entries) {
struct pdispatch *pd;
assert(mainloop);
assert((entries && table) || (!entries && !table));
pd = malloc(sizeof(struct pdispatch));
assert(pd);
pd->mainloop = mainloop;
pd->command_table = table;
pd->n_commands = entries;
return pd;
}
void pdispatch_free(struct pdispatch *pd) {
assert(pd);
while (pd->replies)
reply_info_free(pd->replies);
free(pd);
}
int pdispatch_run(struct pdispatch *pd, struct packet*packet, void *userdata) {
uint32_t tag, command;
assert(pd && packet);
struct tagstruct *ts = NULL;
assert(pd && packet && packet->data);
if (packet->length <= 8)
goto fail;
ts = tagstruct_new(packet->data, packet->length);
assert(ts);
if (tagstruct_getu32(ts, &command) < 0 ||
tagstruct_getu32(ts, &tag) < 0)
goto fail;
if (command == PA_COMMAND_ERROR || command == PA_COMMAND_REPLY) {
struct reply_info *r;
int done = 0;
for (r = pd->replies; r; r = r->next) {
if (r->tag == tag) {
int ret = r->callback(r->pdispatch, command, tag, ts, r->userdata);
reply_info_free(r);
if (ret < 0)
goto fail;
done = 1;
break;
}
}
if (!done)
goto fail;
} else if (pd->command_table && command < pd->n_commands) {
const struct pdispatch_command *c = pd->command_table+command;
if (!c->proc)
goto fail;
if (c->proc(pd, command, tag, ts, userdata) < 0)
goto fail;
} else
goto fail;
tagstruct_free(ts);
return 0;
fail:
if (ts)
tagstruct_free(ts);
fprintf(stderr, "protocol-native: invalid packet.\n");
return -1;
}
static void timeout_callback(struct pa_mainloop_api*m, void *id, const struct timeval *tv, void *userdata) {
struct reply_info*r = userdata;
assert (r && r->mainloop_timeout == id && r->pdispatch && r->pdispatch->mainloop == m && r->callback);
r->callback(r->pdispatch, PA_COMMAND_TIMEOUT, r->tag, NULL, r->userdata);
reply_info_free(r);
}
void pdispatch_register_reply(struct pdispatch *pd, uint32_t tag, int timeout, int (*cb)(struct pdispatch *pd, uint32_t command, uint32_t tag, struct tagstruct *t, void *userdata), void *userdata) {
struct reply_info *r;
struct timeval tv;
assert(pd && cb);
r = malloc(sizeof(struct reply_info));
assert(r);
r->pdispatch = pd;
r->callback = cb;
r->userdata = userdata;
r->tag = tag;
gettimeofday(&tv, NULL);
tv.tv_sec += timeout;
r->mainloop_timeout = pd->mainloop->source_time(pd->mainloop, &tv, timeout_callback, r);
assert(r->mainloop_timeout);
r->previous = NULL;
r->next = pd->replies;
if (r->next)
r->next->previous = r;
pd->replies = r;
}

22
src/pdispatch.h Normal file
View file

@ -0,0 +1,22 @@
#ifndef foopdispatchhfoo
#define foopdispatchhfoo
#include <inttypes.h>
#include "tagstruct.h"
#include "packet.h"
#include "mainloop-api.h"
struct pdispatch;
struct pdispatch_command {
int (*proc)(struct pdispatch *pd, uint32_t command, uint32_t tag, struct tagstruct *t, void *userdata);
};
struct pdispatch* pdispatch_new(struct pa_mainloop_api *m, const struct pdispatch_command*table, unsigned entries);
void pdispatch_free(struct pdispatch *pd);
int pdispatch_run(struct pdispatch *pd, struct packet*p, void *userdata);
void pdispatch_register_reply(struct pdispatch *pd, uint32_t tag, int timeout, int (*cb)(struct pdispatch *pd, uint32_t command, uint32_t tag, struct tagstruct *t, void *userdata), void *userdata);
#endif

451
src/polyp.c Normal file
View file

@ -0,0 +1,451 @@
#include <stdio.h>
#include <assert.h>
#include <stdlib.h>
#include <string.h>
#include "polyp.h"
#include "protocol-native-spec.h"
#include "pdispatch.h"
#include "pstream.h"
#include "dynarray.h"
#include "socket-client.h"
#include "pstream-util.h"
#define DEFAULT_QUEUE_LENGTH 10240
#define DEFAULT_MAX_LENGTH 20480
#define DEFAULT_PREBUF 4096
#define DEFAULT_TIMEOUT 5
struct pa_context {
char *name;
struct pa_mainloop_api* mainloop;
struct socket_client *client;
struct pstream *pstream;
struct pdispatch *pdispatch;
struct dynarray *streams;
struct pa_stream *first_stream;
uint32_t ctag;
uint32_t errno;
enum { CONTEXT_UNCONNECTED, CONTEXT_CONNECTING, CONTEXT_READY, CONTEXT_DEAD} state;
void (*connect_complete_callback)(struct pa_context*c, int success, void *userdata);
void *connect_complete_userdata;
void (*die_callback)(struct pa_context*c, void *userdata);
void *die_userdata;
};
struct pa_stream {
struct pa_context *context;
struct pa_stream *next, *previous;
uint32_t channel;
int channel_valid;
enum pa_stream_direction direction;
enum { STREAM_CREATING, STREAM_READY, STREAM_DEAD} state;
uint32_t requested_bytes;
void (*read_callback)(struct pa_stream *p, const void*data, size_t length, void *userdata);
void *read_userdata;
void (*write_callback)(struct pa_stream *p, size_t length, void *userdata);
void *write_userdata;
void (*create_complete_callback)(struct pa_context*c, struct pa_stream *s, void *userdata);
void *create_complete_userdata;
void (*die_callback)(struct pa_stream*c, void *userdata);
void *die_userdata;
};
static int command_request(struct pdispatch *pd, uint32_t command, uint32_t tag, struct tagstruct *t, void *userdata);
static const struct pdispatch_command command_table[PA_COMMAND_MAX] = {
[PA_COMMAND_ERROR] = { NULL },
[PA_COMMAND_REPLY] = { NULL },
[PA_COMMAND_CREATE_PLAYBACK_STREAM] = { NULL },
[PA_COMMAND_DELETE_PLAYBACK_STREAM] = { NULL },
[PA_COMMAND_CREATE_RECORD_STREAM] = { NULL },
[PA_COMMAND_DELETE_RECORD_STREAM] = { NULL },
[PA_COMMAND_EXIT] = { NULL },
[PA_COMMAND_REQUEST] = { command_request },
};
struct pa_context *pa_context_new(struct pa_mainloop_api *mainloop, const char *name) {
assert(mainloop && name);
struct pa_context *c;
c = malloc(sizeof(struct pa_context));
assert(c);
c->name = strdup(name);
c->mainloop = mainloop;
c->client = NULL;
c->pstream = NULL;
c->pdispatch = NULL;
c->streams = dynarray_new();
assert(c->streams);
c->first_stream = NULL;
c->errno = PA_ERROR_OK;
c->state = CONTEXT_UNCONNECTED;
c->ctag = 0;
c->connect_complete_callback = NULL;
c->connect_complete_userdata = NULL;
c->die_callback = NULL;
c->die_userdata = NULL;
return c;
}
void pa_context_free(struct pa_context *c) {
assert(c);
while (c->first_stream)
pa_stream_free(c->first_stream);
if (c->client)
socket_client_free(c->client);
if (c->pdispatch)
pdispatch_free(c->pdispatch);
if (c->pstream)
pstream_free(c->pstream);
if (c->streams)
dynarray_free(c->streams, NULL, NULL);
free(c->name);
free(c);
}
static void stream_dead(struct pa_stream *s) {
if (s->state == STREAM_DEAD)
return;
s->state = STREAM_DEAD;
if (s->die_callback)
s->die_callback(s, s->die_userdata);
}
static void context_dead(struct pa_context *c) {
struct pa_stream *s;
assert(c);
for (s = c->first_stream; s; s = s->next)
stream_dead(s);
if (c->state == CONTEXT_DEAD)
return;
c->state = CONTEXT_DEAD;
if (c->die_callback)
c->die_callback(c, c->die_userdata);
}
static void pstream_die_callback(struct pstream *p, void *userdata) {
struct pa_context *c = userdata;
assert(p && c);
assert(c->state != CONTEXT_DEAD);
c->state = CONTEXT_DEAD;
context_dead(c);
}
static int pstream_packet_callback(struct pstream *p, struct packet *packet, void *userdata) {
struct pa_context *c = userdata;
assert(p && packet && c);
if (pdispatch_run(c->pdispatch, packet, c) < 0) {
fprintf(stderr, "polyp.c: invalid packet.\n");
return -1;
}
return 0;
}
static int pstream_memblock_callback(struct pstream *p, uint32_t channel, int32_t delta, struct memchunk *chunk, void *userdata) {
struct pa_context *c = userdata;
struct pa_stream *s;
assert(p && chunk && c && chunk->memblock && chunk->memblock->data);
if (!(s = dynarray_get(c->streams, channel)))
return -1;
if (s->read_callback)
s->read_callback(s, chunk->memblock->data + chunk->index, chunk->length, s->read_userdata);
return 0;
}
static void on_connection(struct socket_client *client, struct iochannel*io, void *userdata) {
struct pa_context *c = userdata;
assert(client && io && c && c->state == CONTEXT_CONNECTING);
socket_client_free(client);
c->client = NULL;
if (!io) {
c->errno = PA_ERROR_CONNECTIONREFUSED;
c->state = CONTEXT_UNCONNECTED;
if (c->connect_complete_callback)
c->connect_complete_callback(c, 0, c->connect_complete_userdata);
return;
}
c->pstream = pstream_new(c->mainloop, io);
assert(c->pstream);
pstream_set_die_callback(c->pstream, pstream_die_callback, c);
pstream_set_recieve_packet_callback(c->pstream, pstream_packet_callback, c);
pstream_set_recieve_memblock_callback(c->pstream, pstream_memblock_callback, c);
c->pdispatch = pdispatch_new(c->mainloop, command_table, PA_COMMAND_MAX);
assert(c->pdispatch);
c->state = CONTEXT_READY;
if (c->connect_complete_callback)
c->connect_complete_callback(c, 1, c->connect_complete_userdata);
}
int pa_context_connect(struct pa_context *c, const char *server, void (*complete) (struct pa_context*c, int success, void *userdata), void *userdata) {
assert(c && c->state == CONTEXT_UNCONNECTED);
assert(!c->client);
if (!(c->client = socket_client_new_unix(c->mainloop, server))) {
c->errno = PA_ERROR_CONNECTIONREFUSED;
return -1;
}
c->connect_complete_callback = complete;
c->connect_complete_userdata = userdata;
socket_client_set_callback(c->client, on_connection, c);
c->state = CONTEXT_CONNECTING;
return 0;
}
int pa_context_is_dead(struct pa_context *c) {
assert(c);
return c->state == CONTEXT_DEAD;
}
int pa_context_is_ready(struct pa_context *c) {
assert(c);
return c->state == CONTEXT_READY;
}
int pa_context_errno(struct pa_context *c) {
assert(c);
return c->errno;
}
void pa_context_set_die_callback(struct pa_context *c, void (*cb)(struct pa_context *c, void *userdata), void *userdata) {
assert(c);
c->die_callback = cb;
c->die_userdata = userdata;
}
static int command_request(struct pdispatch *pd, uint32_t command, uint32_t tag, struct tagstruct *t, void *userdata) {
struct pa_stream *s;
struct pa_context *c = userdata;
uint32_t bytes, channel;
assert(pd && command == PA_COMMAND_REQUEST && t && s);
if (tagstruct_getu32(t, &channel) < 0 ||
tagstruct_getu32(t, &bytes) < 0 ||
tagstruct_eof(t)) {
c->errno = PA_ERROR_PROTOCOL;
return -1;
}
if (!(s = dynarray_get(c->streams, channel))) {
c->errno = PA_ERROR_PROTOCOL;
return -1;
}
s->requested_bytes += bytes;
if (s->requested_bytes && s->write_callback)
s->write_callback(s, s->requested_bytes, s->write_userdata);
return 0;
}
static int create_playback_callback(struct pdispatch *pd, uint32_t command, uint32_t tag, struct tagstruct *t, void *userdata) {
int ret = 0;
struct pa_stream *s = userdata;
assert(pd && s && s->state == STREAM_CREATING);
if (command != PA_COMMAND_REPLY) {
struct pa_context *c = s->context;
assert(c);
if (command == PA_COMMAND_ERROR && tagstruct_getu32(t, &s->context->errno) < 0) {
s->context->errno = PA_ERROR_PROTOCOL;
ret = -1;
} else if (command == PA_COMMAND_TIMEOUT) {
s->context->errno = PA_ERROR_TIMEOUT;
ret = -1;
}
goto fail;
}
if (tagstruct_getu32(t, &s->channel) < 0 ||
tagstruct_eof(t)) {
s->context->errno = PA_ERROR_PROTOCOL;
ret = -1;
goto fail;
}
s->channel_valid = 1;
dynarray_put(s->context->streams, s->channel, s);
s->state = STREAM_READY;
assert(s->create_complete_callback);
s->create_complete_callback(s->context, s, s->create_complete_userdata);
return 0;
fail:
assert(s->create_complete_callback);
s->create_complete_callback(s->context, NULL, s->create_complete_userdata);
pa_stream_free(s);
return ret;
}
int pa_stream_new(
struct pa_context *c,
enum pa_stream_direction dir,
const char *dev,
const char *name,
const struct pa_sample_spec *ss,
const struct pa_buffer_attr *attr,
void (*complete) (struct pa_context*c, struct pa_stream *s, void *userdata),
void *userdata) {
struct pa_stream *s;
struct tagstruct *t;
uint32_t tag;
assert(c && name && ss && c->state == CONTEXT_READY && complete);
s = malloc(sizeof(struct pa_stream));
assert(s);
s->context = c;
s->read_callback = NULL;
s->read_userdata = NULL;
s->write_callback = NULL;
s->write_userdata = NULL;
s->die_callback = NULL;
s->die_userdata = NULL;
s->create_complete_callback = complete;
s->create_complete_userdata = NULL;
s->state = STREAM_CREATING;
s->requested_bytes = 0;
s->channel = 0;
s->channel_valid = 0;
s->direction = dir;
t = tagstruct_new(NULL, 0);
assert(t);
tagstruct_putu32(t, dir == PA_STREAM_PLAYBACK ? PA_COMMAND_CREATE_PLAYBACK_STREAM : PA_COMMAND_CREATE_RECORD_STREAM);
tagstruct_putu32(t, tag = c->ctag++);
tagstruct_puts(t, name);
tagstruct_put_sample_spec(t, ss);
tagstruct_putu32(t, (uint32_t) -1);
tagstruct_putu32(t, attr ? attr->queue_length : DEFAULT_QUEUE_LENGTH);
tagstruct_putu32(t, attr ? attr->max_length : DEFAULT_MAX_LENGTH);
tagstruct_putu32(t, attr ? attr->prebuf : DEFAULT_PREBUF);
pstream_send_tagstruct(c->pstream, t);
pdispatch_register_reply(c->pdispatch, tag, DEFAULT_TIMEOUT, create_playback_callback, s);
s->next = c->first_stream;
if (s->next)
s->next->previous = s;
s->previous = NULL;
c->first_stream = s;
return 0;
}
void pa_stream_free(struct pa_stream *s) {
assert(s && s->context);
if (s->channel_valid) {
struct tagstruct *t = tagstruct_new(NULL, 0);
assert(t);
tagstruct_putu32(t, PA_COMMAND_DELETE_PLAYBACK_STREAM);
tagstruct_putu32(t, s->context->ctag++);
tagstruct_putu32(t, s->channel);
pstream_send_tagstruct(s->context->pstream, t);
}
if (s->channel_valid)
dynarray_put(s->context->streams, s->channel, NULL);
if (s->next)
s->next->previous = s->previous;
if (s->previous)
s->previous->next = s->next;
else
s->context->first_stream = s->next;
free(s);
}
void pa_stream_set_write_callback(struct pa_stream *s, void (*cb)(struct pa_stream *p, size_t length, void *userdata), void *userdata) {
assert(s && cb);
s->write_callback = cb;
s->write_userdata = userdata;
}
void pa_stream_write(struct pa_stream *s, const void *data, size_t length) {
struct memchunk chunk;
assert(s && s->context && data && length);
chunk.memblock = memblock_new(length);
assert(chunk.memblock && chunk.memblock->data);
memcpy(chunk.memblock->data, data, length);
chunk.index = 0;
chunk.length = length;
pstream_send_memblock(s->context->pstream, s->channel, 0, &chunk);
if (length < s->requested_bytes)
s->requested_bytes -= length;
else
s->requested_bytes = 0;
}
size_t pa_stream_writable_size(struct pa_stream *s) {
assert(s);
return s->requested_bytes;
}
void pa_stream_set_read_callback(struct pa_stream *s, void (*cb)(struct pa_stream *p, const void*data, size_t length, void *userdata), void *userdata) {
assert(s && cb);
s->read_callback = cb;
s->read_userdata = userdata;
}
int pa_stream_is_dead(struct pa_stream *s) {
return s->state == STREAM_DEAD;
}
int pa_stream_is_ready(struct pa_stream*s) {
return s->state == STREAM_READY;
}
void pa_stream_set_die_callback(struct pa_stream *s, void (*cb)(struct pa_stream *s, void *userdata), void *userdata) {
assert(s);
s->die_callback = cb;
s->die_userdata = userdata;
}

53
src/polyp.h Normal file
View file

@ -0,0 +1,53 @@
#ifndef foopolyphfoo
#define foopolyphfoo
#include <sys/types.h>
#include "sample.h"
#include "polypdef.h"
#include "mainloop-api.h"
struct pa_context;
struct pa_context *pa_context_new(struct pa_mainloop_api *mainloop, const char *name);
int pa_context_connect(
struct pa_context *c,
const char *server,
void (*complete) (struct pa_context*c, int success, void *userdata),
void *userdata);
void pa_context_free(struct pa_context *c);
void pa_context_set_die_callback(struct pa_context *c, void (*cb)(struct pa_context *c, void *userdata), void *userdata);
int pa_context_is_dead(struct pa_context *c);
int pa_context_is_ready(struct pa_context *c);
int pa_contect_errno(struct pa_context *c);
struct pa_stream;
int pa_stream_new(
struct pa_context *c,
enum pa_stream_direction dir,
const char *dev,
const char *name,
const struct pa_sample_spec *ss,
const struct pa_buffer_attr *attr,
void (*complete) (struct pa_context*c, struct pa_stream *s, void *userdata),
void *userdata);
void pa_stream_free(struct pa_stream *p);
void pa_stream_set_die_callback(struct pa_stream *s, void (*cb)(struct pa_stream *s, void *userdata), void *userdata);
void pa_stream_set_write_callback(struct pa_stream *p, void (*cb)(struct pa_stream *p, size_t length, void *userdata), void *userdata);
void pa_stream_write(struct pa_stream *p, const void *data, size_t length);
size_t pa_stream_writable_size(struct pa_stream *p);
void pa_stream_set_read_callback(struct pa_stream *p, void (*cb)(struct pa_stream *p, const void*data, size_t length, void *userdata), void *userdata);
int pa_stream_is_dead(struct pa_stream *p);
int pa_stream_is_ready(struct pa_stream*p);
#endif

18
src/polypdef.h Normal file
View file

@ -0,0 +1,18 @@
#ifndef foopolypdefhfoo
#define foopolypdefhfoo
#include <inttypes.h>
enum pa_stream_direction {
PA_STREAM_PLAYBACK,
PA_STREAM_RECORD
};
struct pa_buffer_attr {
uint32_t queue_length;
uint32_t max_length;
uint32_t prebuf;
};
#endif

View file

@ -12,8 +12,7 @@ struct protocol_cli {
static void cli_eof_cb(struct cli*c, void*userdata) {
struct protocol_cli *p = userdata;
assert(c && p);
assert(p);
idxset_remove_by_data(p->connections, c, NULL);
cli_free(c);
}
@ -22,7 +21,7 @@ static void on_connection(struct socket_server*s, struct iochannel *io, void *us
struct protocol_cli *p = userdata;
struct cli *c;
assert(s && io && p);
c = cli_new(p->core, io);
assert(c);
cli_set_eof_callback(c, cli_eof_cb, p);

View file

@ -0,0 +1,29 @@
#ifndef fooprotocolnativespech
#define fooprotocolnativespech
enum {
PA_COMMAND_ERROR,
PA_COMMAND_TIMEOUT, /* pseudo command */
PA_COMMAND_REPLY,
PA_COMMAND_CREATE_PLAYBACK_STREAM,
PA_COMMAND_DELETE_PLAYBACK_STREAM,
PA_COMMAND_CREATE_RECORD_STREAM,
PA_COMMAND_DELETE_RECORD_STREAM,
PA_COMMAND_EXIT,
PA_COMMAND_REQUEST,
PA_COMMAND_MAX
};
enum {
PA_ERROR_OK,
PA_ERROR_ACCESS,
PA_ERROR_COMMAND,
PA_ERROR_INVALID,
PA_ERROR_EXIST,
PA_ERROR_NOENTITY,
PA_ERROR_CONNECTIONREFUSED,
PA_ERROR_PROTOCOL,
PA_ERROR_TIMEOUT
};
#endif

View file

@ -3,34 +3,19 @@
#include <stdlib.h>
#include "protocol-native.h"
#include "protocol-native-spec.h"
#include "packet.h"
#include "client.h"
#include "sourceoutput.h"
#include "sinkinput.h"
#include "pstream.h"
#include "tagstruct.h"
#include "pdispatch.h"
#include "pstream-util.h"
struct connection;
struct protocol_native;
enum {
COMMAND_ERROR,
COMMAND_REPLY,
COMMAND_CREATE_PLAYBACK_STREAM,
COMMAND_DELETE_PLAYBACK_STREAM,
COMMAND_CREATE_RECORD_STREAM,
COMMAND_DELETE_RECORD_STREAM,
COMMAND_EXIT,
COMMAND_MAX
};
enum {
ERROR_ACCESS,
ERROR_COMMAND,
ERROR_ARGUMENT,
ERROR_EXIST
};
struct record_stream {
struct connection *connection;
uint32_t index;
@ -41,6 +26,7 @@ struct record_stream {
struct playback_stream {
struct connection *connection;
uint32_t index;
size_t qlength;
struct sink_input *sink_input;
struct memblockq *memblockq;
};
@ -50,6 +36,7 @@ struct connection {
struct protocol_native *protocol;
struct client *client;
struct pstream *pstream;
struct pdispatch *pdispatch;
struct idxset *record_streams, *playback_streams;
};
@ -60,6 +47,29 @@ struct protocol_native {
struct idxset *connections;
};
static int sink_input_peek_cb(struct sink_input *i, struct memchunk *chunk);
static void sink_input_drop_cb(struct sink_input *i, size_t length);
static void sink_input_kill_cb(struct sink_input *i);
static uint32_t sink_input_get_latency_cb(struct sink_input *i);
static void request_bytes(struct playback_stream*s);
static int command_exit(struct pdispatch *pd, uint32_t command, uint32_t tag, struct tagstruct *t, void *userdata);
static int command_create_playback_stream(struct pdispatch *pd, uint32_t command, uint32_t tag, struct tagstruct *t, void *userdata);
static int command_delete_playback_stream(struct pdispatch *pd, uint32_t command, uint32_t tag, struct tagstruct *t, void *userdata);
static const struct pdispatch_command command_table[PA_COMMAND_MAX] = {
[PA_COMMAND_ERROR] = { NULL },
[PA_COMMAND_REPLY] = { NULL },
[PA_COMMAND_CREATE_PLAYBACK_STREAM] = { command_create_playback_stream },
[PA_COMMAND_DELETE_PLAYBACK_STREAM] = { command_delete_playback_stream },
[PA_COMMAND_CREATE_RECORD_STREAM] = { NULL },
[PA_COMMAND_DELETE_RECORD_STREAM] = { NULL },
[PA_COMMAND_EXIT] = { command_exit },
};
/* structure management */
static void record_stream_free(struct record_stream* r) {
assert(r && r->connection);
@ -69,18 +79,28 @@ static void record_stream_free(struct record_stream* r) {
free(r);
}
static struct playback_stream* playback_stream_new(struct connection *c, struct sink *sink, struct sample_spec *ss, const char *name, size_t maxlength, size_t prebuf) {
static struct playback_stream* playback_stream_new(struct connection *c, struct sink *sink, struct pa_sample_spec *ss, const char *name, size_t qlen, size_t maxlength, size_t prebuf) {
struct playback_stream *s;
assert(c && sink && s && name && qlen && maxlength && prebuf);
s = malloc(sizeof(struct playback_stream));
assert (s);
s->connection = c;
s->qlength = qlen;
s->sink_input = sink_input_new(sink, ss, name);
assert(s->sink_input);
s->memblockq = memblockq_new(maxlength, sample_size(ss), prebuf);
s->sink_input->peek = sink_input_peek_cb;
s->sink_input->drop = sink_input_drop_cb;
s->sink_input->kill = sink_input_kill_cb;
s->sink_input->get_latency = sink_input_get_latency_cb;
s->sink_input->userdata = s;
s->memblockq = memblockq_new(maxlength, pa_sample_size(ss), prebuf);
assert(s->memblockq);
idxset_put(c->playback_streams, s, &s->index);
request_bytes(s);
return s;
}
@ -99,7 +119,6 @@ static void connection_free(struct connection *c) {
assert(c && c->protocol);
idxset_remove_by_data(c->protocol->connections, c, NULL);
pstream_free(c->pstream);
while ((r = idxset_first(c->record_streams, NULL)))
record_stream_free(r);
idxset_free(c->record_streams, NULL, NULL);
@ -108,67 +127,90 @@ static void connection_free(struct connection *c) {
playback_stream_free(p);
idxset_free(c->playback_streams, NULL, NULL);
pdispatch_free(c->pdispatch);
pstream_free(c->pstream);
client_free(c->client);
free(c);
}
/*** pstream callbacks ***/
static void request_bytes(struct playback_stream *s) {
struct tagstruct *t;
size_t l;
assert(s);
static void send_tagstruct(struct pstream *p, struct tagstruct *t) {
size_t length;
uint8_t *data;
struct packet *packet;
assert(p && t);
if (!(l = memblockq_missing_to(s->memblockq, s->qlength)))
return;
data = tagstruct_free_data(t, &length);
assert(data && length);
packet = packet_new_dynamic(data, length);
assert(packet);
pstream_send_packet(p, packet);
packet_unref(packet);
}
static void send_error(struct pstream *p, uint32_t tag, uint32_t error) {
struct tagstruct *t = tagstruct_new(NULL, 0);
t = tagstruct_new(NULL, 0);
assert(t);
tagstruct_putu32(t, COMMAND_ERROR);
tagstruct_putu32(t, tag);
tagstruct_putu32(t, error);
send_tagstruct(p, t);
tagstruct_putu32(t, PA_COMMAND_REQUEST);
tagstruct_putu32(t, s->index);
tagstruct_putu32(t, l);
pstream_send_tagstruct(s->connection->pstream, t);
}
static void send_simple_ack(struct pstream *p, uint32_t tag) {
struct tagstruct *t = tagstruct_new(NULL, 0);
assert(t);
tagstruct_putu32(t, COMMAND_REPLY);
tagstruct_putu32(t, tag);
send_tagstruct(p, t);
}
/*** sinkinput callbacks ***/
struct command {
int (*func)(struct connection *c, uint32_t tag, struct tagstruct *t);
};
static int command_create_playback_stream(struct connection *c, uint32_t tag, struct tagstruct *t) {
static int sink_input_peek_cb(struct sink_input *i, struct memchunk *chunk) {
struct playback_stream *s;
size_t maxlength, prebuf;
assert(i && i->userdata && chunk);
s = i->userdata;
if (memblockq_peek(s->memblockq, chunk) < 0)
return -1;
return 0;
}
static void sink_input_drop_cb(struct sink_input *i, size_t length) {
struct playback_stream *s;
assert(i && i->userdata && length);
s = i->userdata;
memblockq_drop(s->memblockq, length);
request_bytes(s);
}
static void sink_input_kill_cb(struct sink_input *i) {
struct playback_stream *s;
assert(i && i->userdata);
s = i->userdata;
playback_stream_free(s);
}
static uint32_t sink_input_get_latency_cb(struct sink_input *i) {
struct playback_stream *s;
assert(i && i->userdata);
s = i->userdata;
return pa_samples_usec(memblockq_get_length(s->memblockq), &s->sink_input->sample_spec);
}
/*** pdispatch callbacks ***/
static int command_create_playback_stream(struct pdispatch *pd, uint32_t command, uint32_t tag, struct tagstruct *t, void *userdata) {
struct connection *c = userdata;
struct playback_stream *s;
size_t maxlength, prebuf, qlength;
uint32_t sink_index;
const char *name;
struct sample_spec ss;
struct pa_sample_spec ss;
struct tagstruct *reply;
struct sink *sink;
assert(c && t && c->protocol && c->protocol->core);
if (tagstruct_gets(t, &name) < 0 ||
tagstruct_get_sample_spec(t, &ss) < 0 ||
tagstruct_getu32(t, &sink_index) < 0 ||
tagstruct_getu32(t, &sink_index) < 0 ||
tagstruct_getu32(t, &qlength) < 0 ||
tagstruct_getu32(t, &maxlength) < 0 ||
tagstruct_getu32(t, &prebuf) < 0 ||
!tagstruct_eof(t))
return -1;
if (!c->authorized) {
send_error(c->pstream, tag, ERROR_ACCESS);
pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
return 0;
}
@ -178,25 +220,28 @@ static int command_create_playback_stream(struct connection *c, uint32_t tag, st
sink = idxset_get_by_index(c->protocol->core->sinks, sink_index);
if (!sink) {
send_error(c->pstream, tag, ERROR_EXIST);
pstream_send_error(c->pstream, tag, PA_ERROR_EXIST);
return 0;
}
if (!(s = playback_stream_new(c, sink, &ss, name, maxlength, prebuf))) {
send_error(c->pstream, tag, ERROR_ARGUMENT);
if (!(s = playback_stream_new(c, sink, &ss, name, qlength, maxlength, prebuf))) {
pstream_send_error(c->pstream, tag, PA_ERROR_INVALID);
return 0;
}
reply = tagstruct_new(NULL, 0);
assert(reply);
tagstruct_putu32(reply, COMMAND_REPLY);
tagstruct_putu32(reply, PA_COMMAND_REPLY);
tagstruct_putu32(reply, tag);
tagstruct_putu32(reply, s->index);
send_tagstruct(c->pstream, reply);
assert(s->sink_input);
tagstruct_putu32(reply, s->sink_input->index);
pstream_send_tagstruct(c->pstream, reply);
return 0;
}
static int command_delete_playback_stream(struct connection *c, uint32_t tag, struct tagstruct *t) {
static int command_delete_playback_stream(struct pdispatch *pd, uint32_t command, uint32_t tag, struct tagstruct *t, void *userdata) {
struct connection *c = userdata;
uint32_t channel;
struct playback_stream *s;
assert(c && t);
@ -206,78 +251,50 @@ static int command_delete_playback_stream(struct connection *c, uint32_t tag, st
return -1;
if (!c->authorized) {
send_error(c->pstream, tag, ERROR_ACCESS);
pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
return 0;
}
if (!(s = idxset_get_by_index(c->playback_streams, channel))) {
send_error(c->pstream, tag, ERROR_EXIST);
pstream_send_error(c->pstream, tag, PA_ERROR_EXIST);
return 0;
}
send_simple_ack(c->pstream, tag);
pstream_send_simple_ack(c->pstream, tag);
return 0;
}
static int command_exit(struct connection *c, uint32_t tag, struct tagstruct *t) {
static int command_exit(struct pdispatch *pd, uint32_t command, uint32_t tag, struct tagstruct *t, void *userdata) {
struct connection *c = userdata;
assert(c && t);
if (!tagstruct_eof(t))
return -1;
if (!c->authorized) {
send_error(c->pstream, tag, ERROR_ACCESS);
pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS);
return 0;
}
assert(c->protocol && c->protocol->core);
mainloop_quit(c->protocol->core->mainloop, -1);
send_simple_ack(c->pstream, tag); /* nonsense */
assert(c->protocol && c->protocol->core && c->protocol->core->mainloop);
c->protocol->core->mainloop->quit(c->protocol->core->mainloop, 0);
pstream_send_simple_ack(c->pstream, tag); /* nonsense */
return 0;
}
static const struct command commands[] = {
[COMMAND_ERROR] = { NULL },
[COMMAND_REPLY] = { NULL },
[COMMAND_CREATE_PLAYBACK_STREAM] = { command_create_playback_stream },
[COMMAND_DELETE_PLAYBACK_STREAM] = { command_delete_playback_stream },
[COMMAND_CREATE_RECORD_STREAM] = { NULL },
[COMMAND_DELETE_RECORD_STREAM] = { NULL },
[COMMAND_EXIT] = { command_exit },
};
/*** pstream callbacks ***/
static int packet_callback(struct pstream *p, struct packet *packet, void *userdata) {
struct connection *c = userdata;
uint32_t tag, command;
struct tagstruct *ts = NULL;
assert(p && packet && packet->data && c);
if (packet->length <= 8)
goto fail;
ts = tagstruct_new(packet->data, packet->length);
assert(ts);
if (tagstruct_getu32(ts, &command) < 0 ||
tagstruct_getu32(ts, &tag) < 0)
goto fail;
if (command >= COMMAND_MAX || !commands[command].func)
send_error(p, tag, ERROR_COMMAND);
else if (commands[command].func(c, tag, ts) < 0)
goto fail;
if (pdispatch_run(c->pdispatch, packet, c) < 0) {
fprintf(stderr, "protocol-native: invalid packet.\n");
return -1;
}
tagstruct_free(ts);
return 0;
fail:
if (ts)
tagstruct_free(ts);
fprintf(stderr, "protocol-native: invalid packet.\n");
return -1;
}
static int memblock_callback(struct pstream *p, uint32_t channel, int32_t delta, struct memchunk *chunk, void *userdata) {
@ -326,6 +343,9 @@ static void on_connection(struct socket_server*s, struct iochannel *io, void *us
pstream_set_recieve_memblock_callback(c->pstream, memblock_callback, c);
pstream_set_die_callback(c->pstream, die_callback, c);
c->pdispatch = pdispatch_new(p->core->mainloop, command_table, PA_COMMAND_MAX);
assert(c->pdispatch);
c->record_streams = idxset_new(NULL, NULL);
c->playback_streams = idxset_new(NULL, NULL);
assert(c->record_streams && c->playback_streams);

View file

@ -9,6 +9,7 @@
#include "sourceoutput.h"
#include "protocol-simple.h"
#include "client.h"
#include "sample-util.h"
struct connection {
struct protocol_simple *protocol;
@ -115,9 +116,10 @@ static int do_write(struct connection *c) {
/*** sink_input callbacks ***/
static int sink_input_peek_cb(struct sink_input *i, struct memchunk *chunk) {
struct connection*c = i->userdata;
assert(i && c && chunk);
struct connection*c;
assert(i && i->userdata && chunk);
c = i->userdata;
if (memblockq_peek(c->input_memblockq, chunk) < 0)
return -1;
@ -143,7 +145,7 @@ static void sink_input_kill_cb(struct sink_input *i) {
static uint32_t sink_input_get_latency_cb(struct sink_input *i) {
struct connection*c = i->userdata;
assert(i && c);
return samples_usec(memblockq_get_length(c->input_memblockq), &DEFAULT_SAMPLE_SPEC);
return pa_samples_usec(memblockq_get_length(c->input_memblockq), &c->sink_input->sample_spec);
}
/*** source_output callbacks ***/
@ -185,6 +187,7 @@ static void io_callback(struct iochannel*io, void *userdata) {
static void on_connection(struct socket_server*s, struct iochannel *io, void *userdata) {
struct protocol_simple *p = userdata;
struct connection *c = NULL;
char cname[256];
assert(s && io && p);
c = malloc(sizeof(struct connection));
@ -195,7 +198,8 @@ static void on_connection(struct socket_server*s, struct iochannel *io, void *us
c->input_memblockq = c->output_memblockq = NULL;
c->protocol = p;
c->client = client_new(p->core, "SIMPLE", "Client");
iochannel_peer_to_string(io, cname, sizeof(cname));
c->client = client_new(p->core, "SIMPLE", cname);
assert(c->client);
c->client->kill = client_kill_cb;
c->client->userdata = c;
@ -215,8 +219,8 @@ static void on_connection(struct socket_server*s, struct iochannel *io, void *us
c->source_output->kill = source_output_kill_cb;
c->source_output->userdata = c;
l = 5*bytes_per_second(&DEFAULT_SAMPLE_SPEC); /* 5s */
c->output_memblockq = memblockq_new(l, sample_size(&DEFAULT_SAMPLE_SPEC), l/2);
l = 5*pa_bytes_per_second(&DEFAULT_SAMPLE_SPEC); /* 5s */
c->output_memblockq = memblockq_new(l, pa_sample_size(&DEFAULT_SAMPLE_SPEC), l/2);
}
if (p->mode & PROTOCOL_SIMPLE_PLAYBACK) {
@ -236,8 +240,8 @@ static void on_connection(struct socket_server*s, struct iochannel *io, void *us
c->sink_input->get_latency = sink_input_get_latency_cb;
c->sink_input->userdata = c;
l = bytes_per_second(&DEFAULT_SAMPLE_SPEC)/2; /* half a second */
c->input_memblockq = memblockq_new(l, sample_size(&DEFAULT_SAMPLE_SPEC), l/2);
l = pa_bytes_per_second(&DEFAULT_SAMPLE_SPEC)/2; /* half a second */
c->input_memblockq = memblockq_new(l, pa_sample_size(&DEFAULT_SAMPLE_SPEC), l/2);
}

35
src/pstream-util.c Normal file
View file

@ -0,0 +1,35 @@
#include <assert.h>
#include "protocol-native-spec.h"
#include "pstream-util.h"
void pstream_send_tagstruct(struct pstream *p, struct tagstruct *t) {
size_t length;
uint8_t *data;
struct packet *packet;
assert(p && t);
data = tagstruct_free_data(t, &length);
assert(data && length);
packet = packet_new_dynamic(data, length);
assert(packet);
pstream_send_packet(p, packet);
packet_unref(packet);
}
void pstream_send_error(struct pstream *p, uint32_t tag, uint32_t error) {
struct tagstruct *t = tagstruct_new(NULL, 0);
assert(t);
tagstruct_putu32(t, PA_COMMAND_ERROR);
tagstruct_putu32(t, tag);
tagstruct_putu32(t, error);
pstream_send_tagstruct(p, t);
}
void pstream_send_simple_ack(struct pstream *p, uint32_t tag) {
struct tagstruct *t = tagstruct_new(NULL, 0);
assert(t);
tagstruct_putu32(t, PA_COMMAND_REPLY);
tagstruct_putu32(t, tag);
pstream_send_tagstruct(p, t);
}

14
src/pstream-util.h Normal file
View file

@ -0,0 +1,14 @@
#ifndef foopstreamutilhfoo
#define foopstreamutilhfoo
#include <inttypes.h>
#include "pstream.h"
#include "tagstruct.h"
/* The tagstruct is freed!*/
void pstream_send_tagstruct(struct pstream *p, struct tagstruct *t);
void pstream_send_error(struct pstream *p, uint32_t tag, uint32_t error);
void pstream_send_simple_ack(struct pstream *p, uint32_t tag);
#endif

View file

@ -30,7 +30,7 @@ struct item_info {
};
struct pstream {
struct mainloop *mainloop;
struct pa_mainloop_api *mainloop;
struct mainloop_source *mainloop_source;
struct iochannel *io;
struct queue *send_queue;
@ -70,18 +70,24 @@ static void do_read(struct pstream *p);
static void io_callback(struct iochannel*io, void *userdata) {
struct pstream *p = userdata;
assert(p && p->io == io);
p->mainloop->enable_fixed(p->mainloop, p->mainloop_source, 0);
do_write(p);
do_read(p);
}
static void prepare_callback(struct mainloop_source *s, void*userdata) {
static void fixed_callback(struct pa_mainloop_api *m, void *id, void*userdata) {
struct pstream *p = userdata;
assert(p && p->mainloop_source == s);
assert(p && p->mainloop_source == id && p->mainloop == m);
p->mainloop->enable_fixed(p->mainloop, p->mainloop_source, 0);
do_write(p);
do_read(p);
}
struct pstream *pstream_new(struct mainloop *m, struct iochannel *io) {
struct pstream *pstream_new(struct pa_mainloop_api *m, struct iochannel *io) {
struct pstream *p;
assert(io);
@ -96,8 +102,8 @@ struct pstream *pstream_new(struct mainloop *m, struct iochannel *io) {
p->die_callback_userdata = NULL;
p->mainloop = m;
p->mainloop_source = mainloop_source_new_fixed(m, prepare_callback, p);
mainloop_source_enable(p->mainloop_source, 0);
p->mainloop_source = m->source_fixed(m, fixed_callback, p);
m->enable_fixed(m, p->mainloop_source, 0);
p->send_queue = queue_new();
assert(p->send_queue);
@ -152,7 +158,7 @@ void pstream_free(struct pstream *p) {
if (p->read.packet)
packet_unref(p->read.packet);
mainloop_source_free(p->mainloop_source);
p->mainloop->cancel_fixed(p->mainloop, p->mainloop_source);
free(p);
}
@ -173,7 +179,7 @@ void pstream_send_packet(struct pstream*p, struct packet *packet) {
i->packet = packet_ref(packet);
queue_push(p->send_queue, i);
mainloop_source_enable(p->mainloop_source, 1);
p->mainloop->enable_fixed(p->mainloop, p->mainloop_source, 1);
}
void pstream_send_memblock(struct pstream*p, uint32_t channel, int32_t delta, struct memchunk *chunk) {
@ -190,7 +196,7 @@ void pstream_send_memblock(struct pstream*p, uint32_t channel, int32_t delta, st
memblock_ref(i->chunk.memblock);
queue_push(p->send_queue, i);
mainloop_source_enable(p->mainloop_source, 1);
p->mainloop->enable_fixed(p->mainloop, p->mainloop_source, 1);
}
void pstream_set_recieve_packet_callback(struct pstream *p, int (*callback) (struct pstream *p, struct packet *packet, void *userdata), void *userdata) {
@ -219,7 +225,7 @@ static void prepare_next_write_item(struct pstream *p) {
assert(p->write.current->packet);
p->write.data = p->write.current->packet->data;
p->write.descriptor[PSTREAM_DESCRIPTOR_LENGTH] = htonl(p->write.current->packet->length);
p->write.descriptor[PSTREAM_DESCRIPTOR_CHANNEL] = 0;
p->write.descriptor[PSTREAM_DESCRIPTOR_CHANNEL] = htonl((uint32_t) -1);
p->write.descriptor[PSTREAM_DESCRIPTOR_DELTA] = 0;
} else {
assert(p->write.current->type == PSTREAM_ITEM_MEMBLOCK && p->write.current->chunk.memblock);
@ -236,8 +242,6 @@ static void do_write(struct pstream *p) {
ssize_t r;
assert(p);
mainloop_source_enable(p->mainloop_source, 0);
if (p->dead || !iochannel_is_writable(p->io))
return;
@ -285,8 +289,6 @@ static void do_read(struct pstream *p) {
ssize_t r;
assert(p);
mainloop_source_enable(p->mainloop_source, 0);
if (p->dead || !iochannel_is_readable(p->io))
return;
@ -313,7 +315,7 @@ static void do_read(struct pstream *p) {
assert(!p->read.packet && !p->read.memblock);
if (ntohl(p->read.descriptor[PSTREAM_DESCRIPTOR_CHANNEL]) == 0) {
if (ntohl(p->read.descriptor[PSTREAM_DESCRIPTOR_CHANNEL]) == (uint32_t) -1) {
/* Frame is a packet frame */
p->read.packet = packet_new(ntohl(p->read.descriptor[PSTREAM_DESCRIPTOR_LENGTH]));
assert(p->read.packet);
@ -331,7 +333,7 @@ static void do_read(struct pstream *p) {
if (p->read.memblock && p->recieve_memblock_callback) { /* Is this memblock data? Than pass it to the user */
size_t l;
l = p->read.index - r < PSTREAM_DESCRIPTOR_SIZE ? p->read.index - PSTREAM_DESCRIPTOR_SIZE : r;
l = (p->read.index - r) < PSTREAM_DESCRIPTOR_SIZE ? p->read.index - PSTREAM_DESCRIPTOR_SIZE : (size_t) r;
if (l > 0) {
struct memchunk chunk;

View file

@ -6,10 +6,11 @@
#include "packet.h"
#include "memblock.h"
#include "iochannel.h"
#include "mainloop-api.h"
struct pstream;
struct pstream* pstream_new(struct mainloop *m, struct iochannel *io);
struct pstream* pstream_new(struct pa_mainloop_api *m, struct iochannel *io);
void pstream_free(struct pstream*p);
void pstream_set_send_callback(struct pstream*p, void (*callback) (struct pstream *p, void *userdata), void *userdata);

View file

@ -73,5 +73,12 @@ void* queue_pop(struct queue *q) {
p = e->data;
free(e);
q->length--;
return p;
}
int queue_is_empty(struct queue *q) {
assert(q);
return q->length == 0;
}

88
src/sample-util.c Normal file
View file

@ -0,0 +1,88 @@
#include <string.h>
#include <assert.h>
#include "sample-util.h"
struct pa_sample_spec default_sample_spec = {
.format = SAMPLE_S16NE,
.rate = 44100,
.channels = 2
};
struct memblock *silence_memblock(struct memblock* b, struct pa_sample_spec *spec) {
assert(b && b->data && spec);
memblock_assert_exclusive(b);
silence_memory(b->data, b->length, spec);
return b;
}
void silence_memchunk(struct memchunk *c, struct pa_sample_spec *spec) {
assert(c && c->memblock && c->memblock->data && spec && c->length);
memblock_assert_exclusive(c->memblock);
silence_memory(c->memblock->data+c->index, c->length, spec);
}
void silence_memory(void *p, size_t length, struct pa_sample_spec *spec) {
char c = 0;
assert(p && length && spec);
switch (spec->format) {
case SAMPLE_U8:
c = 127;
break;
case SAMPLE_S16LE:
case SAMPLE_S16BE:
case SAMPLE_FLOAT32:
c = 0;
break;
case SAMPLE_ALAW:
case SAMPLE_ULAW:
c = 80;
break;
}
memset(p, c, length);
}
size_t mix_chunks(struct mix_info channels[], unsigned nchannels, void *data, size_t length, struct pa_sample_spec *spec, uint8_t volume) {
unsigned c, d;
assert(channels && data && length && spec);
assert(spec->format == SAMPLE_S16NE);
for (d = 0;; d += sizeof(int16_t)) {
int32_t sum = 0;
if (d >= length)
return d;
for (c = 0; c < nchannels; c++) {
int32_t v;
uint8_t volume = channels[c].volume;
if (d >= channels[c].chunk.length)
return d;
if (volume == 0)
v = 0;
else {
v = *((int16_t*) (channels[c].chunk.memblock->data + channels[c].chunk.index + d));
if (volume != 0xFF)
v = v*volume/0xFF;
}
sum += v;
}
if (volume == 0)
sum = 0;
else if (volume != 0xFF)
sum = sum*volume/0xFF;
if (sum < -0x8000) sum = -0x8000;
if (sum > 0x7FFF) sum = 0x7FFF;
*((int16_t*) data) = sum;
data += sizeof(int16_t);
}
}

23
src/sample-util.h Normal file
View file

@ -0,0 +1,23 @@
#ifndef foosampleutilhfoo
#define foosampleutilhfoo
#include "sample.h"
#include "memblock.h"
#define DEFAULT_SAMPLE_SPEC default_sample_spec
extern struct pa_sample_spec default_sample_spec;
struct memblock *silence_memblock(struct memblock* b, struct pa_sample_spec *spec);
void silence_memchunk(struct memchunk *c, struct pa_sample_spec *spec);
void silence_memory(void *p, size_t length, struct pa_sample_spec *spec);
struct mix_info {
struct memchunk chunk;
uint8_t volume;
void *userdata;
};
size_t mix_chunks(struct mix_info channels[], unsigned nchannels, void *data, size_t length, struct pa_sample_spec *spec, uint8_t volume);
#endif

View file

@ -1,50 +1,8 @@
#include <string.h>
#include <assert.h>
#include "sample.h"
struct sample_spec default_sample_spec = {
.format = SAMPLE_S16NE,
.rate = 44100,
.channels = 2
};
struct memblock *silence_memblock(struct memblock* b, struct sample_spec *spec) {
assert(b && b->data && spec);
memblock_assert_exclusive(b);
silence_memory(b->data, b->length, spec);
return b;
}
void silence_memchunk(struct memchunk *c, struct sample_spec *spec) {
assert(c && c->memblock && c->memblock->data && spec && c->length);
memblock_assert_exclusive(c->memblock);
silence_memory(c->memblock->data+c->index, c->length, spec);
}
void silence_memory(void *p, size_t length, struct sample_spec *spec) {
char c = 0;
assert(p && length && spec);
switch (spec->format) {
case SAMPLE_U8:
c = 127;
break;
case SAMPLE_S16LE:
case SAMPLE_S16BE:
case SAMPLE_FLOAT32:
c = 0;
break;
case SAMPLE_ALAW:
case SAMPLE_ULAW:
c = 80;
break;
}
memset(p, c, length);
}
size_t sample_size(struct sample_spec *spec) {
size_t pa_sample_size(struct pa_sample_spec *spec) {
assert(spec);
size_t b = 1;
@ -66,56 +24,14 @@ size_t sample_size(struct sample_spec *spec) {
return b * spec->channels;
}
size_t bytes_per_second(struct sample_spec *spec) {
size_t pa_bytes_per_second(struct pa_sample_spec *spec) {
assert(spec);
return spec->rate*sample_size(spec);
return spec->rate*pa_sample_size(spec);
}
size_t mix_chunks(struct mix_info channels[], unsigned nchannels, void *data, size_t length, struct sample_spec *spec, uint8_t volume) {
unsigned c, d;
assert(channels && data && length && spec);
assert(spec->format == SAMPLE_S16NE);
for (d = 0;; d += sizeof(int16_t)) {
int32_t sum = 0;
if (d >= length)
return d;
for (c = 0; c < nchannels; c++) {
int32_t v;
uint8_t volume = channels[c].volume;
if (d >= channels[c].chunk.length)
return d;
if (volume == 0)
v = 0;
else {
v = *((int16_t*) (channels[c].chunk.memblock->data + channels[c].chunk.index + d));
if (volume != 0xFF)
v = v*volume/0xFF;
}
sum += v;
}
if (volume == 0)
sum = 0;
else if (volume != 0xFF)
sum = sum*volume/0xFF;
if (sum < -0x8000) sum = -0x8000;
if (sum > 0x7FFF) sum = 0x7FFF;
*((int16_t*) data) = sum;
data += sizeof(int16_t);
}
}
uint32_t samples_usec(size_t length, struct sample_spec *spec) {
uint32_t pa_samples_usec(size_t length, struct pa_sample_spec *spec) {
assert(spec);
return (uint32_t) (((double) length /sample_size(spec))/spec->rate*1000000);
return (uint32_t) (((double) length /pa_sample_size(spec))/spec->rate*1000000);
}

View file

@ -2,10 +2,9 @@
#define foosamplehfoo
#include <inttypes.h>
#include <sys/types.h>
#include "memblock.h"
enum sample_format {
enum pa_sample_format {
SAMPLE_U8,
SAMPLE_ALAW,
SAMPLE_ULAW,
@ -16,30 +15,14 @@ enum sample_format {
#define SAMPLE_S16NE SAMPLE_S16LE
struct sample_spec {
enum sample_format format;
struct pa_sample_spec {
enum pa_sample_format format;
uint32_t rate;
uint8_t channels;
};
#define DEFAULT_SAMPLE_SPEC default_sample_spec
extern struct sample_spec default_sample_spec;
struct memblock *silence_memblock(struct memblock* b, struct sample_spec *spec);
void silence_memchunk(struct memchunk *c, struct sample_spec *spec);
void silence_memory(void *p, size_t length, struct sample_spec *spec);
struct mix_info {
struct memchunk chunk;
uint8_t volume;
void *userdata;
};
size_t mix_chunks(struct mix_info channels[], unsigned nchannels, void *data, size_t length, struct sample_spec *spec, uint8_t volume);
size_t bytes_per_second(struct sample_spec *spec);
size_t sample_size(struct sample_spec *spec);
uint32_t samples_usec(size_t length, struct sample_spec *spec);
size_t pa_bytes_per_second(struct pa_sample_spec *spec);
size_t pa_sample_size(struct pa_sample_spec *spec);
uint32_t pa_samples_usec(size_t length, struct pa_sample_spec *spec);
#endif

120
src/simple.c Normal file
View file

@ -0,0 +1,120 @@
#include "simple.h"
#include "polyp.h"
#include "mainloop.h"
struct pa_simple {
struct mainloop *mainloop;
struct pa_context *context;
struct pa_stream *stream;
size_t requested;
int dead;
};
static void playback_callback(struct pa_stream *p, size_t length, void *userdata) {
struct pa_stream *sp = userdata;
assert(p && length && sp);
sp->requested = length;
}
struct pa_simple* pa_simple_new(
const char *server,
const char *name,
enum pa_stream_direction dir,
const char *dev,
const char *stream_name,
const struct pa_sample_spec *ss,
const struct pa_buffer_attr *attr) {
struct pa_simple *p;
assert(ss);
p = malloc(sizeof(struct pa_simple));
assert(p);
p->context = NULL;
p->stream = NULL;
p->mainloop = pa_mainloop_new();
assert(p->mainloop);
p->requested = 0;
p->dead = 0;
if (!(p->context = pa_context_new(pa_mainloop_get_api(p->mainloop), name)))
goto fail;
if (pa_context_connect(c, server, NULL, NULL) < 0)
goto fail;
while (!pa_context_is_ready(c)) {
if (pa_context_is_dead(c))
goto fail;
if (mainloop_iterate(p->mainloop) < 0)
goto fail;
}
if (!(p->stream = pa_stream_new(p->context, dir, sink, stream_name, ss, attr, NULL, NULL)))
goto fail;
while (!pa_stream_is_ready(c)) {
if (pa_stream_is_dead(c))
goto fail;
if (mainloop_iterate(p->mainloop) < 0)
goto fail;
}
pa_stream_set_write_callback(p->stream, playback_callback, p);
return p;
fail:
pa_simple_free(p);
return NULL;
}
void pa_simple_free(struct pa_simple *s) {
assert(s);
if (s->stream)
pa_stream_free(s->stream);
if (s->context)
pa_context_free(s->context);
if (s->mainloop)
mainloop_free(s->mainloop);
free(s);
}
int pa_simple_write(struct pa_simple *s, const void*data, size_t length) {
assert(s && data);
while (length > 0) {
size_t l;
while (!s->requested) {
if (pa_context_is_dead(c))
return -1;
if (mainloop_iterate(s->mainloop) < 0)
return -1;
}
l = length;
if (l > s->requested)
l = s->requested;
pa_stream_write(s->stream, data, l);
data += l;
length -= l;
s->requested = -l;
}
return 0;
}
int pa_simple_read(struct pa_simple *s, const void*data, size_t length) {
assert(0);
}

25
src/simple.h Normal file
View file

@ -0,0 +1,25 @@
#ifndef foosimplehfoo
#define foosimplehfoo
#include <sys/types.h>
#include "sample.h"
#include "polypdef.h"
struct pa_simple;
struct pa_simple* pa_simple_new(
const char *server,
const char *name,
enum pa_stream_direction dir,
const char *dev,
const char *stream_name,
const struct pa_sample_spec *ss,
const struct pa_buffer_attr *attr);
void pa_simple_free(struct pa_simple *s);
int pa_simple_write(struct pa_simple *s, const void*data, size_t length);
int pa_simple_read(struct pa_simple *s, const void*data, size_t length);
#endif

View file

@ -6,10 +6,11 @@
#include "sink.h"
#include "sinkinput.h"
#include "strbuf.h"
#include "sample-util.h"
#define MAX_MIX_CHANNELS 32
struct sink* sink_new(struct core *core, const char *name, const struct sample_spec *spec) {
struct sink* sink_new(struct core *core, const char *name, const struct pa_sample_spec *spec) {
struct sink *s;
char *n = NULL;
int r;

View file

@ -15,7 +15,7 @@ struct sink {
char *name;
struct core *core;
struct sample_spec sample_spec;
struct pa_sample_spec sample_spec;
struct idxset *inputs;
struct source *monitor_source;
@ -27,7 +27,7 @@ struct sink {
void *userdata;
};
struct sink* sink_new(struct core *core, const char *name, const struct sample_spec *spec);
struct sink* sink_new(struct core *core, const char *name, const struct pa_sample_spec *spec);
void sink_free(struct sink* s);
int sink_render(struct sink*s, size_t length, struct memchunk *result);

View file

@ -5,7 +5,7 @@
#include "sinkinput.h"
#include "strbuf.h"
struct sink_input* sink_input_new(struct sink *s, struct sample_spec *spec, const char *name) {
struct sink_input* sink_input_new(struct sink *s, struct pa_sample_spec *spec, const char *name) {
struct sink_input *i;
int r;
assert(s && spec);
@ -14,7 +14,7 @@ struct sink_input* sink_input_new(struct sink *s, struct sample_spec *spec, cons
assert(i);
i->name = name ? strdup(name) : NULL;
i->sink = s;
i->spec = *spec;
i->sample_spec = *spec;
i->peek = NULL;
i->drop = NULL;

View file

@ -12,7 +12,7 @@ struct sink_input {
char *name;
struct sink *sink;
struct sample_spec spec;
struct pa_sample_spec sample_spec;
uint8_t volume;
int (*peek) (struct sink_input *i, struct memchunk *chunk);
@ -23,7 +23,7 @@ struct sink_input {
void *userdata;
};
struct sink_input* sink_input_new(struct sink *s, struct sample_spec *spec, const char *name);
struct sink_input* sink_input_new(struct sink *s, struct pa_sample_spec *spec, const char *name);
void sink_input_free(struct sink_input* i);
/* Code that didn't create the input stream should call this function to

177
src/socket-client.c Normal file
View file

@ -0,0 +1,177 @@
#include <unistd.h>
#include <stdio.h>
#include <errno.h>
#include <string.h>
#include <assert.h>
#include <stdlib.h>
#include <sys/un.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include "socket-client.h"
#include "util.h"
struct socket_client {
struct pa_mainloop_api *mainloop;
int fd;
void *io_source, *fixed_source;
void (*callback)(struct socket_client*c, struct iochannel *io, void *userdata);
void *userdata;
};
static struct socket_client*socket_client_new(struct pa_mainloop_api *m) {
struct socket_client *c;
assert(m);
c = malloc(sizeof(struct socket_client));
assert(c);
c->mainloop = m;
c->fd = -1;
c->io_source = c->fixed_source = NULL;
c->callback = NULL;
c->userdata = NULL;
return c;
}
static void do_call(struct socket_client *c) {
struct iochannel *io;
int error, lerror;
assert(c && c->callback);
lerror = sizeof(error);
if (getsockopt(c->fd, SOL_SOCKET, SO_ERROR, &error, &lerror) < 0) {
fprintf(stderr, "getsockopt(): %s\n", strerror(errno));
goto failed;
}
if (lerror != sizeof(error)) {
fprintf(stderr, "getsocktop() returned invalid size.\n");
goto failed;
}
if (error != 0) {
fprintf(stderr, "connect(): %s\n", strerror(error));
goto failed;
}
io = iochannel_new(c->mainloop, c->fd, c->fd);
assert(io);
c->fd = -1;
c->callback(c, io, c->userdata);
return;
failed:
close(c->fd);
c->fd = -1;
c->callback(c, NULL, c->userdata);
return;
}
static void connect_fixed_cb(struct pa_mainloop_api *m, void *id, void *userdata) {
struct socket_client *c = userdata;
assert(m && c && c->fixed_source == id);
m->cancel_fixed(m, c->fixed_source);
c->fixed_source = NULL;
do_call(c);
}
static void connect_io_cb(struct pa_mainloop_api*m, void *id, int fd, enum pa_mainloop_api_io_events events, void *userdata) {
struct socket_client *c = userdata;
assert(m && c && c->io_source == id && fd >= 0 && events == PA_MAINLOOP_API_IO_EVENT_OUTPUT);
m->cancel_io(m, c->io_source);
c->io_source = NULL;
do_call(c);
}
static int do_connect(struct socket_client *c, const struct sockaddr *sa, socklen_t len) {
int r;
assert(c && sa && len);
make_nonblock_fd(c->fd);
if ((r = connect(c->fd, sa, len)) < 0) {
if (r != EINPROGRESS) {
fprintf(stderr, "connect(): %s\n", strerror(errno));
return -1;
}
c->io_source = c->mainloop->source_io(c->mainloop, c->fd, PA_MAINLOOP_API_IO_EVENT_OUTPUT, connect_io_cb, c);
assert(c->io_source);
} else {
c->fixed_source = c->mainloop->source_fixed(c->mainloop, connect_fixed_cb, c);
assert(c->io_source);
}
return 0;
}
struct socket_client* socket_client_new_ipv4(struct pa_mainloop_api *m, uint32_t address, uint16_t port) {
struct socket_client *c;
struct sockaddr_in sa;
c = socket_client_new(m);
assert(c);
if ((c->fd = socket(PF_INET, SOCK_STREAM, 0)) < 0) {
fprintf(stderr, "socket(): %s\n", strerror(errno));
goto fail;
}
sa.sin_family = AF_INET;
sa.sin_port = htons(port);
sa.sin_addr.s_addr = htonl(address);
if (do_connect(c, (struct sockaddr*) &sa, sizeof(sa)) < 0)
goto fail;
return c;
fail:
socket_client_free(c);
return NULL;
}
struct socket_client* socket_client_new_unix(struct pa_mainloop_api *m, const char *filename) {
struct socket_client *c;
struct sockaddr_un sa;
c = socket_client_new(m);
assert(c);
if ((c->fd = socket(PF_LOCAL, SOCK_STREAM, 0)) < 0) {
fprintf(stderr, "socket(): %s\n", strerror(errno));
goto fail;
}
sa.sun_family = AF_LOCAL;
strncpy(sa.sun_path, filename, sizeof(sa.sun_path)-1);
sa.sun_path[sizeof(sa.sun_path) - 1] = 0;
if (do_connect(c, (struct sockaddr*) &sa, sizeof(sa)) < 0)
goto fail;
return c;
fail:
socket_client_free(c);
return NULL;
}
void socket_client_free(struct socket_client *c) {
assert(c && c->mainloop);
if (c->io_source)
c->mainloop->cancel_io(c->mainloop, c->io_source);
if (c->fixed_source)
c->mainloop->cancel_fixed(c->mainloop, c->fixed_source);
if (c->fd >= 0)
close(c->fd);
free(c);
}
void socket_client_set_callback(struct socket_client *c, void (*on_connection)(struct socket_client *c, struct iochannel*io, void *userdata), void *userdata) {
assert(c);
c->callback = on_connection;
c->userdata = userdata;
}

17
src/socket-client.h Normal file
View file

@ -0,0 +1,17 @@
#ifndef foosocketclienthfoo
#define foosocketclienthfoo
#include <inttypes.h>
#include "mainloop-api.h"
#include "iochannel.h"
struct socket_client;
struct socket_client* socket_client_new_ipv4(struct pa_mainloop_api *m, uint32_t address, uint16_t port);
struct socket_client* socket_client_new_unix(struct pa_mainloop_api *m, const char *filename);
void socket_client_free(struct socket_client *c);
void socket_client_set_callback(struct socket_client *c, void (*on_connection)(struct socket_client *c, struct iochannel*io, void *userdata), void *userdata);
#endif

View file

@ -19,14 +19,15 @@ struct socket_server {
void (*on_connection)(struct socket_server*s, struct iochannel *io, void *userdata);
void *userdata;
struct mainloop_source *mainloop_source;
void *mainloop_source;
struct pa_mainloop_api *mainloop;
};
static void callback(struct mainloop_source*src, int fd, enum mainloop_io_event event, void *userdata) {
static void callback(struct pa_mainloop_api *mainloop, void *id, int fd, enum pa_mainloop_api_io_events events, void *userdata) {
struct socket_server *s = userdata;
struct iochannel *io;
int nfd;
assert(src && fd >= 0 && fd == s->fd && event == MAINLOOP_IO_EVENT_IN && s);
assert(s && s->mainloop == mainloop && s->mainloop_source == id && id && fd >= 0 && fd == s->fd && events == PA_MAINLOOP_API_IO_EVENT_INPUT);
if ((nfd = accept(fd, NULL, NULL)) < 0) {
fprintf(stderr, "accept(): %s\n", strerror(errno));
@ -38,12 +39,12 @@ static void callback(struct mainloop_source*src, int fd, enum mainloop_io_event
return;
}
io = iochannel_new(mainloop_source_get_mainloop(src), nfd, nfd);
io = iochannel_new(s->mainloop, nfd, nfd);
assert(io);
s->on_connection(s, io, s->userdata);
}
struct socket_server* socket_server_new(struct mainloop *m, int fd) {
struct socket_server* socket_server_new(struct pa_mainloop_api *m, int fd) {
struct socket_server *s;
assert(m && fd >= 0);
@ -54,13 +55,14 @@ struct socket_server* socket_server_new(struct mainloop *m, int fd) {
s->on_connection = NULL;
s->userdata = NULL;
s->mainloop_source = mainloop_source_new_io(m, fd, MAINLOOP_IO_EVENT_IN, callback, s);
s->mainloop = m;
s->mainloop_source = m->source_io(m, fd, PA_MAINLOOP_API_IO_EVENT_INPUT, callback, s);
assert(s->mainloop_source);
return s;
}
struct socket_server* socket_server_new_unix(struct mainloop *m, const char *filename) {
struct socket_server* socket_server_new_unix(struct pa_mainloop_api *m, const char *filename) {
int fd = -1;
struct sockaddr_un sa;
struct socket_server *s;
@ -101,7 +103,7 @@ fail:
return NULL;
}
struct socket_server* socket_server_new_ipv4(struct mainloop *m, uint32_t address, uint16_t port) {
struct socket_server* socket_server_new_ipv4(struct pa_mainloop_api *m, uint32_t address, uint16_t port) {
int fd = -1;
struct sockaddr_in sa;
int on = 1;
@ -148,7 +150,8 @@ void socket_server_free(struct socket_server*s) {
free(s->filename);
}
mainloop_source_free(s->mainloop_source);
s->mainloop->cancel_io(s->mainloop, s->mainloop_source);
free(s);
}

View file

@ -2,14 +2,14 @@
#define foosocketserverhfoo
#include <inttypes.h>
#include "mainloop.h"
#include "mainloop-api.h"
#include "iochannel.h"
struct socket_server;
struct socket_server* socket_server_new(struct mainloop *m, int fd);
struct socket_server* socket_server_new_unix(struct mainloop *m, const char *filename);
struct socket_server* socket_server_new_ipv4(struct mainloop *m, uint32_t address, uint16_t port);
struct socket_server* socket_server_new(struct pa_mainloop_api *m, int fd);
struct socket_server* socket_server_new_unix(struct pa_mainloop_api *m, const char *filename);
struct socket_server* socket_server_new_ipv4(struct pa_mainloop_api *m, uint32_t address, uint16_t port);
void socket_server_free(struct socket_server*s);

View file

@ -7,7 +7,7 @@
#include "sourceoutput.h"
#include "strbuf.h"
struct source* source_new(struct core *core, const char *name, const struct sample_spec *spec) {
struct source* source_new(struct core *core, const char *name, const struct pa_sample_spec *spec) {
struct source *s;
int r;
assert(core && spec);

View file

@ -14,14 +14,14 @@ struct source {
char *name;
struct core *core;
struct sample_spec sample_spec;
struct pa_sample_spec sample_spec;
struct idxset *outputs;
void (*notify)(struct source*source);
void *userdata;
};
struct source* source_new(struct core *core, const char *name, const struct sample_spec *spec);
struct source* source_new(struct core *core, const char *name, const struct pa_sample_spec *spec);
void source_free(struct source *s);
/* Pass a new memory block to all output streams */

View file

@ -5,7 +5,7 @@
#include "sourceoutput.h"
#include "strbuf.h"
struct source_output* source_output_new(struct source *s, struct sample_spec *spec, const char *name) {
struct source_output* source_output_new(struct source *s, struct pa_sample_spec *spec, const char *name) {
struct source_output *o;
int r;
assert(s && spec);
@ -14,7 +14,7 @@ struct source_output* source_output_new(struct source *s, struct sample_spec *sp
assert(o);
o->name = name ? strdup(name) : NULL;
o->source = s;
o->spec = *spec;
o->sample_spec = *spec;
o->push = NULL;
o->kill = NULL;

View file

@ -12,7 +12,7 @@ struct source_output {
char *name;
struct source *source;
struct sample_spec spec;
struct pa_sample_spec sample_spec;
void (*push)(struct source_output *o, struct memchunk *chunk);
void (*kill)(struct source_output* o);
@ -20,7 +20,7 @@ struct source_output {
void *userdata;
};
struct source_output* source_output_new(struct source *s, struct sample_spec *spec, const char *name);
struct source_output* source_output_new(struct source *s, struct pa_sample_spec *spec, const char *name);
void source_output_free(struct source_output* o);
void source_output_kill(struct source_output*o);

View file

@ -90,7 +90,7 @@ void tagstruct_putu8(struct tagstruct*t, uint8_t c) {
t->length += 2;
}
void tagstruct_put_sample_spec(struct tagstruct *t, struct sample_spec *ss) {
void tagstruct_put_sample_spec(struct tagstruct *t, const struct pa_sample_spec *ss) {
assert(t && ss);
extend(t, 7);
t->data[t->length] = TAG_SAMPLE_SPEC;
@ -156,7 +156,7 @@ int tagstruct_getu8(struct tagstruct*t, uint8_t *c) {
return 0;
}
int tagstruct_get_sample_spec(struct tagstruct *t, struct sample_spec *ss) {
int tagstruct_get_sample_spec(struct tagstruct *t, struct pa_sample_spec *ss) {
assert(t && ss);
if (t->rindex+7 > t->length)

View file

@ -15,12 +15,12 @@ uint8_t* tagstruct_free_data(struct tagstruct*t, size_t *l);
void tagstruct_puts(struct tagstruct*t, const char *s);
void tagstruct_putu32(struct tagstruct*t, uint32_t i);
void tagstruct_putu8(struct tagstruct*t, uint8_t c);
void tagstruct_put_sample_spec(struct tagstruct *t, struct sample_spec *ss);
void tagstruct_put_sample_spec(struct tagstruct *t, const struct pa_sample_spec *ss);
int tagstruct_gets(struct tagstruct*t, const char **s);
int tagstruct_getu32(struct tagstruct*t, uint32_t *i);
int tagstruct_getu8(struct tagstruct*t, uint8_t *c);
int tagstruct_get_sample_spec(struct tagstruct *t, struct sample_spec *ss);
int tagstruct_get_sample_spec(struct tagstruct *t, struct pa_sample_spec *ss);
int tagstruct_eof(struct tagstruct*t);
const uint8_t* tagstruct_data(struct tagstruct*t, size_t *l);

View file

@ -1,8 +1,10 @@
- sync() function in native library
- name registrar
- native protocol/library
- simple control protocol: kill client/input/output; set_volume
- resampling
- esound protocol
- config parser
- config parser/cmdline
- record testing
-- 0.1
- optimierung von rebuild_pollfds()

62
src/util.c Normal file
View file

@ -0,0 +1,62 @@
#include <assert.h>
#include <string.h>
#include <stdio.h>
#include <sys/un.h>
#include <netinet/in.h>
#include <fcntl.h>
#include "util.h"
void make_nonblock_fd(int fd) {
int v;
if ((v = fcntl(fd, F_GETFL)) >= 0)
if (!(v & O_NONBLOCK))
fcntl(fd, F_SETFL, v|O_NONBLOCK);
}
void peer_to_string(char *c, size_t l, int fd) {
struct stat st;
assert(c && l && fd >= 0);
if (fstat(fd, &st) < 0) {
snprintf(c, l, "Invalid client fd");
return;
}
if (S_ISSOCK(st.st_mode)) {
union {
struct sockaddr sa;
struct sockaddr_in in;
struct sockaddr_un un;
} sa;
socklen_t sa_len = sizeof(sa);
if (getpeername(fd, &sa.sa, &sa_len) >= 0) {
if (sa.sa.sa_family == AF_INET) {
uint32_t ip = ntohl(sa.in.sin_addr.s_addr);
snprintf(c, l, "TCP/IP client from %i.%i.%i.%i:%u",
ip >> 24,
(ip >> 16) & 0xFF,
(ip >> 8) & 0xFF,
ip & 0xFF,
ntohs(sa.in.sin_port));
return;
} else if (sa.sa.sa_family == AF_LOCAL) {
snprintf(c, l, "UNIX client for %s", sa.un.sun_path);
return;
}
}
snprintf(c, l, "Unknown network client");
return;
} else if (S_ISCHR(st.st_mode) && (fd == 0 || fd == 1)) {
snprintf(c, l, "STDIN/STDOUT client");
return;
}
snprintf(c, l, "Unknown client");
}

8
src/util.h Normal file
View file

@ -0,0 +1,8 @@
#ifndef fooutilhfoo
#define fooutilhfoo
void make_nonblock_fd(int fd);
void peer_to_string(char *c, size_t l, int fd);
#endif