poll: remove threads from alsa-sink

Remove the thread from alsa sink and use the pollfd event.
Make it possible to pass multiple fds in one pollfd event
Add 3 callbacks to the pollfd event and add support for timeouts
This commit is contained in:
Wim Taymans 2016-07-08 12:18:01 +02:00
parent ac59fa9371
commit 5fa334a89b
10 changed files with 313 additions and 202 deletions

View file

@ -70,20 +70,53 @@ struct _SpaEvent {
};
/**
* SpaEventPoll:
* @fd: a file descriptor to watch
* @events: events to watch for
* @revents: result events
* @callback: callback called when there was activity on @fd
* @user_data: user data to pass to @callback
* SpaPollFd:
* @fd: a file descriptor
* @events: events to watch
* @revents: events after poll
*/
typedef struct {
int fd;
short events;
short revents;
SpaNotify callback;
void *user_data;
} SpaEventPoll;
} SpaPollFd;
/**
* SpaPollNotifyData:
* @user_data: user data
* @fds: array of file descriptors
* @n_fds: number of elements in @fds
* @now: the current time
* @timeout: the next desired wakeup time relative to @now
*
* Data passed to #SpaPollNotify.
*/
typedef struct {
void *user_data;
SpaPollFd *fds;
unsigned int n_fds;
uint64_t now;
uint64_t timeout;
} SpaPollNotifyData;
typedef int (*SpaPollNotify) (SpaPollNotifyData *data);
/**
* SpaPollItem:
* @fds: array of file descriptors to watch
* @n_fds: number of elements in @fds
* @callback: callback called when there was activity on any of @fds
* @user_data: user data
*/
typedef struct {
SpaPollFd *fds;
unsigned int n_fds;
SpaPollNotify idle_cb;
SpaPollNotify before_cb;
SpaPollNotify after_cb;
void *user_data;
} SpaPollItem;
#ifdef __cplusplus
} /* extern "C" */

View file

@ -4,8 +4,11 @@ alsa_dep = dependency('alsa')
v4l2_dep = dependency('libv4l2')
xv_dep = dependency('x11')
sdl_dep = dependency('sdl2')
dl_lib = find_library('dl', required : true)
pthread_lib = find_library('pthread', required : true)
cc = meson.get_compiler('c')
dl_lib = cc.find_library('dl', required : true)
pthread_lib = cc.find_library('pthread', required : true)
inc = include_directories('include')

View file

@ -20,7 +20,6 @@
#include <stddef.h>
#include <asoundlib.h>
#include <pthread.h>
#include <spa/node.h>
#include <spa/audio/format.h>
@ -58,8 +57,9 @@ typedef struct {
snd_pcm_sframes_t buffer_size;
snd_pcm_sframes_t period_size;
snd_pcm_channel_area_t areas[16];
pthread_t thread;
bool running;
SpaPollFd fds[16];
SpaPollItem poll;
} SpaALSAState;
typedef struct _ALSABuffer ALSABuffer;

View file

