From 2254a124af849ab2c4b623ffdb2c312eacc363cf Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Thu, 7 Jun 2018 11:16:09 +0200 Subject: [PATCH] implement more api --- src/context.c | 180 ++++++++++++++++++----- src/internal.h | 33 ++++- src/mainloop.c | 146 ++++++++++++++++--- src/stream.c | 383 ++++++++++++++++++++++++++++++++++++++----------- 4 files changed, 603 insertions(+), 139 deletions(-) diff --git a/src/context.c b/src/context.c index 1df2625de..0a4b3a812 100644 --- a/src/context.c +++ b/src/context.c @@ -26,12 +26,13 @@ #include #include +#include #include "internal.h" int pa_context_set_error(pa_context *c, int error) { - spa_assert(error >= 0); - spa_assert(error < PA_ERR_MAX); + pa_assert(error >= 0); + pa_assert(error < PA_ERR_MAX); if (c) c->error = error; return error; @@ -52,8 +53,8 @@ static void context_unlink(pa_context *c) } void pa_context_set_state(pa_context *c, pa_context_state_t st) { - spa_assert(c); - spa_assert(c->refcount >= 1); + pa_assert(c); + pa_assert(c->refcount >= 1); if (c->state == st) return; @@ -72,8 +73,8 @@ void pa_context_set_state(pa_context *c, pa_context_state_t st) { } static void context_fail(pa_context *c, int error) { - spa_assert(c); - spa_assert(c->refcount >= 1); + pa_assert(c); + pa_assert(c->refcount >= 1); pa_context_set_error(c, error); pa_context_set_state(c, PA_CONTEXT_FAILED); @@ -178,6 +179,7 @@ static void remote_state_changed(void *data, enum pw_remote_state old, o = pa_operation_new(c, NULL, on_ready, sizeof(struct ready_data)); d = o->userdata; d->context = c; + pa_operation_unref(o); break; } } @@ -213,11 +215,12 @@ pa_context *pa_context_new_with_proplist(pa_mainloop_api *mainloop, const char * struct pw_properties *props; pa_context *c; - spa_assert(mainloop); + pa_assert(mainloop); props = pw_properties_new(NULL, NULL); if (name) pw_properties_set(props, PA_PROP_APPLICATION_NAME, name); + pw_properties_set(props, "client.api", "pulseaudio"); loop = mainloop->userdata; core = pw_core_new(loop, NULL); @@ -264,8 +267,8 @@ static void context_free(pa_context *c) void pa_context_unref(pa_context *c) { - spa_assert(c); - spa_assert(c->refcount >= 1); + pa_assert(c); + pa_assert(c->refcount >= 1); if (--c->refcount == 0) context_free(c); @@ -273,16 +276,16 @@ void pa_context_unref(pa_context *c) pa_context* pa_context_ref(pa_context *c) { - spa_assert(c); - spa_assert(c->refcount >= 1); + pa_assert(c); + pa_assert(c->refcount >= 1); c->refcount++; return c; } void pa_context_set_state_callback(pa_context *c, pa_context_notify_cb_t cb, void *userdata) { - spa_assert(c); - spa_assert(c->refcount >= 1); + pa_assert(c); + pa_assert(c->refcount >= 1); if (c->state == PA_CONTEXT_TERMINATED || c->state == PA_CONTEXT_FAILED) return; @@ -293,8 +296,8 @@ void pa_context_set_state_callback(pa_context *c, pa_context_notify_cb_t cb, voi void pa_context_set_event_callback(pa_context *c, pa_context_event_cb_t cb, void *userdata) { - spa_assert(c); - spa_assert(c->refcount >= 1); + pa_assert(c); + pa_assert(c->refcount >= 1); if (c->state == PA_CONTEXT_TERMINATED || c->state == PA_CONTEXT_FAILED) return; @@ -308,21 +311,25 @@ int pa_context_errno(pa_context *c) if (!c) return PA_ERR_INVALID; - spa_assert(c->refcount >= 1); + pa_assert(c->refcount >= 1); return c->error; } int pa_context_is_pending(pa_context *c) { - pw_log_warn("Not Implemented"); - return 0; + pa_assert(c); + pa_assert(c->refcount >= 1); + + PA_CHECK_VALIDITY(c, PA_CONTEXT_IS_GOOD(c->state), PA_ERR_BADSTATE); + + return !spa_list_is_empty(&c->operations); } pa_context_state_t pa_context_get_state(pa_context *c) { - spa_assert(c); - spa_assert(c->refcount >= 1); + pa_assert(c); + pa_assert(c->refcount >= 1); return c->state; } @@ -330,8 +337,8 @@ int pa_context_connect(pa_context *c, const char *server, pa_context_flags_t fla { int res; - spa_assert(c); - spa_assert(c->refcount >= 1); + pa_assert(c); + pa_assert(c->refcount >= 1); PA_CHECK_VALIDITY(c, c->state == PA_CONTEXT_UNCONNECTED, PA_ERR_BADSTATE); PA_CHECK_VALIDITY(c, !(flags & ~(PA_CONTEXT_NOAUTOSPAWN|PA_CONTEXT_NOFAIL)), PA_ERR_INVALID); @@ -350,8 +357,8 @@ int pa_context_connect(pa_context *c, const char *server, pa_context_flags_t fla void pa_context_disconnect(pa_context *c) { - spa_assert(c); - spa_assert(c->refcount >= 1); + pa_assert(c); + pa_assert(c->refcount >= 1); pw_remote_disconnect(c->remote); @@ -359,16 +366,58 @@ void pa_context_disconnect(pa_context *c) pa_context_set_state(c, PA_CONTEXT_TERMINATED); } +struct notify_data { + pa_context_notify_cb_t cb; + void *userdata; +}; + +static void on_notify(pa_operation *o, void *userdata) +{ + struct notify_data *d = userdata; + pa_operation_done(o); + if (d->cb) + d->cb(o->context, d->userdata); +} + pa_operation* pa_context_drain(pa_context *c, pa_context_notify_cb_t cb, void *userdata) { - pw_log_warn("Not Implemented"); - return NULL; + pa_operation *o; + struct notify_data *d; + + o = pa_operation_new(c, NULL, on_notify, sizeof(struct notify_data)); + d = o->userdata; + d->cb = cb; + d->userdata = userdata; + + return o; +} + +struct success_data { + pa_context_success_cb_t cb; + void *userdata; + int ret; +}; + +static void on_success(pa_operation *o, void *userdata) +{ + struct success_data *d = userdata; + pa_operation_done(o); + if (d->cb) + d->cb(o->context, d->ret, d->userdata); } pa_operation* pa_context_exit_daemon(pa_context *c, pa_context_success_cb_t cb, void *userdata) { - pw_log_warn("Not Implemented"); - return NULL; + pa_operation *o; + struct success_data *d; + + o = pa_operation_new(c, NULL, on_success, sizeof(struct success_data)); + d = o->userdata; + d->ret = PA_ERR_ACCESS; + d->cb = cb; + d->userdata = userdata; + + return o; } pa_operation* pa_context_set_default_sink(pa_context *c, const char *name, pa_context_success_cb_t cb, void *userdata) @@ -385,20 +434,51 @@ pa_operation* pa_context_set_default_source(pa_context *c, const char *name, pa_ int pa_context_is_local(pa_context *c) { - pw_log_warn("Not Implemented"); - return 0; + pa_assert(c); + pa_assert(c->refcount >= 1); + + PA_CHECK_VALIDITY_RETURN_ANY(c, PA_CONTEXT_IS_GOOD(c->state), PA_ERR_BADSTATE, -1); + + return 1; } pa_operation* pa_context_set_name(pa_context *c, const char *name, pa_context_success_cb_t cb, void *userdata) { - pw_log_warn("Not Implemented"); - return NULL; + struct spa_dict dict; + struct spa_dict_item items[1]; + pa_operation *o; + struct success_data *d; + + pa_assert(c); + pa_assert(c->refcount >= 1); + pa_assert(name); + + PA_CHECK_VALIDITY_RETURN_NULL(c, c->state == PA_CONTEXT_READY, PA_ERR_BADSTATE); + + items[0] = SPA_DICT_ITEM_INIT(PA_PROP_APPLICATION_NAME, name); + dict = SPA_DICT_INIT(items, 1); + pw_remote_update_properties(c->remote, &dict); + + o = pa_operation_new(c, NULL, on_success, sizeof(struct success_data)); + d = o->userdata; + d->ret = PA_ERR_ACCESS; + d->cb = cb; + d->userdata = userdata; + + return o; } const char* pa_context_get_server(pa_context *c) { - pw_log_warn("Not Implemented"); - return NULL; + const struct pw_core_info *info; + + pa_assert(c); + pa_assert(c->refcount >= 1); + + info = pw_remote_get_core_info(c->remote); + PA_CHECK_VALIDITY_RETURN_NULL(c, info && info->name, PA_ERR_NOENTITY); + + return info->name; } uint32_t pa_context_get_protocol_version(pa_context *c) @@ -408,6 +488,11 @@ uint32_t pa_context_get_protocol_version(pa_context *c) uint32_t pa_context_get_server_protocol_version(pa_context *c) { + pa_assert(c); + pa_assert(c->refcount >= 1); + + PA_CHECK_VALIDITY_RETURN_ANY(c, PA_CONTEXT_IS_GOOD(c->state), PA_ERR_BADSTATE, PA_INVALID_INDEX); + return PA_PROTOCOL_VERSION; } @@ -430,13 +515,34 @@ uint32_t pa_context_get_index(pa_context *c) pa_time_event* pa_context_rttime_new(pa_context *c, pa_usec_t usec, pa_time_event_cb_t cb, void *userdata) { - pw_log_warn("Not Implemented"); - return NULL; + struct timeval tv; + + pa_assert(c); + pa_assert(c->refcount >= 1); + pa_assert(c->mainloop); + + if (usec == PA_USEC_INVALID) + return c->mainloop->time_new(c->mainloop, NULL, cb, userdata); + + pa_timeval_store(&tv, usec); + + return c->mainloop->time_new(c->mainloop, &tv, cb, userdata); } void pa_context_rttime_restart(pa_context *c, pa_time_event *e, pa_usec_t usec) { - pw_log_warn("Not Implemented"); + struct timeval tv; + + pa_assert(c); + pa_assert(c->refcount >= 1); + pa_assert(c->mainloop); + + if (usec == PA_USEC_INVALID) + c->mainloop->time_restart(e, NULL); + else { + pa_timeval_store(&tv, usec); + c->mainloop->time_restart(e, &tv); + } } size_t pa_context_get_tile_size(pa_context *c, const pa_sample_spec *ss) diff --git a/src/internal.h b/src/internal.h index 05671af75..443ba860d 100644 --- a/src/internal.h +++ b/src/internal.h @@ -146,6 +146,32 @@ struct pa_proplist { pa_proplist* pa_proplist_new_props(struct pw_properties *props); pa_proplist* pa_proplist_new_dict(struct spa_dict *dict); +struct pa_io_event { + struct spa_source *source; + struct pa_mainloop *mainloop; + int fd; + pa_io_event_flags_t events; + pa_io_event_cb_t cb; + void *userdata; + pa_io_event_destroy_cb_t destroy; +}; + +struct pa_time_event { + struct spa_source *source; + struct pa_mainloop *mainloop; + pa_time_event_cb_t cb; + void *userdata; + pa_time_event_destroy_cb_t destroy; +}; + +struct pa_defer_event { + struct spa_source *source; + struct pa_mainloop *mainloop; + pa_defer_event_cb_t cb; + void *userdata; + pa_defer_event_destroy_cb_t destroy; +}; + struct pa_mainloop { struct pw_loop *loop; struct spa_source *event; @@ -248,6 +274,7 @@ struct pa_stream { pa_stream_direction_t direction; pa_stream_state_t state; pa_stream_flags_t flags; + bool disconnecting; pa_sample_spec sample_spec; pa_channel_map channel_map; @@ -300,10 +327,10 @@ struct pa_stream { size_t dequeued_size; struct pw_buffer *buffer; - void *buffer_data; uint32_t buffer_index; - int64_t buffer_size; - int64_t buffer_offset; + void *buffer_data; + uint32_t buffer_size; + uint32_t buffer_offset; }; void pa_stream_set_state(pa_stream *s, pa_stream_state_t st); diff --git a/src/mainloop.c b/src/mainloop.c index da1ae450b..7663dedad 100644 --- a/src/mainloop.c +++ b/src/mainloop.c @@ -32,68 +32,183 @@ static void do_stop(void *data, uint64_t count) this->quit = true; } +static enum spa_io map_flags_to_spa(pa_io_event_flags_t flags) { + return (enum spa_io) + ((flags & PA_IO_EVENT_INPUT ? SPA_IO_IN : 0) | + (flags & PA_IO_EVENT_OUTPUT ? SPA_IO_OUT : 0) | + (flags & PA_IO_EVENT_ERROR ? SPA_IO_ERR : 0) | + (flags & PA_IO_EVENT_HANGUP ? SPA_IO_HUP : 0)); +} + +static pa_io_event_flags_t map_flags_from_spa(enum spa_io flags) { + return (flags & SPA_IO_IN ? PA_IO_EVENT_INPUT : 0) | + (flags & SPA_IO_OUT ? PA_IO_EVENT_OUTPUT : 0) | + (flags & SPA_IO_ERR ? PA_IO_EVENT_ERROR : 0) | + (flags & SPA_IO_HUP ? PA_IO_EVENT_HANGUP : 0); +} + +static void source_io_func(void *data, int fd, enum spa_io mask) +{ + pa_io_event *ev = data; + if (ev->cb) + ev->cb(&ev->mainloop->api, ev, ev->fd, map_flags_from_spa(mask), ev->userdata); +} + static pa_io_event* api_io_new(pa_mainloop_api*a, int fd, pa_io_event_flags_t events, pa_io_event_cb_t cb, void *userdata) { - pw_log_warn("Not Implemented"); - return NULL; + pa_mainloop *mainloop = SPA_CONTAINER_OF(a, pa_mainloop, api); + pa_io_event *ev; + + pa_assert(a); + pa_assert(fd >= 0); + pa_assert(cb); + + ev = calloc(1, sizeof(pa_io_event)); + ev->source = pw_loop_add_io(mainloop->loop, fd, + map_flags_to_spa(events), false, source_io_func, ev); + ev->fd = fd; + ev->events = events; + ev->mainloop = mainloop; + ev->cb = cb; + ev->userdata = userdata; + + return ev; } static void api_io_enable(pa_io_event* e, pa_io_event_flags_t events) { - pw_log_warn("Not Implemented"); + pa_assert(e); + + if (e->events == events) + return; + + e->events = events; + pw_loop_update_io(e->mainloop->loop, e->source, map_flags_to_spa(events)); } static void api_io_free(pa_io_event* e) { - pw_log_warn("Not Implemented"); + pa_assert(e); + pw_loop_destroy_source(e->mainloop->loop, e->source); + if (e->destroy) + e->destroy(&e->mainloop->api, e, e->userdata); + free(e); } static void api_io_set_destroy(pa_io_event *e, pa_io_event_destroy_cb_t cb) { - pw_log_warn("Not Implemented"); + pa_assert(e); + e->destroy = cb; +} + +static void source_timer_func(void *data, uint64_t expirations) +{ + pa_time_event *ev = data; + struct timeval tv; + if (ev->cb) + ev->cb(&ev->mainloop->api, ev, &tv, ev->userdata); } static pa_time_event* api_time_new(pa_mainloop_api*a, const struct timeval *tv, pa_time_event_cb_t cb, void *userdata) { - pw_log_warn("Not Implemented"); - return NULL; + pa_mainloop *mainloop = SPA_CONTAINER_OF(a, pa_mainloop, api); + pa_time_event *ev; + struct timespec ts; + + ev = calloc(1, sizeof(pa_time_event)); + ev->source = pw_loop_add_timer(mainloop->loop, source_timer_func, ev); + ev->mainloop = mainloop; + ev->cb = cb; + ev->userdata = userdata; + + if (tv == NULL) { + ts.tv_sec = 0; + ts.tv_nsec = 1; + } + else { + ts.tv_sec = tv->tv_sec; + ts.tv_nsec = tv->tv_usec * 1000LL; + } + pw_loop_update_timer(mainloop->loop, ev->source, &ts, NULL, true); + + return ev; } static void api_time_restart(pa_time_event* e, const struct timeval *tv) { - pw_log_warn("Not Implemented"); + struct timespec ts; + + pa_assert(e); + + if (tv == NULL) { + ts.tv_sec = 0; + ts.tv_nsec = 1; + } + else { + ts.tv_sec = tv->tv_sec; + ts.tv_nsec = tv->tv_usec * 1000LL; + } + pw_loop_update_timer(e->mainloop->loop, e->source, &ts, NULL, true); } static void api_time_free(pa_time_event* e) { - pw_log_warn("Not Implemented"); + pa_assert(e); + pw_loop_destroy_source(e->mainloop->loop, e->source); + if (e->destroy) + e->destroy(&e->mainloop->api, e, e->userdata); + free(e); } static void api_time_set_destroy(pa_time_event *e, pa_time_event_destroy_cb_t cb) { - pw_log_warn("Not Implemented"); + pa_assert(e); + e->destroy = cb; } +static void source_idle_func(void *data) +{ + pa_defer_event *ev = data; + if (ev->cb) + ev->cb(&ev->mainloop->api, ev, ev->userdata); +} static pa_defer_event* api_defer_new(pa_mainloop_api*a, pa_defer_event_cb_t cb, void *userdata) { - pw_log_warn("Not Implemented"); - return NULL; + pa_mainloop *mainloop = SPA_CONTAINER_OF(a, pa_mainloop, api); + pa_defer_event *ev; + + pa_assert(a); + pa_assert(cb); + + ev = calloc(1, sizeof(pa_defer_event)); + ev->source = pw_loop_add_idle(mainloop->loop, true, source_idle_func, ev); + ev->mainloop = mainloop; + ev->cb = cb; + ev->userdata = userdata; + + return ev; } static void api_defer_enable(pa_defer_event* e, int b) { - pw_log_warn("Not Implemented"); + pa_assert(e); + pw_loop_enable_idle(e->mainloop->loop, e->source, b ? true : false); } static void api_defer_free(pa_defer_event* e) { - pw_log_warn("Not Implemented"); + pa_assert(e); + pw_loop_destroy_source(e->mainloop->loop, e->source); + if (e->destroy) + e->destroy(&e->mainloop->api, e, e->userdata); + free(e); } static void api_defer_set_destroy(pa_defer_event *e, pa_defer_event_destroy_cb_t cb) { - pw_log_warn("Not Implemented"); + pa_assert(e); + e->destroy = cb; } static void api_quit(pa_mainloop_api*a, int retval) @@ -122,7 +237,6 @@ static const pa_mainloop_api api = .defer_set_destroy = api_defer_set_destroy, .quit = api_quit, - }; pa_mainloop *pa_mainloop_new(void) diff --git a/src/stream.c b/src/stream.c index 9820e2528..036cb5da1 100644 --- a/src/stream.c +++ b/src/stream.c @@ -39,7 +39,8 @@ static void stream_state_changed(void *data, enum pw_stream_state old, pa_stream_set_state(s, PA_STREAM_FAILED); break; case PW_STREAM_STATE_UNCONNECTED: - pa_stream_set_state(s, PA_STREAM_UNCONNECTED); + if (!s->disconnecting) + pa_stream_set_state(s, PA_STREAM_UNCONNECTED); break; case PW_STREAM_STATE_CONNECTING: pa_stream_set_state(s, PA_STREAM_CREATING); @@ -71,26 +72,28 @@ static void stream_format_changed(void *data, const struct spa_pod *format) static void stream_process(void *data) { pa_stream *s = data; + struct pw_buffer *buf; + uint32_t index; + + s->timing_info_valid = true; + + buf = pw_stream_dequeue_buffer(s->stream); + if (buf == NULL) + return; + + spa_ringbuffer_get_write_index(&s->dequeued_ring, &index); + s->dequeued[index & MASK_BUFFERS] = buf; + spa_ringbuffer_write_update(&s->dequeued_ring, index + 1); if (s->direction == PA_STREAM_PLAYBACK) { - struct pw_buffer *buf; - uint32_t index; - - buf = pw_stream_dequeue_buffer(s->stream); - if (buf != NULL) { - spa_ringbuffer_get_write_index(&s->dequeued_ring, &index); - s->dequeued[index & MASK_BUFFERS] = buf; - spa_ringbuffer_write_update(&s->dequeued_ring, index + 1); - - s->dequeued_size += buf->buffer->datas[0].maxsize; - } - + s->dequeued_size += buf->buffer->datas[0].maxsize; if (s->write_callback) s->write_callback(s, s->dequeued_size, s->write_userdata); } else { + s->dequeued_size += buf->buffer->datas[0].chunk->size; if (s->read_callback) - s->read_callback(s, 4096, s->read_userdata); + s->read_callback(s, s->dequeued_size, s->read_userdata); } } @@ -122,7 +125,11 @@ pa_stream* stream_new(pa_context *c, const char *name, if (s == NULL) return NULL; - s->stream = pw_stream_new(c->remote, name, NULL); + + s->stream = pw_stream_new(c->remote, name, + pw_properties_new( + "client.api", "pulseaudio", + NULL)); s->refcount = 1; s->context = c; init_type(&s->type, pw_core_get_type(c->core)->map); @@ -178,8 +185,6 @@ pa_stream* stream_new(pa_context *c, const char *name, s->device_index = PA_INVALID_INDEX; - s->timing_info_valid = false; - spa_ringbuffer_init(&s->dequeued_ring); spa_list_append(&c->streams, &s->link); @@ -341,6 +346,97 @@ int pa_stream_is_corked(pa_stream *s) return s->corked; } +static void patch_buffer_attr(pa_stream *s, pa_buffer_attr *attr, pa_stream_flags_t *flags) { + const char *e; + + pa_assert(s); + pa_assert(attr); + + if ((e = getenv("PULSE_LATENCY_MSEC"))) { + uint32_t ms; + pa_sample_spec ss; + + pa_sample_spec_init(&ss); + + if (pa_sample_spec_valid(&s->sample_spec)) + ss = s->sample_spec; + else if (s->n_formats == 1) + pa_format_info_to_sample_spec(s->req_formats[0], &ss, NULL); + + if ((ms = atoi(e)) < 0 || ms <= 0) { + pa_log_debug("Failed to parse $PULSE_LATENCY_MSEC: %s", e); + } + else if (!pa_sample_spec_valid(&s->sample_spec)) { + pa_log_debug("Ignoring $PULSE_LATENCY_MSEC: %s (invalid sample spec)", e); + } + else { + attr->maxlength = (uint32_t) -1; + attr->tlength = pa_usec_to_bytes(ms * PA_USEC_PER_MSEC, &ss); + attr->minreq = (uint32_t) -1; + attr->prebuf = (uint32_t) -1; + attr->fragsize = attr->tlength; + + if (flags) + *flags |= PA_STREAM_ADJUST_LATENCY; + } + } + + if (attr->maxlength == (uint32_t) -1) + attr->maxlength = 4*1024*1024; /* 4MB is the maximum queue length PulseAudio <= 0.9.9 supported. */ + + if (attr->tlength == (uint32_t) -1) + attr->tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, &s->sample_spec); /* 250ms of buffering */ + + if (attr->minreq == (uint32_t) -1) + attr->minreq = (attr->tlength)/5; /* Ask for more data when there are only 200ms left in the playback buffer */ + + if (attr->prebuf == (uint32_t) -1) + attr->prebuf = attr->tlength; /* Start to play only when the playback is fully filled up once */ + + if (attr->fragsize == (uint32_t) -1) + attr->fragsize = attr->tlength; /* Pass data to the app only when the buffer is filled up once */ +} + +static const uint32_t audio_formats[] = { + [PA_SAMPLE_U8] = offsetof(struct spa_type_audio_format, U8), + [PA_SAMPLE_ALAW] = offsetof(struct spa_type_audio_format, UNKNOWN), + [PA_SAMPLE_ULAW] = offsetof(struct spa_type_audio_format, UNKNOWN), + [PA_SAMPLE_S16NE] = offsetof(struct spa_type_audio_format, S16), + [PA_SAMPLE_S16RE] = offsetof(struct spa_type_audio_format, S16_OE), + [PA_SAMPLE_FLOAT32NE] = offsetof(struct spa_type_audio_format, F32), + [PA_SAMPLE_FLOAT32RE] = offsetof(struct spa_type_audio_format, F32_OE), + [PA_SAMPLE_S32NE] = offsetof(struct spa_type_audio_format, S32), + [PA_SAMPLE_S32RE] = offsetof(struct spa_type_audio_format, S32_OE), + [PA_SAMPLE_S24NE] = offsetof(struct spa_type_audio_format, S24), + [PA_SAMPLE_S24RE] = offsetof(struct spa_type_audio_format, S24_OE), + [PA_SAMPLE_S24_32NE] = offsetof(struct spa_type_audio_format, S24_32), + [PA_SAMPLE_S24_32RE] = offsetof(struct spa_type_audio_format, S24_32_OE), +}; + +static inline uint32_t get_format(pa_stream *s, pa_sample_format_t format) +{ + if (format < 0 || format >= SPA_N_ELEMENTS(audio_formats)) + return s->type.audio_format.UNKNOWN; + return *SPA_MEMBER(&s->type.audio_format, audio_formats[format], uint32_t); +} + +static const struct spa_pod *get_param(pa_stream *s, pa_sample_spec *ss, pa_channel_map *map, + struct spa_pod_builder *b) +{ + const struct spa_pod *param; + struct pw_type *t = pw_core_get_type(s->context->core); + + param = spa_pod_builder_object(b, + t->param.idEnumFormat, t->spa_format, + "I", s->type.media_type.audio, + "I", s->type.media_subtype.raw, + ":", s->type.format_audio.format, "I", get_format(s, ss->format), + ":", s->type.format_audio.layout, "i", SPA_AUDIO_LAYOUT_INTERLEAVED, + ":", s->type.format_audio.channels, "i", ss->channels, + ":", s->type.format_audio.rate, "i", ss->rate); + return param; +} + static int create_stream(pa_stream_direction_t direction, pa_stream *s, const char *dev, @@ -351,49 +447,68 @@ static int create_stream(pa_stream_direction_t direction, { int res; enum pw_stream_flags fl; - const struct spa_pod *params[1]; - uint8_t buffer[1024]; + const struct spa_pod *params[16]; + uint32_t n_params = 0; + uint8_t buffer[4096]; struct spa_pod_builder b = SPA_POD_BUILDER_INIT(buffer, sizeof(buffer)); - struct pw_type *t; + struct pw_properties *props; spa_assert(s); spa_assert(s->refcount >= 1); s->direction = direction; - - t = pw_core_get_type(s->context->core); + s->timing_info_valid = false; + s->disconnecting = false; pa_stream_set_state(s, PA_STREAM_CREATING); fl = PW_STREAM_FLAG_AUTOCONNECT | - PW_STREAM_FLAG_MAP_BUFFERS | - PW_STREAM_FLAG_RT_PROCESS; + PW_STREAM_FLAG_MAP_BUFFERS; + s->corked = SPA_FLAG_CHECK(flags, PA_STREAM_START_CORKED); + + if (s->corked) + fl |= PW_STREAM_FLAG_INACTIVE; if (flags & PA_STREAM_PASSTHROUGH) fl |= PW_STREAM_FLAG_EXCLUSIVE; - params[0] = spa_pod_builder_object(&b, - t->param.idEnumFormat, t->spa_format, - "I", s->type.media_type.audio, - "I", s->type.media_subtype.raw, - ":", s->type.format_audio.format, "I", s->type.audio_format.S16, - ":", s->type.format_audio.layout, "i", SPA_AUDIO_LAYOUT_INTERLEAVED, - ":", s->type.format_audio.channels, "i", 2, - ":", s->type.format_audio.rate, "i", 44100); + if (pa_sample_spec_valid(&s->sample_spec)) { + params[n_params++] = get_param(s, &s->sample_spec, &s->channel_map, &b); + } + else { + pa_sample_spec ss; + pa_channel_map map; + int i; + + for (i = 0; i < s->n_formats; i++) { + if (pa_format_info_to_sample_spec(s->req_formats[i], &ss, NULL) < 0) { + char buf[4096]; + pw_log_warn("can't convert format %s", + pa_format_info_snprint(buf,4096,s->req_formats[i])); + continue; + } + + params[n_params++] = get_param(s, &ss, &map, &b); + } + } if (attr) s->buffer_attr = *attr; + patch_buffer_attr(s, &s->buffer_attr, &flags); if (dev == NULL) dev = getenv("PIPEWIRE_NODE"); + props = (struct pw_properties *) pw_stream_get_properties(s->stream); + pw_properties_setf(props, "node.latency", "%u", s->buffer_attr.minreq); + res = pw_stream_connect(s->stream, direction == PA_STREAM_PLAYBACK ? PW_DIRECTION_OUTPUT : PW_DIRECTION_INPUT, dev, fl, - params, 1); + params, n_params); return res; } @@ -420,7 +535,7 @@ int pa_stream_connect_record( static void on_disconnected(pa_operation *o, void *userdata) { - pa_stream_set_state(o->stream, PA_STREAM_TERMINATED); + pa_stream_set_state(o->stream, PA_STREAM_TERMINATED); } int pa_stream_disconnect(pa_stream *s) @@ -432,9 +547,49 @@ int pa_stream_disconnect(pa_stream *s) PA_CHECK_VALIDITY(s->context, s->context->state == PA_CONTEXT_READY, PA_ERR_BADSTATE); + s->disconnecting = true; pw_stream_disconnect(s->stream); o = pa_operation_new(s->context, s, on_disconnected, 0); pa_operation_unref(o); + + return 0; +} + +int peek_buffer(pa_stream *s) +{ + int32_t avail; + uint32_t index; + + if (s->buffer != NULL) + return 0; + + if ((avail = spa_ringbuffer_get_read_index(&s->dequeued_ring, &index)) <= 0) + return -EPIPE; + + s->buffer = s->dequeued[index & MASK_BUFFERS]; + s->buffer_index = index; + s->buffer_data = s->buffer->buffer->datas[0].data; + if (s->direction == PA_STREAM_RECORD) { + s->buffer_size = s->buffer->buffer->datas[0].chunk->size; + s->buffer_offset = s->buffer->buffer->datas[0].chunk->offset; + } + else { + s->buffer_size = s->buffer->buffer->datas[0].maxsize; + s->buffer_offset = 0; + } + return 0; +} + +int queue_buffer(pa_stream *s) +{ + if (s->buffer == NULL) + return 0; + + s->dequeued_size -= s->buffer_size; + spa_ringbuffer_read_update(&s->dequeued_ring, s->buffer_index + 1); + + pw_stream_queue_buffer(s->stream, s->buffer); + s->buffer = NULL; return 0; } @@ -443,8 +598,6 @@ int pa_stream_begin_write( void **data, size_t *nbytes) { - int32_t avail; - uint32_t index; spa_assert(s); spa_assert(s->refcount >= 1); @@ -455,19 +608,11 @@ int pa_stream_begin_write( PA_CHECK_VALIDITY(s->context, data, PA_ERR_INVALID); PA_CHECK_VALIDITY(s->context, nbytes && *nbytes != 0, PA_ERR_INVALID); - if (s->buffer == NULL) { - if ((avail = spa_ringbuffer_get_read_index(&s->dequeued_ring, &index)) <= 0) { - *data = NULL; - *nbytes = 0; - return 0; - } - s->buffer = s->dequeued[index & MASK_BUFFERS]; - s->buffer_index = index; - s->buffer_data = s->buffer->buffer->datas[0].data; - s->buffer_size = s->buffer->buffer->datas[0].maxsize; - s->buffer_offset = 0; + if (peek_buffer(s) < 0) { + *data = NULL; + *nbytes = 0; + return 0; } - *data = SPA_MEMBER(s->buffer_data, s->buffer_offset, void); *nbytes = s->buffer_size - s->buffer_offset; @@ -516,23 +661,26 @@ int pa_stream_write_ext_free(pa_stream *s, PA_CHECK_VALIDITY(s->context, seek <= PA_SEEK_RELATIVE_END, PA_ERR_INVALID); PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || (seek == PA_SEEK_RELATIVE && offset == 0), PA_ERR_INVALID); + PA_CHECK_VALIDITY(s->context, + !s->buffer || + ((data >= s->buffer_data) && + ((const char*) data + nbytes <= (const char*) s->buffer_data + s->buffer_size)), + PA_ERR_INVALID); PA_CHECK_VALIDITY(s->context, offset % pa_frame_size(&s->sample_spec) == 0, PA_ERR_INVALID); PA_CHECK_VALIDITY(s->context, nbytes % pa_frame_size(&s->sample_spec) == 0, PA_ERR_INVALID); + PA_CHECK_VALIDITY(s->context, !free_cb || !s->buffer, PA_ERR_INVALID); if (s->buffer == NULL) { pw_log_warn("Not Implemented"); + if (free_cb) + free_cb(free_cb_data); + return PA_ERR_INVALID; + } else { + s->buffer->buffer->datas[0].chunk->offset = data - s->buffer_data; + s->buffer->buffer->datas[0].chunk->size = nbytes; + queue_buffer(s); } - - s->buffer->buffer->datas[0].chunk->offset = 0; - s->buffer->buffer->datas[0].chunk->size = nbytes; - - s->dequeued_size -= s->buffer_size; - spa_ringbuffer_read_update(&s->dequeued_ring, s->buffer_index + 1); - - pw_stream_queue_buffer(s->stream, s->buffer); - s->buffer = NULL; - return 0; } @@ -548,7 +696,14 @@ int pa_stream_peek(pa_stream *s, PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE); PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE); - pw_log_warn("Not Implemented"); + if (peek_buffer(s) < 0) { + *data = NULL; + *nbytes = 0; + return 0; + } + *data = SPA_MEMBER(s->buffer_data, s->buffer_offset, void); + *nbytes = s->buffer_size; + return 0; } @@ -559,7 +714,10 @@ int pa_stream_drop(pa_stream *s) PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE); PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE); - pw_log_warn("Not Implemented"); + PA_CHECK_VALIDITY(s->context, s->buffer, PA_ERR_BADSTATE); + + queue_buffer(s); + return 0; } @@ -585,32 +743,59 @@ size_t pa_stream_readable_size(pa_stream *s) PA_ERR_BADSTATE, (size_t) -1); PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE, (size_t) -1); - pw_log_warn("Not Implemented"); - return 0; + + return s->dequeued_size; +} + +struct success_ack { + pa_stream_success_cb_t cb; + void *userdata; +}; + +static void on_success(pa_operation *o, void *userdata) +{ + struct success_ack *d = userdata; + pa_operation_done(o); + if (d->cb) + d->cb(o->stream, PA_OK, d->userdata); } pa_operation* pa_stream_drain(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) { + pa_operation *o; + struct success_ack *d; + spa_assert(s); spa_assert(s->refcount >= 1); PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE); PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction == PA_STREAM_PLAYBACK, PA_ERR_BADSTATE); - pw_log_warn("Not Implemented"); - return NULL; + o = pa_operation_new(s->context, s, on_success, sizeof(struct success_ack)); + d = o->userdata; + d->cb = cb; + d->userdata = userdata; + + return o; } pa_operation* pa_stream_update_timing_info(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) { + pa_operation *o; + struct success_ack *d; + spa_assert(s); spa_assert(s->refcount >= 1); PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE); PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE); - pw_log_warn("Not Implemented"); - return NULL; + o = pa_operation_new(s->context, s, on_success, sizeof(struct success_ack)); + d = o->userdata; + d->cb = cb; + d->userdata = userdata; + + return o; } void pa_stream_set_state_callback(pa_stream *s, pa_stream_notify_cb_t cb, void *userdata) @@ -751,18 +936,6 @@ void pa_stream_set_buffer_attr_callback(pa_stream *s, pa_stream_notify_cb_t cb, s->buffer_attr_userdata = userdata; } -struct success_ack { - pa_stream_success_cb_t cb; - void *userdata; -}; - -static void on_success(pa_operation *o, void *userdata) -{ - struct success_ack *d = userdata; - if (d->cb) - d->cb(o->stream, PA_OK, d->userdata); -} - pa_operation* pa_stream_cork(pa_stream *s, int b, pa_stream_success_cb_t cb, void *userdata) { pa_operation *o; @@ -870,6 +1043,9 @@ pa_operation* pa_stream_set_name(pa_stream *s, const char *name, pa_stream_succe int pa_stream_get_time(pa_stream *s, pa_usec_t *r_usec) { + struct pw_time t; + pa_usec_t res; + spa_assert(s); spa_assert(s->refcount >= 1); @@ -877,7 +1053,12 @@ int pa_stream_get_time(pa_stream *s, pa_usec_t *r_usec) PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE); PA_CHECK_VALIDITY(s->context, s->timing_info_valid, PA_ERR_NODATA); - pw_log_warn("Not Implemented"); + pw_stream_get_time(s->stream, &t); + + res = (t.ticks * t.rate.num * PA_USEC_PER_SEC) / t.rate.denom; + + if (r_usec) + *r_usec = res; return 0; } @@ -893,6 +1074,10 @@ int pa_stream_get_latency(pa_stream *s, pa_usec_t *r_usec, int *negative) PA_CHECK_VALIDITY(s->context, s->timing_info_valid, PA_ERR_NODATA); pw_log_warn("Not Implemented"); + if (r_usec) + *r_usec = 0; + if (negative) + *negative = 0; return 0; } @@ -946,6 +1131,9 @@ const pa_buffer_attr* pa_stream_get_buffer_attr(pa_stream *s) pa_operation *pa_stream_set_buffer_attr(pa_stream *s, const pa_buffer_attr *attr, pa_stream_success_cb_t cb, void *userdata) { + pa_operation *o; + struct success_ack *d; + spa_assert(s); spa_assert(s->refcount >= 1); spa_assert(attr); @@ -953,11 +1141,19 @@ pa_operation *pa_stream_set_buffer_attr(pa_stream *s, const pa_buffer_attr *attr PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE); PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE); - return NULL; + pw_log_warn("Not Implemented"); + o = pa_operation_new(s->context, s, on_success, sizeof(struct success_ack)); + d = o->userdata; + d->cb = cb; + d->userdata = userdata; + return o; } pa_operation *pa_stream_update_sample_rate(pa_stream *s, uint32_t rate, pa_stream_success_cb_t cb, void *userdata) { + pa_operation *o; + struct success_ack *d; + spa_assert(s); spa_assert(s->refcount >= 1); @@ -966,11 +1162,19 @@ pa_operation *pa_stream_update_sample_rate(pa_stream *s, uint32_t rate, pa_strea PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE); PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->flags & PA_STREAM_VARIABLE_RATE, PA_ERR_BADSTATE); - return NULL; + pw_log_warn("Not Implemented"); + o = pa_operation_new(s->context, s, on_success, sizeof(struct success_ack)); + d = o->userdata; + d->cb = cb; + d->userdata = userdata; + return o; } pa_operation *pa_stream_proplist_update(pa_stream *s, pa_update_mode_t mode, pa_proplist *p, pa_stream_success_cb_t cb, void *userdata) { + pa_operation *o; + struct success_ack *d; + spa_assert(s); spa_assert(s->refcount >= 1); @@ -979,11 +1183,19 @@ pa_operation *pa_stream_proplist_update(pa_stream *s, pa_update_mode_t mode, pa_ PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE); PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE); - return NULL; + pw_log_warn("Not Implemented"); + o = pa_operation_new(s->context, s, on_success, sizeof(struct success_ack)); + d = o->userdata; + d->cb = cb; + d->userdata = userdata; + return o; } pa_operation *pa_stream_proplist_remove(pa_stream *s, const char *const keys[], pa_stream_success_cb_t cb, void *userdata) { + pa_operation *o; + struct success_ack *d; + spa_assert(s); spa_assert(s->refcount >= 1); @@ -991,7 +1203,12 @@ pa_operation *pa_stream_proplist_remove(pa_stream *s, const char *const keys[], PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE); PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE); - return NULL; + pw_log_warn("Not Implemented"); + o = pa_operation_new(s->context, s, on_success, sizeof(struct success_ack)); + d = o->userdata; + d->cb = cb; + d->userdata = userdata; + return o; } int pa_stream_set_monitor_stream(pa_stream *s, uint32_t sink_input_idx)