diff --git a/src/context.c b/src/context.c index 0a4b3a812..3087fa082 100644 --- a/src/context.c +++ b/src/context.c @@ -33,6 +33,7 @@ int pa_context_set_error(pa_context *c, int error) { pa_assert(error >= 0); pa_assert(error < PA_ERR_MAX); + pw_log_error("context %p: error %d", c, error); if (c) c->error = error; return error; diff --git a/src/internal.h b/src/internal.h index 443ba860d..ed702f946 100644 --- a/src/internal.h +++ b/src/internal.h @@ -108,19 +108,24 @@ static inline const char *pa_strnull(const char *x) { int pa_context_set_error(pa_context *c, int error); -#define PA_CHECK_VALIDITY(context, expression, error) \ - do { \ - if (!(expression)) \ - return -pa_context_set_error((context), (error)); \ - } while(false) +#define PA_CHECK_VALIDITY(context, expression, error) \ +do { \ + if (!(expression)) { \ + fprintf(stderr, "'%s' failed at %s:%u %s()", \ + #expression , __FILE__, __LINE__, __func__); \ + return -pa_context_set_error((context), (error)); \ + } \ +} while(false) -#define PA_CHECK_VALIDITY_RETURN_ANY(context, expression, error, value) \ - do { \ - if (!(expression)) { \ - pa_context_set_error((context), (error)); \ - return value; \ - } \ - } while(false) +#define PA_CHECK_VALIDITY_RETURN_ANY(context, expression, error, value) \ +do { \ + if (!(expression)) { \ + fprintf(stderr, "'%s' failed at %s:%u %s()", \ + #expression , __FILE__, __LINE__, __func__); \ + pa_context_set_error((context), (error)); \ + return value; \ + } \ +} while(false) #define PA_CHECK_VALIDITY_RETURN_NULL(context, expression, error) \ PA_CHECK_VALIDITY_RETURN_ANY(context, expression, error, NULL) diff --git a/src/mainloop.c b/src/mainloop.c index 7663dedad..578b94d6b 100644 --- a/src/mainloop.c +++ b/src/mainloop.c @@ -129,6 +129,7 @@ static pa_time_event* api_time_new(pa_mainloop_api*a, const struct timeval *tv, ts.tv_sec = tv->tv_sec; ts.tv_nsec = tv->tv_usec * 1000LL; } + pw_log_debug("new timer %p %ld %ld", ev, ts.tv_sec, ts.tv_nsec); pw_loop_update_timer(mainloop->loop, ev->source, &ts, NULL, true); return ev; diff --git a/src/rtclock.c b/src/rtclock.c index 96ef0fedf..a0b97c831 100644 --- a/src/rtclock.c +++ b/src/rtclock.c @@ -21,14 +21,17 @@ #include +#include + #include pa_usec_t pa_rtclock_now(void) { struct timespec ts; - clock_gettime(CLOCK_MONOTONIC, &ts); - return ts.tv_sec * SPA_USEC_PER_SEC + - ts.tv_nsec * SPA_NSEC_PER_USEC; + pa_usec_t res; + clock_gettime(CLOCK_MONOTONIC, &ts); + res = (ts.tv_sec * SPA_USEC_PER_SEC) + (ts.tv_nsec / SPA_NSEC_PER_USEC); + return res; } diff --git a/src/stream.c b/src/stream.c index 036cb5da1..e6421420f 100644 --- a/src/stream.c +++ b/src/stream.c @@ -19,6 +19,7 @@ #include #include +#include #include @@ -28,6 +29,65 @@ #include #include "internal.h" +static const uint32_t audio_formats[] = { + [PA_SAMPLE_U8] = offsetof(struct spa_type_audio_format, U8), + [PA_SAMPLE_ALAW] = offsetof(struct spa_type_audio_format, UNKNOWN), + [PA_SAMPLE_ULAW] = offsetof(struct spa_type_audio_format, UNKNOWN), + [PA_SAMPLE_S16NE] = offsetof(struct spa_type_audio_format, S16), + [PA_SAMPLE_S16RE] = offsetof(struct spa_type_audio_format, S16_OE), + [PA_SAMPLE_FLOAT32NE] = offsetof(struct spa_type_audio_format, F32), + [PA_SAMPLE_FLOAT32RE] = offsetof(struct spa_type_audio_format, F32_OE), + [PA_SAMPLE_S32NE] = offsetof(struct spa_type_audio_format, S32), + [PA_SAMPLE_S32RE] = offsetof(struct spa_type_audio_format, S32_OE), + [PA_SAMPLE_S24NE] = offsetof(struct spa_type_audio_format, S24), + [PA_SAMPLE_S24RE] = offsetof(struct spa_type_audio_format, S24_OE), + [PA_SAMPLE_S24_32NE] = offsetof(struct spa_type_audio_format, S24_32), + [PA_SAMPLE_S24_32RE] = offsetof(struct spa_type_audio_format, S24_32_OE), +}; + +static inline uint32_t format_pa2id(pa_stream *s, pa_sample_format_t format) +{ + if (format < 0 || format >= SPA_N_ELEMENTS(audio_formats)) + return s->type.audio_format.UNKNOWN; + return *SPA_MEMBER(&s->type.audio_format, audio_formats[format], uint32_t); +} + +static inline pa_sample_format_t format_id2pa(pa_stream *s, uint32_t id) +{ + int i; + for (i = 0; i < SPA_N_ELEMENTS(audio_formats); i++) { + if (id == *SPA_MEMBER(&s->type.audio_format, audio_formats[i], uint32_t)) + return i; + } + return PA_SAMPLE_INVALID; +} + +static int dequeue_buffer(pa_stream *s) +{ + struct pw_buffer *buf; + uint32_t index; + + buf = pw_stream_dequeue_buffer(s->stream); + if (buf == NULL) + return -EPIPE; + + spa_ringbuffer_get_write_index(&s->dequeued_ring, &index); + s->dequeued[index & MASK_BUFFERS] = buf; + spa_ringbuffer_write_update(&s->dequeued_ring, index + 1); + + if (s->direction == PA_STREAM_PLAYBACK) + s->dequeued_size += buf->buffer->datas[0].maxsize; + else + s->dequeued_size += buf->buffer->datas[0].chunk->size; + + return 0; +} + +static void configure_buffers(pa_stream *s) +{ + s->buffer_attr.maxlength = 65536; + s->buffer_attr.prebuf = s->buffer_attr.minreq; +} static void stream_state_changed(void *data, enum pw_stream_state old, enum pw_stream_state state, const char *error) @@ -49,6 +109,7 @@ static void stream_state_changed(void *data, enum pw_stream_state old, case PW_STREAM_STATE_READY: break; case PW_STREAM_STATE_PAUSED: + configure_buffers(s); pa_stream_set_state(s, PA_STREAM_READY); break; case PW_STREAM_STATE_STREAMING: @@ -56,42 +117,98 @@ static void stream_state_changed(void *data, enum pw_stream_state old, } } +static const struct spa_pod *get_buffers_param(pa_stream *s, pa_buffer_attr *attr, struct spa_pod_builder *b) +{ + const struct spa_pod *param; + struct pw_type *t = pw_core_get_type(s->context->core); + int32_t blocks, buffers, size, maxsize, stride; + + blocks = 1; + stride = pa_frame_size(&s->sample_spec); + + if (attr->tlength == -1) + maxsize = 1024; + else + maxsize = (attr->tlength / stride); + + if (attr->minreq == -1) + size = SPA_MIN(1024, maxsize); + else + size = SPA_MIN(attr->minreq / stride, maxsize); + + if (attr->maxlength == -1) + buffers = 3; + else + buffers = SPA_CLAMP(attr->maxlength / (maxsize * stride), 3, 64); + + param = spa_pod_builder_object(b, + t->param.idBuffers, t->param_buffers.Buffers, + ":", t->param_buffers.buffers, "iru", buffers, + SPA_POD_PROP_MIN_MAX(3, 64), + ":", t->param_buffers.blocks, "i", blocks, + ":", t->param_buffers.size, "iru", size * stride, + SPA_POD_PROP_MIN_MAX(size * stride, maxsize * stride), + ":", t->param_buffers.stride, "i", stride, + ":", t->param_buffers.align, "i", 16); + return param; +} + static void stream_format_changed(void *data, const struct spa_pod *format) { pa_stream *s = data; + const struct spa_pod *params[4]; + uint32_t n_params = 0; + uint8_t buffer[4096]; + struct spa_pod_builder b = SPA_POD_BUILDER_INIT(buffer, sizeof(buffer)); + struct spa_audio_info info = { 0 }; + int res; - s->sample_spec.format = PA_SAMPLE_S16NE, - s->sample_spec.rate = 44100; - s->sample_spec.channels = 2; + spa_pod_object_parse(format, + "I", &info.media_type, + "I", &info.media_subtype); + + if (info.media_type != s->type.media_type.audio || + info.media_subtype != s->type.media_subtype.raw || + spa_format_audio_raw_parse(format, &info.info.raw, &s->type.format_audio) < 0 || + info.info.raw.layout != SPA_AUDIO_LAYOUT_INTERLEAVED) { + res = -EINVAL; + goto done; + } + + s->sample_spec.format = format_id2pa(s, info.info.raw.format); + if (s->sample_spec.format == PA_SAMPLE_INVALID) { + res = -EINVAL; + goto done; + } + s->sample_spec.rate = info.info.raw.rate; + s->sample_spec.channels = info.info.raw.channels; if (s->format) pa_format_info_free(s->format); s->format = pa_format_info_from_sample_spec(&s->sample_spec, NULL); + + params[n_params++] = get_buffers_param(s, &s->buffer_attr, &b); + + res = 0; + + done: + pw_stream_finish_format(s->stream, res, params, n_params); } static void stream_process(void *data) { pa_stream *s = data; - struct pw_buffer *buf; - uint32_t index; s->timing_info_valid = true; - buf = pw_stream_dequeue_buffer(s->stream); - if (buf == NULL) + if (dequeue_buffer(s) < 0 && s->dequeued_size == 0) return; - spa_ringbuffer_get_write_index(&s->dequeued_ring, &index); - s->dequeued[index & MASK_BUFFERS] = buf; - spa_ringbuffer_write_update(&s->dequeued_ring, index + 1); - if (s->direction == PA_STREAM_PLAYBACK) { - s->dequeued_size += buf->buffer->datas[0].maxsize; if (s->write_callback) s->write_callback(s, s->dequeued_size, s->write_userdata); } else { - s->dequeued_size += buf->buffer->datas[0].chunk->size; if (s->read_callback) s->read_callback(s, s->dequeued_size, s->read_userdata); } @@ -185,6 +302,9 @@ pa_stream* stream_new(pa_context *c, const char *name, s->device_index = PA_INVALID_INDEX; + s->device_index = 0; + s->device_name = strdup("unknown"); + spa_ringbuffer_init(&s->dequeued_ring); spa_list_append(&c->streams, &s->link); @@ -343,6 +463,7 @@ int pa_stream_is_corked(pa_stream *s) PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE); PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE); + pw_log_debug("stream %p: corked %d", s, s->corked); return s->corked; } @@ -395,30 +516,14 @@ static void patch_buffer_attr(pa_stream *s, pa_buffer_attr *attr, pa_stream_flag if (attr->fragsize == (uint32_t) -1) attr->fragsize = attr->tlength; /* Pass data to the app only when the buffer is filled up once */ + + pw_log_info("stream %p: maxlength: %u", s, attr->maxlength); + pw_log_info("stream %p: tlength: %u", s, attr->tlength); + pw_log_info("stream %p: minreq: %u", s, attr->minreq); + pw_log_info("stream %p: prebuf: %u", s, attr->prebuf); + pw_log_info("stream %p: fragsize: %u", s, attr->fragsize); } -static const uint32_t audio_formats[] = { - [PA_SAMPLE_U8] = offsetof(struct spa_type_audio_format, U8), - [PA_SAMPLE_ALAW] = offsetof(struct spa_type_audio_format, UNKNOWN), - [PA_SAMPLE_ULAW] = offsetof(struct spa_type_audio_format, UNKNOWN), - [PA_SAMPLE_S16NE] = offsetof(struct spa_type_audio_format, S16), - [PA_SAMPLE_S16RE] = offsetof(struct spa_type_audio_format, S16_OE), - [PA_SAMPLE_FLOAT32NE] = offsetof(struct spa_type_audio_format, F32), - [PA_SAMPLE_FLOAT32RE] = offsetof(struct spa_type_audio_format, F32_OE), - [PA_SAMPLE_S32NE] = offsetof(struct spa_type_audio_format, S32), - [PA_SAMPLE_S32RE] = offsetof(struct spa_type_audio_format, S32_OE), - [PA_SAMPLE_S24NE] = offsetof(struct spa_type_audio_format, S24), - [PA_SAMPLE_S24RE] = offsetof(struct spa_type_audio_format, S24_OE), - [PA_SAMPLE_S24_32NE] = offsetof(struct spa_type_audio_format, S24_32), - [PA_SAMPLE_S24_32RE] = offsetof(struct spa_type_audio_format, S24_32_OE), -}; - -static inline uint32_t get_format(pa_stream *s, pa_sample_format_t format) -{ - if (format < 0 || format >= SPA_N_ELEMENTS(audio_formats)) - return s->type.audio_format.UNKNOWN; - return *SPA_MEMBER(&s->type.audio_format, audio_formats[format], uint32_t); -} static const struct spa_pod *get_param(pa_stream *s, pa_sample_spec *ss, pa_channel_map *map, struct spa_pod_builder *b) @@ -430,7 +535,7 @@ static const struct spa_pod *get_param(pa_stream *s, pa_sample_spec *ss, pa_chan t->param.idEnumFormat, t->spa_format, "I", s->type.media_type.audio, "I", s->type.media_subtype.raw, - ":", s->type.format_audio.format, "I", get_format(s, ss->format), + ":", s->type.format_audio.format, "I", format_pa2id(s, ss->format), ":", s->type.format_audio.layout, "i", SPA_AUDIO_LAYOUT_INTERLEAVED, ":", s->type.format_audio.channels, "i", ss->channels, ":", s->type.format_audio.rate, "i", ss->rate); @@ -481,7 +586,7 @@ static int create_stream(pa_stream_direction_t direction, int i; for (i = 0; i < s->n_formats; i++) { - if (pa_format_info_to_sample_spec(s->req_formats[i], &ss, NULL) < 0) { + if (pa_format_info_to_sample_spec(s->req_formats[i], &ss, &map) < 0) { char buf[4096]; pw_log_warn("can't convert format %s", pa_format_info_snprint(buf,4096,s->req_formats[i])); @@ -500,7 +605,7 @@ static int create_stream(pa_stream_direction_t direction, dev = getenv("PIPEWIRE_NODE"); props = (struct pw_properties *) pw_stream_get_properties(s->stream); - pw_properties_setf(props, "node.latency", "%u", s->buffer_attr.minreq); + pw_properties_setf(props, "node.latency", "%u/44100", s->buffer_attr.minreq); res = pw_stream_connect(s->stream, direction == PA_STREAM_PLAYBACK ? @@ -563,6 +668,8 @@ int peek_buffer(pa_stream *s) if (s->buffer != NULL) return 0; + dequeue_buffer(s); + if ((avail = spa_ringbuffer_get_read_index(&s->dequeued_ring, &index)) <= 0) return -EPIPE; @@ -585,9 +692,13 @@ int queue_buffer(pa_stream *s) if (s->buffer == NULL) return 0; - s->dequeued_size -= s->buffer_size; spa_ringbuffer_read_update(&s->dequeued_ring, s->buffer_index + 1); + if (s->direction == PA_STREAM_PLAYBACK) + s->dequeued_size -= s->buffer->buffer->datas[0].maxsize; + else + s->dequeued_size -= s->buffer->buffer->datas[0].chunk->size; + pw_stream_queue_buffer(s->stream, s->buffer); s->buffer = NULL; return 0; @@ -598,6 +709,7 @@ int pa_stream_begin_write( void **data, size_t *nbytes) { + int res; spa_assert(s); spa_assert(s->refcount >= 1); @@ -608,7 +720,8 @@ int pa_stream_begin_write( PA_CHECK_VALIDITY(s->context, data, PA_ERR_INVALID); PA_CHECK_VALIDITY(s->context, nbytes && *nbytes != 0, PA_ERR_INVALID); - if (peek_buffer(s) < 0) { + if ((res = peek_buffer(s)) < 0) { + pw_log_warn("stream %p: no buffer", s); *data = NULL; *nbytes = 0; return 0; @@ -651,6 +764,8 @@ int pa_stream_write_ext_free(pa_stream *s, int64_t offset, pa_seek_mode_t seek) { + int res; + spa_assert(s); spa_assert(s->refcount >= 1); spa_assert(data); @@ -671,16 +786,39 @@ int pa_stream_write_ext_free(pa_stream *s, PA_CHECK_VALIDITY(s->context, !free_cb || !s->buffer, PA_ERR_INVALID); if (s->buffer == NULL) { - pw_log_warn("Not Implemented"); + void *dst; + size_t dlen; + + if ((res = pa_stream_begin_write(s, &dst, &dlen)) < 0) + return res; + + if (dst == NULL || dlen == 0) + return 0; + + nbytes = SPA_MIN(nbytes, dlen); + memcpy(dst, data, nbytes); + data = dst; + if (free_cb) free_cb(free_cb_data); - - return PA_ERR_INVALID; - } else { - s->buffer->buffer->datas[0].chunk->offset = data - s->buffer_data; - s->buffer->buffer->datas[0].chunk->size = nbytes; - queue_buffer(s); } + + s->buffer->buffer->datas[0].chunk->offset = data - s->buffer_data; + s->buffer->buffer->datas[0].chunk->size = nbytes; + + /* Update the write index in the already available latency data */ + if (s->timing_info_valid) { + if (seek == PA_SEEK_ABSOLUTE) { + s->timing_info.write_index_corrupt = false; + s->timing_info.write_index = offset + (int64_t) nbytes; + } else if (seek == PA_SEEK_RELATIVE) { + if (!s->timing_info.write_index_corrupt) + s->timing_info.write_index += offset + (int64_t) nbytes; + } else + s->timing_info.write_index_corrupt = true; + } + queue_buffer(s); + return 0; } @@ -699,10 +837,12 @@ int pa_stream_peek(pa_stream *s, if (peek_buffer(s) < 0) { *data = NULL; *nbytes = 0; + pw_log_debug("stream %p: no buffer", s); return 0; } *data = SPA_MEMBER(s->buffer_data, s->buffer_offset, void); *nbytes = s->buffer_size; + pw_log_debug("stream %p: %p %zd", s, *data, *nbytes); return 0; } @@ -716,6 +856,7 @@ int pa_stream_drop(pa_stream *s) PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE); PA_CHECK_VALIDITY(s->context, s->buffer, PA_ERR_BADSTATE); + pw_log_debug("stream %p", s); queue_buffer(s); return 0; @@ -755,9 +896,10 @@ struct success_ack { static void on_success(pa_operation *o, void *userdata) { struct success_ack *d = userdata; + pa_stream *s = o->stream; pa_operation_done(o); if (d->cb) - d->cb(o->stream, PA_OK, d->userdata); + d->cb(s, PA_OK, d->userdata); } pa_operation* pa_stream_drain(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) @@ -779,6 +921,17 @@ pa_operation* pa_stream_drain(pa_stream *s, pa_stream_success_cb_t cb, void *use return o; } +static void on_timing_success(pa_operation *o, void *userdata) +{ + struct success_ack *d = userdata; + pa_stream *s = o->stream; + pa_operation_done(o); + s->timing_info_valid = true; + + if (d->cb) + d->cb(s, s->timing_info_valid, d->userdata); +} + pa_operation* pa_stream_update_timing_info(pa_stream *s, pa_stream_success_cb_t cb, void *userdata) { pa_operation *o; @@ -790,7 +943,7 @@ pa_operation* pa_stream_update_timing_info(pa_stream *s, pa_stream_success_cb_t PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE); PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE); - o = pa_operation_new(s->context, s, on_success, sizeof(struct success_ack)); + o = pa_operation_new(s->context, s, on_timing_success, sizeof(struct success_ack)); d = o->userdata; d->cb = cb; d->userdata = userdata; @@ -1045,6 +1198,8 @@ int pa_stream_get_time(pa_stream *s, pa_usec_t *r_usec) { struct pw_time t; pa_usec_t res; + struct timespec ts; + uint64_t now, delay; spa_assert(s); spa_assert(s->refcount >= 1); @@ -1055,11 +1210,20 @@ int pa_stream_get_time(pa_stream *s, pa_usec_t *r_usec) pw_stream_get_time(s->stream, &t); - res = (t.ticks * t.rate.num * PA_USEC_PER_SEC) / t.rate.denom; + clock_gettime(CLOCK_MONOTONIC, &ts); + now = ts.tv_sec * SPA_NSEC_PER_SEC + ts.tv_nsec; + delay = (now - t.now) / PA_NSEC_PER_USEC; + + if (t.rate.denom != 0) + res = delay + ((t.ticks * t.rate.num * PA_USEC_PER_SEC) / t.rate.denom); + else + res = 0; if (r_usec) *r_usec = res; + pw_log_debug("stream %p: %ld %ld %ld %ld %d/%d %ld", s, now, t.now, delay, t.ticks, t.rate.num, t.rate.denom, res); + return 0; } @@ -1091,6 +1255,8 @@ const pa_timing_info* pa_stream_get_timing_info(pa_stream *s) PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE); PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->timing_info_valid, PA_ERR_NODATA); + pw_log_warn("Not Implemented"); + return &s->timing_info; }