diff --git a/spa/include/spa/event.h b/spa/include/spa/event.h index 8d9c72604..f3037be59 100644 --- a/spa/include/spa/event.h +++ b/spa/include/spa/event.h @@ -70,20 +70,53 @@ struct _SpaEvent { }; /** - * SpaEventPoll: - * @fd: a file descriptor to watch - * @events: events to watch for - * @revents: result events - * @callback: callback called when there was activity on @fd - * @user_data: user data to pass to @callback + * SpaPollFd: + * @fd: a file descriptor + * @events: events to watch + * @revents: events after poll */ typedef struct { int fd; short events; short revents; - SpaNotify callback; - void *user_data; -} SpaEventPoll; +} SpaPollFd; + + +/** + * SpaPollNotifyData: + * @user_data: user data + * @fds: array of file descriptors + * @n_fds: number of elements in @fds + * @now: the current time + * @timeout: the next desired wakeup time relative to @now + * + * Data passed to #SpaPollNotify. + */ +typedef struct { + void *user_data; + SpaPollFd *fds; + unsigned int n_fds; + uint64_t now; + uint64_t timeout; +} SpaPollNotifyData; + +typedef int (*SpaPollNotify) (SpaPollNotifyData *data); + +/** + * SpaPollItem: + * @fds: array of file descriptors to watch + * @n_fds: number of elements in @fds + * @callback: callback called when there was activity on any of @fds + * @user_data: user data + */ +typedef struct { + SpaPollFd *fds; + unsigned int n_fds; + SpaPollNotify idle_cb; + SpaPollNotify before_cb; + SpaPollNotify after_cb; + void *user_data; +} SpaPollItem; #ifdef __cplusplus } /* extern "C" */ diff --git a/spa/meson.build b/spa/meson.build index 40e6894c8..0160cca03 100644 --- a/spa/meson.build +++ b/spa/meson.build @@ -4,8 +4,11 @@ alsa_dep = dependency('alsa') v4l2_dep = dependency('libv4l2') xv_dep = dependency('x11') sdl_dep = dependency('sdl2') -dl_lib = find_library('dl', required : true) -pthread_lib = find_library('pthread', required : true) + +cc = meson.get_compiler('c') + +dl_lib = cc.find_library('dl', required : true) +pthread_lib = cc.find_library('pthread', required : true) inc = include_directories('include') diff --git a/spa/plugins/alsa/alsa-sink.c b/spa/plugins/alsa/alsa-sink.c index 17019042a..c3ebf8767 100644 --- a/spa/plugins/alsa/alsa-sink.c +++ b/spa/plugins/alsa/alsa-sink.c @@ -20,7 +20,6 @@ #include #include -#include #include #include @@ -58,8 +57,9 @@ typedef struct { snd_pcm_sframes_t buffer_size; snd_pcm_sframes_t period_size; snd_pcm_channel_area_t areas[16]; - pthread_t thread; bool running; + SpaPollFd fds[16]; + SpaPollItem poll; } SpaALSAState; typedef struct _ALSABuffer ALSABuffer; diff --git a/spa/plugins/alsa/alsa-utils.c b/spa/plugins/alsa/alsa-utils.c index 25290d458..3be5dfb03 100644 --- a/spa/plugins/alsa/alsa-utils.c +++ b/spa/plugins/alsa/alsa-utils.c @@ -175,137 +175,6 @@ xrun_recovery (snd_pcm_t *handle, int err) return err; } -/* - * Transfer method - direct write only - */ -static void * -direct_loop (void *user_data) -{ - SpaALSASink *this = user_data; - SpaALSAState *state = &this->state; - snd_pcm_t *handle = state->handle; - const snd_pcm_channel_area_t *my_areas; - snd_pcm_uframes_t offset, frames, size; - snd_pcm_sframes_t avail, commitres; - snd_pcm_state_t st; - int err, first = 1; - - while (state->running) { - st = snd_pcm_state(handle); - if (st == SND_PCM_STATE_XRUN) { - err = xrun_recovery(handle, -EPIPE); - if (err < 0) { - printf("XRUN recovery failed: %s\n", snd_strerror(err)); - return NULL; - } - first = 1; - } else if (st == SND_PCM_STATE_SUSPENDED) { - err = xrun_recovery(handle, -ESTRPIPE); - if (err < 0) { - printf("SUSPEND recovery failed: %s\n", snd_strerror(err)); - return NULL; - } - } - avail = snd_pcm_avail_update(handle); - if (avail < 0) { - err = xrun_recovery(handle, avail); - if (err < 0) { - printf("avail update failed: %s\n", snd_strerror(err)); - return NULL; - } - first = 1; - continue; - } - if (avail < state->period_size) { - if (first) { - first = 0; - err = snd_pcm_start(handle); - if (err < 0) { - printf("Start error: %s\n", snd_strerror(err)); - exit(EXIT_FAILURE); - } - } else { - err = snd_pcm_wait(handle, -1); - if (err < 0) { - if ((err = xrun_recovery(handle, err)) < 0) { - printf("snd_pcm_wait error: %s\n", snd_strerror(err)); - exit(EXIT_FAILURE); - } - first = 1; - } - } - continue; - } - size = state->period_size; - while (size > 0) { - frames = size; - err = snd_pcm_mmap_begin(handle, &my_areas, &offset, &frames); - if (err < 0) { - if ((err = xrun_recovery(handle, err)) < 0) { - printf("MMAP begin avail error: %s\n", snd_strerror(err)); - exit(EXIT_FAILURE); - } - first = 1; - } - - { - SpaEvent event; - ALSABuffer *buffer = &this->buffer; - - event.refcount = 1; - event.notify = NULL; - event.type = SPA_EVENT_TYPE_PULL_INPUT; - event.port_id = 0; - event.size = frames * sizeof (uint16_t) * 2; - event.data = buffer; - - buffer->buffer.refcount = 1; - buffer->buffer.notify = NULL; - buffer->buffer.size = frames * sizeof (uint16_t) * 2; - buffer->buffer.n_metas = 1; - buffer->buffer.metas = buffer->meta; - buffer->buffer.n_datas = 1; - buffer->buffer.datas = buffer->data; - - buffer->header.flags = 0; - buffer->header.seq = 0; - buffer->header.pts = 0; - buffer->header.dts_offset = 0; - - buffer->meta[0].type = SPA_META_TYPE_HEADER; - buffer->meta[0].data = &buffer->header; - buffer->meta[0].size = sizeof (buffer->header); - - buffer->data[0].type = SPA_DATA_TYPE_MEMPTR; - buffer->data[0].data = (uint8_t *)my_areas[0].addr + (offset * sizeof (uint16_t) * 2); - buffer->data[0].size = frames * sizeof (uint16_t) * 2; - - this->event_cb (&this->handle, &event,this->user_data); - - spa_buffer_unref ((SpaBuffer *)event.data); - } - if (this->input_buffer) { - if (this->input_buffer != &this->buffer.buffer) { - /* FIXME, copy input */ - } - spa_buffer_unref (this->input_buffer); - this->input_buffer = NULL; - } - - commitres = snd_pcm_mmap_commit(handle, offset, frames); - if (commitres < 0 || (snd_pcm_uframes_t)commitres != frames) { - if ((err = xrun_recovery(handle, commitres >= 0 ? -EPIPE : commitres)) < 0) { - printf("MMAP commit error: %s\n", snd_strerror(err)); - exit(EXIT_FAILURE); - } - first = 1; - } - size -= frames; - } - } - return NULL; -} - static int spa_alsa_open (SpaALSASink *this) { @@ -332,6 +201,130 @@ spa_alsa_open (SpaALSASink *this) return 0; } +static void +pull_input (SpaALSASink *this, void *data, snd_pcm_uframes_t frames) +{ + SpaEvent event; + ALSABuffer *buffer = &this->buffer; + + event.refcount = 1; + event.notify = NULL; + event.type = SPA_EVENT_TYPE_PULL_INPUT; + event.port_id = 0; + event.size = frames * sizeof (uint16_t) * 2; + event.data = buffer; + + buffer->buffer.refcount = 1; + buffer->buffer.notify = NULL; + buffer->buffer.size = frames * sizeof (uint16_t) * 2; + buffer->buffer.n_metas = 1; + buffer->buffer.metas = buffer->meta; + buffer->buffer.n_datas = 1; + buffer->buffer.datas = buffer->data; + + buffer->header.flags = 0; + buffer->header.seq = 0; + buffer->header.pts = 0; + buffer->header.dts_offset = 0; + + buffer->meta[0].type = SPA_META_TYPE_HEADER; + buffer->meta[0].data = &buffer->header; + buffer->meta[0].size = sizeof (buffer->header); + + buffer->data[0].type = SPA_DATA_TYPE_MEMPTR; + buffer->data[0].data = data; + buffer->data[0].size = frames * sizeof (uint16_t) * 2; + + this->event_cb (&this->handle, &event,this->user_data); + + spa_buffer_unref ((SpaBuffer *)event.data); +} + +static int +mmap_write (SpaALSASink *this) +{ + SpaALSAState *state = &this->state; + snd_pcm_t *handle = state->handle; + int err; + snd_pcm_sframes_t avail, commitres; + snd_pcm_uframes_t offset, frames, size; + const snd_pcm_channel_area_t *my_areas; + + if ((avail = snd_pcm_avail_update (handle)) < 0) { + if ((err = xrun_recovery (handle, avail)) < 0) { + printf ("Write error: %s\n", snd_strerror (err)); + return -1; + } + } + + size = avail; + while (size > 0) { + frames = size; + if ((err = snd_pcm_mmap_begin (handle, &my_areas, &offset, &frames)) < 0) { + if ((err = xrun_recovery(handle, err)) < 0) { + printf("MMAP begin avail error: %s\n", snd_strerror(err)); + return -1; + } + } + + pull_input (this, + (uint8_t *)my_areas[0].addr + (offset * sizeof (uint16_t) * 2), + frames); + + if (this->input_buffer) { + if (this->input_buffer != &this->buffer.buffer) { + /* FIXME, copy input */ + } + spa_buffer_unref (this->input_buffer); + this->input_buffer = NULL; + } + + commitres = snd_pcm_mmap_commit (handle, offset, frames); + if (commitres < 0 || (snd_pcm_uframes_t)commitres != frames) { + if ((err = xrun_recovery (handle, commitres >= 0 ? -EPIPE : commitres)) < 0) { + printf("MMAP commit error: %s\n", snd_strerror(err)); + return -1; + } + } + size -= frames; + } + return 0; +} + +static int +alsa_on_fd_events (SpaPollNotifyData *data) +{ + SpaALSASink *this = data->user_data; + SpaALSAState *state = &this->state; + snd_pcm_t *handle = state->handle; + int err; + unsigned short revents; + + snd_pcm_poll_descriptors_revents (handle, + (struct pollfd *)data->fds, + data->n_fds, + &revents); + if (revents & POLLERR) { + if (snd_pcm_state (handle) == SND_PCM_STATE_XRUN || + snd_pcm_state (handle) == SND_PCM_STATE_SUSPENDED) { + err = snd_pcm_state (handle) == SND_PCM_STATE_XRUN ? -EPIPE : -ESTRPIPE; + if ((err = xrun_recovery (handle, err)) < 0) { + printf ("Write error: %s\n", snd_strerror(err)); + return -1; + } + } else { + printf("Wait for poll failed\n"); + return -1; + } + } + if (!(revents & POLLOUT)) + return -1; + + mmap_write (this); + + return 0; +} + static int spa_alsa_close (SpaALSASink *this) { @@ -353,6 +346,7 @@ spa_alsa_start (SpaALSASink *this) { SpaALSAState *state = &this->state; int err; + SpaEvent event; if (spa_alsa_open (this) < 0) return -1; @@ -360,13 +354,34 @@ spa_alsa_start (SpaALSASink *this) CHECK (set_hwparams (this), "hwparams"); CHECK (set_swparams (this), "swparams"); - snd_pcm_dump (this->state.handle, this->state.output); + snd_pcm_dump (state->handle, state->output); - state->running = true; - if ((err = pthread_create (&state->thread, NULL, direct_loop, this)) != 0) { - printf ("can't create thread: %d", err); - state->running = false; + if ((state->poll.n_fds = snd_pcm_poll_descriptors_count (state->handle)) <= 0) { + printf ("Invalid poll descriptors count\n"); + return state->poll.n_fds; } + if ((err = snd_pcm_poll_descriptors (state->handle, (struct pollfd *)state->fds, state->poll.n_fds)) < 0) { + printf ("Unable to obtain poll descriptors for playback: %s\n", snd_strerror(err)); + return err; + } + + event.refcount = 1; + event.notify = NULL; + event.type = SPA_EVENT_TYPE_ADD_POLL; + event.port_id = 0; + event.data = &state->poll; + event.size = sizeof (state->poll); + + state->poll.fds = state->fds; + state->poll.idle_cb = NULL; + state->poll.before_cb = NULL; + state->poll.after_cb = alsa_on_fd_events; + state->poll.user_data = this; + this->event_cb (&this->handle, &event, this->user_data); + + mmap_write (this); + err = snd_pcm_start (state->handle); + return err; } @@ -374,11 +389,18 @@ static int spa_alsa_stop (SpaALSASink *this) { SpaALSAState *state = &this->state; + SpaEvent event; + + snd_pcm_drop (state->handle); + + event.refcount = 1; + event.notify = NULL; + event.type = SPA_EVENT_TYPE_REMOVE_POLL; + event.port_id = 0; + event.data = &state->poll; + event.size = sizeof (state->poll); + this->event_cb (&this->handle, &event, this->user_data); - if (state->running) { - state->running = false; - pthread_join (state->thread, NULL); - } spa_alsa_close (this); return 0; diff --git a/spa/plugins/v4l2/v4l2-source.c b/spa/plugins/v4l2/v4l2-source.c index cb3808cab..ea5b691f9 100644 --- a/spa/plugins/v4l2/v4l2-source.c +++ b/spa/plugins/v4l2/v4l2-source.c @@ -69,7 +69,8 @@ typedef struct { V4l2Buffer buffers[MAX_BUFFERS]; V4l2Buffer *ready; uint32_t ready_count; - SpaEventPoll poll; + SpaPollFd fds[1]; + SpaPollItem poll; } SpaV4l2State; struct _SpaV4l2Source { @@ -91,14 +92,6 @@ struct _SpaV4l2Source { #include "v4l2-utils.c" -static const uint32_t min_uint32 = 1; -static const uint32_t max_uint32 = UINT32_MAX; - -static const SpaPropRangeInfo uint32_range[] = { - { "min", "Minimum value", 4, &min_uint32 }, - { "max", "Maximum value", 4, &max_uint32 }, -}; - enum { PROP_ID_DEVICE, PROP_ID_DEVICE_NAME, diff --git a/spa/plugins/v4l2/v4l2-utils.c b/spa/plugins/v4l2/v4l2-utils.c index c47acffb4..abec0db2e 100644 --- a/spa/plugins/v4l2/v4l2-utils.c +++ b/spa/plugins/v4l2/v4l2-utils.c @@ -343,7 +343,6 @@ mmap_read (SpaV4l2Source *this) return -1; } } - fprintf (stderr, "captured buffer %d\n", buf.index); b = &state->buffers[buf.index]; b->next = state->ready; @@ -353,10 +352,10 @@ mmap_read (SpaV4l2Source *this) return 0; } -static void -v4l2_on_fd_events (void *user_data) +static int +v4l2_on_fd_events (SpaPollNotifyData *data) { - SpaV4l2Source *this = user_data; + SpaV4l2Source *this = data->user_data; SpaEvent event; mmap_read (this); @@ -368,6 +367,8 @@ v4l2_on_fd_events (void *user_data) event.size = 0; event.data = NULL; this->event_cb (&this->handle, &event, this->user_data); + + return 0; } static void @@ -386,8 +387,6 @@ v4l2_buffer_free (void *data) b->buffer.refcount = 1; b->outstanding = false; - fprintf (stderr, "queue buffer %d\n", buf.index); - if (xioctl (state->fd, VIDIOC_QBUF, &buf) < 0) { perror ("VIDIOC_QBUF"); } @@ -523,10 +522,15 @@ spa_v4l2_start (SpaV4l2Source *this) event.data = &state->poll; event.size = sizeof (state->poll); - state->poll.fd = state->fd; - state->poll.events = POLLIN | POLLPRI | POLLERR; - state->poll.revents = 0; - state->poll.callback = v4l2_on_fd_events; + state->fds[0].fd = state->fd; + state->fds[0].events = POLLIN | POLLPRI | POLLERR; + state->fds[0].revents = 0; + + state->poll.fds = state->fds; + state->poll.n_fds = 1; + state->poll.idle_cb = NULL; + state->poll.before_cb = NULL; + state->poll.after_cb = v4l2_on_fd_events; state->poll.user_data = this; this->event_cb (&this->handle, &event, this->user_data); diff --git a/spa/plugins/xv/xv-sink.c b/spa/plugins/xv/xv-sink.c index 2ff8aefe0..875832bd7 100644 --- a/spa/plugins/xv/xv-sink.c +++ b/spa/plugins/xv/xv-sink.c @@ -22,7 +22,6 @@ #include #include -#include #include #include @@ -47,11 +46,6 @@ reset_xv_sink_props (SpaXvSinkProps *props) #define MAX_BUFFERS 256 -typedef struct { - SpaEventPoll poll; - SpaXvSink *sink; -} XvEventPoll; - typedef struct _XvBuffer XvBuffer; struct _XvBuffer { @@ -68,8 +62,6 @@ struct _XvBuffer { typedef struct { bool opened; int fd; - pthread_t thread; - bool running; XvBuffer buffers[MAX_BUFFERS]; XvBuffer *ready; uint32_t ready_count; @@ -95,14 +87,6 @@ struct _SpaXvSink { #include "xv-utils.c" -static const uint32_t min_uint32 = 1; -static const uint32_t max_uint32 = UINT32_MAX; - -static const SpaPropRangeInfo uint32_range[] = { - { "min", "Minimum value", 4, &min_uint32 }, - { "max", "Maximum value", 4, &max_uint32 }, -}; - enum { PROP_ID_DEVICE, PROP_ID_DEVICE_NAME, diff --git a/spa/tests/meson.build b/spa/tests/meson.build index 0c5ba4364..d2df5f149 100644 --- a/spa/tests/meson.build +++ b/spa/tests/meson.build @@ -1,6 +1,6 @@ executable('test-mixer', 'test-mixer.c', include_directories : inc, - dependencies : [dl_lib], + dependencies : [dl_lib, pthread_lib], install : false) executable('test-v4l2', 'test-v4l2.c', diff --git a/spa/tests/test-mixer.c b/spa/tests/test-mixer.c index 0b3a3aead..449b653f8 100644 --- a/spa/tests/test-mixer.c +++ b/spa/tests/test-mixer.c @@ -22,6 +22,9 @@ #include #include #include +#include +#include +#include #include #include @@ -36,6 +39,11 @@ typedef struct { const SpaNode *source1_node; SpaHandle *source2; const SpaNode *source2_node; + bool running; + pthread_t thread; + SpaPollFd fds[16]; + unsigned int n_fds; + SpaPollItem poll; } AppData; static SpaResult @@ -101,7 +109,6 @@ on_mix_event (SpaHandle *handle, SpaEvent *event, void *user_data) oinfo.buffer = buf; oinfo.event = NULL; - printf ("pull source %p\n", buf); if (event->port_id == data->mix_ports[0]) { if ((res = data->source1_node->pull_port_output (data->source1, 1, &oinfo)) < 0) printf ("got error %d\n", res); @@ -115,7 +122,6 @@ on_mix_event (SpaHandle *handle, SpaEvent *event, void *user_data) iinfo.buffer = oinfo.buffer; iinfo.event = oinfo.event; - printf ("push mixer %p\n", iinfo.buffer); if ((res = data->mix_node->push_port_input (data->mix, 1, &iinfo)) < 0) printf ("got error from mixer %d\n", res); break; @@ -146,7 +152,6 @@ on_sink_event (SpaHandle *handle, SpaEvent *event, void *user_data) oinfo.buffer = buf; oinfo.event = NULL; - printf ("pull mixer %p\n", buf); if ((res = data->mix_node->pull_port_output (data->mix, 1, &oinfo)) < 0) printf ("got error %d\n", res); @@ -155,11 +160,23 @@ on_sink_event (SpaHandle *handle, SpaEvent *event, void *user_data) iinfo.buffer = oinfo.buffer; iinfo.event = oinfo.event; - printf ("push sink %p\n", iinfo.buffer); if ((res = data->sink_node->push_port_input (data->sink, 1, &iinfo)) < 0) printf ("got error %d\n", res); break; } + case SPA_EVENT_TYPE_ADD_POLL: + { + SpaPollItem *poll = event->data; + int i; + + data->poll = *poll; + for (i = 0; i < data->poll.n_fds; i++) { + data->fds[i] = poll->fds[i]; + } + data->n_fds = data->poll.n_fds; + data->poll.fds = data->fds; + break; + } default: printf ("got event %d\n", event->type); break; @@ -267,19 +284,63 @@ negotiate_formats (AppData *data) return SPA_RESULT_OK; } +static void * +loop (void *user_data) +{ + AppData *data = user_data; + int r; + + printf ("enter thread %d\n", data->poll.n_fds); + while (data->running) { + SpaPollNotifyData ndata; + + r = poll ((struct pollfd *)data->fds, data->n_fds, -1); + if (r < 0) { + if (errno == EINTR) + continue; + break; + } + if (r == 0) { + fprintf (stderr, "select timeout\n"); + break; + } + if (data->poll.after_cb) { + ndata.fds = data->poll.fds; + ndata.n_fds = data->poll.n_fds; + ndata.user_data = data->poll.user_data; + data->poll.after_cb (&ndata); + } + } + printf ("leave thread\n"); + + return NULL; +} + static void run_async_sink (AppData *data) { SpaResult res; SpaCommand cmd; + int err; cmd.type = SPA_COMMAND_START; if ((res = data->sink_node->send_command (data->sink, &cmd)) < 0) printf ("got error %d\n", res); + data->running = true; + if ((err = pthread_create (&data->thread, NULL, loop, data)) != 0) { + printf ("can't create thread: %d %s", err, strerror (err)); + data->running = false; + } + printf ("sleeping for 10 seconds\n"); sleep (10); + if (data->running) { + data->running = false; + pthread_join (data->thread, NULL); + } + cmd.type = SPA_COMMAND_STOP; if ((res = data->sink_node->send_command (data->sink, &cmd)) < 0) printf ("got error %d\n", res); diff --git a/spa/tests/test-v4l2.c b/spa/tests/test-v4l2.c index 1e6d5a486..1942218fd 100644 --- a/spa/tests/test-v4l2.c +++ b/spa/tests/test-v4l2.c @@ -39,7 +39,9 @@ typedef struct { SDL_Texture *texture; bool running; pthread_t thread; - SpaEventPoll poll; + SpaPollFd fds[16]; + unsigned int n_fds; + SpaPollItem poll; } AppData; static SpaResult @@ -128,8 +130,15 @@ on_source_event (SpaHandle *handle, SpaEvent *event, void *user_data) break; } case SPA_EVENT_TYPE_ADD_POLL: - memcpy (&data->poll, event->data, sizeof (SpaEventPoll)); + { + SpaPollItem *poll = event->data; + + data->poll = *poll; + data->fds[0] = poll->fds[0]; + data->n_fds = 1; + data->poll.fds = data->fds; break; + } default: printf ("got event %d\n", event->type); break; @@ -200,16 +209,13 @@ static void * loop (void *user_data) { AppData *data = user_data; - struct pollfd fds[1]; int r; - fds[0].fd = data->poll.fd; - fds[0].events = data->poll.events; - fds[0].revents = 0; - printf ("enter thread\n"); while (data->running) { - r = poll (fds, 1, 2000); + SpaPollNotifyData ndata; + + r = poll ((struct pollfd *) data->fds, data->n_fds, -1); if (r < 0) { if (errno == EINTR) continue; @@ -219,7 +225,12 @@ loop (void *user_data) fprintf (stderr, "select timeout\n"); break; } - data->poll.callback (data->poll.user_data); + if (data->poll.after_cb) { + ndata.fds = data->poll.fds; + ndata.n_fds = data->poll.n_fds; + ndata.user_data = data->poll.user_data; + data->poll.after_cb (&ndata); + } } printf ("leave thread\n"); return NULL;