@ -175,137 +175,6 @@ xrun_recovery (snd_pcm_t *handle, int err)
return err;
}
/*
* Transfer method - direct write only
*/
static void *
direct_loop (void *user_data)
{
SpaALSASink *this = user_data;
SpaALSAState *state = &this->state;
snd_pcm_t *handle = state->handle;
const snd_pcm_channel_area_t *my_areas;
snd_pcm_uframes_t offset, frames, size;
snd_pcm_sframes_t avail, commitres;
snd_pcm_state_t st;
int err, first = 1;
while (state->running) {
st = snd_pcm_state(handle);
if (st == SND_PCM_STATE_XRUN) {
err = xrun_recovery(handle, -EPIPE);
if (err < 0) {
printf("XRUN recovery failed: %s\n", snd_strerror(err));
return NULL;
}
first = 1;
} else if (st == SND_PCM_STATE_SUSPENDED) {
err = xrun_recovery(handle, -ESTRPIPE);
if (err < 0) {
printf("SUSPEND recovery failed: %s\n", snd_strerror(err));
return NULL;
}
}
avail = snd_pcm_avail_update(handle);
if (avail < 0) {
err = xrun_recovery(handle, avail);
if (err < 0) {
printf("avail update failed: %s\n", snd_strerror(err));
return NULL;
}
first = 1;
continue;
}
if (avail < state->period_size) {
if (first) {
first = 0;
err = snd_pcm_start(handle);
if (err < 0) {
printf("Start error: %s\n", snd_strerror(err));
exit(EXIT_FAILURE);
}
} else {
err = snd_pcm_wait(handle, -1);
if (err < 0) {
if ((err = xrun_recovery(handle, err)) < 0) {
printf("snd_pcm_wait error: %s\n", snd_strerror(err));
exit(EXIT_FAILURE);
}
first = 1;
}
}
continue;
}
size = state->period_size;
while (size > 0) {
frames = size;
err = snd_pcm_mmap_begin(handle, &my_areas, &offset, &frames);
if (err < 0) {
if ((err = xrun_recovery(handle, err)) < 0) {
printf("MMAP begin avail error: %s\n", snd_strerror(err));
exit(EXIT_FAILURE);
}
first = 1;
}
{
SpaEvent event;
ALSABuffer *buffer = &this->buffer;
event.refcount = 1;
event.notify = NULL;
event.type = SPA_EVENT_TYPE_PULL_INPUT;
event.port_id = 0;
event.size = frames * sizeof (uint16_t) * 2;
event.data = buffer;
buffer->buffer.refcount = 1;
buffer->buffer.notify = NULL;
buffer->buffer.size = frames * sizeof (uint16_t) * 2;
buffer->buffer.n_metas = 1;
buffer->buffer.metas = buffer->meta;
buffer->buffer.n_datas = 1;
buffer->buffer.datas = buffer->data;
buffer->header.flags = 0;
buffer->header.seq = 0;
buffer->header.pts = 0;
buffer->header.dts_offset = 0;
buffer->meta[0].type = SPA_META_TYPE_HEADER;
buffer->meta[0].data = &buffer->header;
buffer->meta[0].size = sizeof (buffer->header);
buffer->data[0].type = SPA_DATA_TYPE_MEMPTR;
buffer->data[0].data = (uint8_t *)my_areas[0].addr + (offset * sizeof (uint16_t) * 2);
buffer->data[0].size = frames * sizeof (uint16_t) * 2;
this->event_cb (&this->handle, &event,this->user_data);
spa_buffer_unref ((SpaBuffer *)event.data);
}
if (this->input_buffer) {
if (this->input_buffer != &this->buffer.buffer) {
/* FIXME, copy input */
}
spa_buffer_unref (this->input_buffer);
this->input_buffer = NULL;
}
commitres = snd_pcm_mmap_commit(handle, offset, frames);
if (commitres < 0 || (snd_pcm_uframes_t)commitres != frames) {
if ((err = xrun_recovery(handle, commitres >= 0 ? -EPIPE : commitres)) < 0) {
printf("MMAP commit error: %s\n", snd_strerror(err));
exit(EXIT_FAILURE);
}
first = 1;
}
size -= frames;
}
}
return NULL;
}
static int
spa_alsa_open (SpaALSASink *this)
{
@ -332,6 +201,130 @@ spa_alsa_open (SpaALSASink *this)
return 0;
}
static void
pull_input (SpaALSASink *this, void *data, snd_pcm_uframes_t frames)
{
SpaEvent event;
ALSABuffer *buffer = &this->buffer;
event.refcount = 1;
event.notify = NULL;
event.type = SPA_EVENT_TYPE_PULL_INPUT;
event.port_id = 0;
event.size = frames * sizeof (uint16_t) * 2;
event.data = buffer;
buffer->buffer.refcount = 1;
buffer->buffer.notify = NULL;
buffer->buffer.size = frames * sizeof (uint16_t) * 2;
buffer->buffer.n_metas = 1;
buffer->buffer.metas = buffer->meta;
buffer->buffer.n_datas = 1;
buffer->buffer.datas = buffer->data;
buffer->header.flags = 0;
buffer->header.seq = 0;
buffer->header.pts = 0;
buffer->header.dts_offset = 0;
buffer->meta[0].type = SPA_META_TYPE_HEADER;
buffer->meta[0].data = &buffer->header;
buffer->meta[0].size = sizeof (buffer->header);
buffer->data[0].type = SPA_DATA_TYPE_MEMPTR;
buffer->data[0].data = data;
buffer->data[0].size = frames * sizeof (uint16_t) * 2;
this->event_cb (&this->handle, &event,this->user_data);
spa_buffer_unref ((SpaBuffer *)event.data);
}
static int
mmap_write (SpaALSASink *this)
{
SpaALSAState *state = &this->state;
snd_pcm_t *handle = state->handle;
int err;
snd_pcm_sframes_t avail, commitres;
snd_pcm_uframes_t offset, frames, size;
const snd_pcm_channel_area_t *my_areas;
if ((avail = snd_pcm_avail_update (handle)) < 0) {
if ((err = xrun_recovery (handle, avail)) < 0) {
printf ("Write error: %s\n", snd_strerror (err));
return -1;
}
}
size = avail;
while (size > 0) {
frames = size;
if ((err = snd_pcm_mmap_begin (handle, &my_areas, &offset, &frames)) < 0) {
if ((err = xrun_recovery(handle, err)) < 0) {
printf("MMAP begin avail error: %s\n", snd_strerror(err));
return -1;
}
}
pull_input (this,
(uint8_t *)my_areas[0].addr + (offset * sizeof (uint16_t) * 2),
frames);
if (this->input_buffer) {
if (this->input_buffer != &this->buffer.buffer) {
/* FIXME, copy input */
}
spa_buffer_unref (this->input_buffer);
this->input_buffer = NULL;
}
commitres = snd_pcm_mmap_commit (handle, offset, frames);
if (commitres < 0 || (snd_pcm_uframes_t)commitres != frames) {
if ((err = xrun_recovery (handle, commitres >= 0 ? -EPIPE : commitres)) < 0) {
printf("MMAP commit error: %s\n", snd_strerror(err));
return -1;
}
}
size -= frames;
}
return 0;
}
static int
alsa_on_fd_events (SpaPollNotifyData *data)
{
SpaALSASink *this = data->user_data;
SpaALSAState *state = &this->state;
snd_pcm_t *handle = state->handle;
int err;
unsigned short revents;
snd_pcm_poll_descriptors_revents (handle,
(struct pollfd *)data->fds,
data->n_fds,
&revents);
if (revents & POLLERR) {
if (snd_pcm_state (handle) == SND_PCM_STATE_XRUN ||
snd_pcm_state (handle) == SND_PCM_STATE_SUSPENDED) {
err = snd_pcm_state (handle) == SND_PCM_STATE_XRUN ? -EPIPE : -ESTRPIPE;
if ((err = xrun_recovery (handle, err)) < 0) {
printf ("Write error: %s\n", snd_strerror(err));
return -1;
}
} else {
printf("Wait for poll failed\n");
return -1;
}
}
if (!(revents & POLLOUT))
return -1;
mmap_write (this);
return 0;
}
static int
spa_alsa_close (SpaALSASink *this)
{
@ -353,6 +346,7 @@ spa_alsa_start (SpaALSASink *this)
{
SpaALSAState *state = &this->state;
int err;
SpaEvent event;
if (spa_alsa_open (this) < 0)
return -1;
@ -360,13 +354,34 @@ spa_alsa_start (SpaALSASink *this)
CHECK (set_hwparams (this), "hwparams");
CHECK (set_swparams (this), "swparams");
snd_pcm_dump (this->state.handle, this->state.output);
snd_pcm_dump (state->handle, state->output);
state->running = true;
if ((err = pthread_create (&state->thread, NULL, direct_loop, this)) != 0) {
printf ("can't create thread: %d", err);
state->running = false;
if ((state->poll.n_fds = snd_pcm_poll_descriptors_count (state->handle)) <= 0) {
printf ("Invalid poll descriptors count\n");
return state->poll.n_fds;
}
if ((err = snd_pcm_poll_descriptors (state->handle, (struct pollfd *)state->fds, state->poll.n_fds)) < 0) {
printf ("Unable to obtain poll descriptors for playback: %s\n", snd_strerror(err));
return err;
}
event.refcount = 1;
event.notify = NULL;
event.type = SPA_EVENT_TYPE_ADD_POLL;
event.port_id = 0;
event.data = &state->poll;
event.size = sizeof (state->poll);
state->poll.fds = state->fds;
state->poll.idle_cb = NULL;
state->poll.before_cb = NULL;
state->poll.after_cb = alsa_on_fd_events;
state->poll.user_data = this;
this->event_cb (&this->handle, &event, this->user_data);
mmap_write (this);
err = snd_pcm_start (state->handle);
return err;
}
@ -374,11 +389,18 @@ static int
spa_alsa_stop (SpaALSASink *this)
{
SpaALSAState *state = &this->state;
SpaEvent event;
snd_pcm_drop (state->handle);
event.refcount = 1;
event.notify = NULL;
event.type = SPA_EVENT_TYPE_REMOVE_POLL;
event.port_id = 0;
event.data = &state->poll;
event.size = sizeof (state->poll);
this->event_cb (&this->handle, &event, this->user_data);
if (state->running) {
state->running = false;
pthread_join (state->thread, NULL);
}
spa_alsa_close (this);
return 0;

View file

@ -69,7 +69,8 @@ typedef struct {
V4l2Buffer buffers[MAX_BUFFERS];
V4l2Buffer *ready;
uint32_t ready_count;
SpaEventPoll poll;
SpaPollFd fds[1];
SpaPollItem poll;
} SpaV4l2State;
struct _SpaV4l2Source {
@ -91,14 +92,6 @@ struct _SpaV4l2Source {
#include "v4l2-utils.c"
static const uint32_t min_uint32 = 1;
static const uint32_t max_uint32 = UINT32_MAX;
static const SpaPropRangeInfo uint32_range[] = {
{ "min", "Minimum value", 4, &min_uint32 },
{ "max", "Maximum value", 4, &max_uint32 },
};
enum {
PROP_ID_DEVICE,
PROP_ID_DEVICE_NAME,

View file

@ -343,7 +343,6 @@ mmap_read (SpaV4l2Source *this)
return -1;
}
}
fprintf (stderr, "captured buffer %d\n", buf.index);
b = &state->buffers[buf.index];
b->next = state->ready;
@ -353,10 +352,10 @@ mmap_read (SpaV4l2Source *this)
return 0;
}
static void
v4l2_on_fd_events (void *user_data)
static int
v4l2_on_fd_events (SpaPollNotifyData *data)
{
SpaV4l2Source *this = user_data;
SpaV4l2Source *this = data->user_data;
SpaEvent event;
mmap_read (this);
@ -368,6 +367,8 @@ v4l2_on_fd_events (void *user_data)
event.size = 0;
event.data = NULL;
this->event_cb (&this->handle, &event, this->user_data);
return 0;
}
static void
@ -386,8 +387,6 @@ v4l2_buffer_free (void *data)
b->buffer.refcount = 1;
b->outstanding = false;
fprintf (stderr, "queue buffer %d\n", buf.index);
if (xioctl (state->fd, VIDIOC_QBUF, &buf) < 0) {
perror ("VIDIOC_QBUF");
}
@ -523,10 +522,15 @@ spa_v4l2_start (SpaV4l2Source *this)
event.data = &state->poll;
event.size = sizeof (state->poll);
state->poll.fd = state->fd;
state->poll.events = POLLIN | POLLPRI | POLLERR;
state->poll.revents = 0;
state->poll.callback = v4l2_on_fd_events;
state->fds[0].fd = state->fd;
state->fds[0].events = POLLIN | POLLPRI | POLLERR;
state->fds[0].revents = 0;
state->poll.fds = state->fds;
state->poll.n_fds = 1;
state->poll.idle_cb = NULL;
state->poll.before_cb = NULL;
state->poll.after_cb = v4l2_on_fd_events;
state->poll.user_data = this;
this->event_cb (&this->handle, &event, this->user_data);

View file

@ -22,7 +22,6 @@
#include <sys/stat.h>
#include <fcntl.h>
#include <pthread.h>
#include <linux/videodev2.h>
#include <spa/node.h>
@ -47,11 +46,6 @@ reset_xv_sink_props (SpaXvSinkProps *props)
#define MAX_BUFFERS 256
typedef struct {
SpaEventPoll poll;
SpaXvSink *sink;
} XvEventPoll;
typedef struct _XvBuffer XvBuffer;
struct _XvBuffer {
@ -68,8 +62,6 @@ struct _XvBuffer {
typedef struct {
bool opened;
int fd;
pthread_t thread;
bool running;
XvBuffer buffers[MAX_BUFFERS];
XvBuffer *ready;
uint32_t ready_count;
@ -95,14 +87,6 @@ struct _SpaXvSink {
#include "xv-utils.c"
static const uint32_t min_uint32 = 1;
static const uint32_t max_uint32 = UINT32_MAX;
static const SpaPropRangeInfo uint32_range[] = {
{ "min", "Minimum value", 4, &min_uint32 },
{ "max", "Maximum value", 4, &max_uint32 },
};
enum {
PROP_ID_DEVICE,
PROP_ID_DEVICE_NAME,

View file

@ -1,6 +1,6 @@
executable('test-mixer', 'test-mixer.c',
include_directories : inc,
dependencies : [dl_lib],
dependencies : [dl_lib, pthread_lib],
install : false)
executable('test-v4l2', 'test-v4l2.c',

View file

@ -22,6 +22,9 @@
#include <stdlib.h>
#include <unistd.h>
#include <dlfcn.h>
#include <errno.h>
#include <pthread.h>
#include <poll.h>
#include <spa/node.h>
#include <spa/audio/format.h>
@ -36,6 +39,11 @@ typedef struct {
const SpaNode *source1_node;
SpaHandle *source2;
const SpaNode *source2_node;
bool running;
pthread_t thread;
SpaPollFd fds[16];
unsigned int n_fds;
SpaPollItem poll;
} AppData;
static SpaResult
@ -101,7 +109,6 @@ on_mix_event (SpaHandle *handle, SpaEvent *event, void *user_data)
oinfo.buffer = buf;
oinfo.event = NULL;
printf ("pull source %p\n", buf);
if (event->port_id == data->mix_ports[0]) {
if ((res = data->source1_node->pull_port_output (data->source1, 1, &oinfo)) < 0)
printf ("got error %d\n", res);
@ -115,7 +122,6 @@ on_mix_event (SpaHandle *handle, SpaEvent *event, void *user_data)
iinfo.buffer = oinfo.buffer;
iinfo.event = oinfo.event;
printf ("push mixer %p\n", iinfo.buffer);
if ((res = data->mix_node->push_port_input (data->mix, 1, &iinfo)) < 0)
printf ("got error from mixer %d\n", res);
break;
@ -146,7 +152,6 @@ on_sink_event (SpaHandle *handle, SpaEvent *event, void *user_data)
oinfo.buffer = buf;
oinfo.event = NULL;
printf ("pull mixer %p\n", buf);
if ((res = data->mix_node->pull_port_output (data->mix, 1, &oinfo)) < 0)
printf ("got error %d\n", res);
@ -155,11 +160,23 @@ on_sink_event (SpaHandle *handle, SpaEvent *event, void *user_data)
iinfo.buffer = oinfo.buffer;
iinfo.event = oinfo.event;
printf ("push sink %p\n", iinfo.buffer);
if ((res = data->sink_node->push_port_input (data->sink, 1, &iinfo)) < 0)
printf ("got error %d\n", res);
break;
}
case SPA_EVENT_TYPE_ADD_POLL:
{
SpaPollItem *poll = event->data;
int i;
data->poll = *poll;
for (i = 0; i < data->poll.n_fds; i++) {
data->fds[i] = poll->fds[i];
}
data->n_fds = data->poll.n_fds;
data->poll.fds = data->fds;
break;
}
default:
printf ("got event %d\n", event->type);
break;
@ -267,19 +284,63 @@ negotiate_formats (AppData *data)
return SPA_RESULT_OK;
}
static void *
loop (void *user_data)
{
AppData *data = user_data;
int r;
printf ("enter thread %d\n", data->poll.n_fds);
while (data->running) {
SpaPollNotifyData ndata;
r = poll ((struct pollfd *)data->fds, data->n_fds, -1);
if (r < 0) {
if (errno == EINTR)
continue;
break;
}
if (r == 0) {
fprintf (stderr, "select timeout\n");
break;
}
if (data->poll.after_cb) {
ndata.fds = data->poll.fds;
ndata.n_fds = data->poll.n_fds;
ndata.user_data = data->poll.user_data;
data->poll.after_cb (&ndata);
}
}
printf ("leave thread\n");
return NULL;
}
static void
run_async_sink (AppData *data)
{
SpaResult res;
SpaCommand cmd;
int err;
cmd.type = SPA_COMMAND_START;
if ((res = data->sink_node->send_command (data->sink, &cmd)) < 0)
printf ("got error %d\n", res);
data->running = true;
if ((err = pthread_create (&data->thread, NULL, loop, data)) != 0) {
printf ("can't create thread: %d %s", err, strerror (err));
data->running = false;
}
printf ("sleeping for 10 seconds\n");
sleep (10);
if (data->running) {
data->running = false;
pthread_join (data->thread, NULL);
}
cmd.type = SPA_COMMAND_STOP;
if ((res = data->sink_node->send_command (data->sink, &cmd)) < 0)
printf ("got error %d\n", res);

View file

@ -39,7 +39,9 @@ typedef struct {
SDL_Texture *texture;
bool running;
pthread_t thread;
SpaEventPoll poll;
SpaPollFd fds[16];
unsigned int n_fds;
SpaPollItem poll;
} AppData;
static SpaResult
@ -128,8 +130,15 @@ on_source_event (SpaHandle *handle, SpaEvent *event, void *user_data)
break;
}
case SPA_EVENT_TYPE_ADD_POLL:
memcpy (&data->poll, event->data, sizeof (SpaEventPoll));
{
SpaPollItem *poll = event->data;
data->poll = *poll;
data->fds[0] = poll->fds[0];
data->n_fds = 1;
data->poll.fds = data->fds;
break;
}
default:
printf ("got event %d\n", event->type);
break;
@ -200,16 +209,13 @@ static void *
loop (void *user_data)
{
AppData *data = user_data;
struct pollfd fds[1];
int r;
fds[0].fd = data->poll.fd;
fds[0].events = data->poll.events;
fds[0].revents = 0;
printf ("enter thread\n");
while (data->running) {
r = poll (fds, 1, 2000);
SpaPollNotifyData ndata;
r = poll ((struct pollfd *) data->fds, data->n_fds, -1);
if (r < 0) {
if (errno == EINTR)
continue;
@ -219,7 +225,12 @@ loop (void *user_data)
fprintf (stderr, "select timeout\n");
break;
}
data->poll.callback (data->poll.user_data);
if (data->poll.after_cb) {
ndata.fds = data->poll.fds;
ndata.n_fds = data->poll.n_fds;
ndata.user_data = data->poll.user_data;
data->poll.after_cb (&ndata);
}
}
printf ("leave thread\n");
return NULL;