pulse: various fixes and improvements

This commit is contained in:
Wim Taymans 2018-06-22 17:41:12 +02:00
parent 2254a124af
commit a30722c442
5 changed files with 241 additions and 65 deletions

View file

@ -33,6 +33,7 @@
int pa_context_set_error(pa_context *c, int error) {
pa_assert(error >= 0);
pa_assert(error < PA_ERR_MAX);
pw_log_error("context %p: error %d", c, error);
if (c)
c->error = error;
return error;

View file

@ -108,19 +108,24 @@ static inline const char *pa_strnull(const char *x) {
int pa_context_set_error(pa_context *c, int error);
#define PA_CHECK_VALIDITY(context, expression, error) \
do { \
if (!(expression)) \
return -pa_context_set_error((context), (error)); \
} while(false)
#define PA_CHECK_VALIDITY(context, expression, error) \
do { \
if (!(expression)) { \
fprintf(stderr, "'%s' failed at %s:%u %s()", \
#expression , __FILE__, __LINE__, __func__); \
return -pa_context_set_error((context), (error)); \
} \
} while(false)
#define PA_CHECK_VALIDITY_RETURN_ANY(context, expression, error, value) \
do { \
if (!(expression)) { \
pa_context_set_error((context), (error)); \
return value; \
} \
} while(false)
#define PA_CHECK_VALIDITY_RETURN_ANY(context, expression, error, value) \
do { \
if (!(expression)) { \
fprintf(stderr, "'%s' failed at %s:%u %s()", \
#expression , __FILE__, __LINE__, __func__); \
pa_context_set_error((context), (error)); \
return value; \
} \
} while(false)
#define PA_CHECK_VALIDITY_RETURN_NULL(context, expression, error) \
PA_CHECK_VALIDITY_RETURN_ANY(context, expression, error, NULL)

View file

@ -129,6 +129,7 @@ static pa_time_event* api_time_new(pa_mainloop_api*a, const struct timeval *tv,
ts.tv_sec = tv->tv_sec;
ts.tv_nsec = tv->tv_usec * 1000LL;
}
pw_log_debug("new timer %p %ld %ld", ev, ts.tv_sec, ts.tv_nsec);
pw_loop_update_timer(mainloop->loop, ev->source, &ts, NULL, true);
return ev;

View file

@ -21,14 +21,17 @@
#include <spa/utils/defs.h>
#include <pipewire/log.h>
#include <pulse/rtclock.h>
pa_usec_t pa_rtclock_now(void)
{
struct timespec ts;
clock_gettime(CLOCK_MONOTONIC, &ts);
return ts.tv_sec * SPA_USEC_PER_SEC +
ts.tv_nsec * SPA_NSEC_PER_USEC;
pa_usec_t res;
clock_gettime(CLOCK_MONOTONIC, &ts);
res = (ts.tv_sec * SPA_USEC_PER_SEC) + (ts.tv_nsec / SPA_NSEC_PER_USEC);
return res;
}

View file

@ -19,6 +19,7 @@
#include <unistd.h>
#include <errno.h>
#include <time.h>
#include <spa/utils/defs.h>
@ -28,6 +29,65 @@
#include <pipewire/stream.h>
#include "internal.h"
static const uint32_t audio_formats[] = {
[PA_SAMPLE_U8] = offsetof(struct spa_type_audio_format, U8),
[PA_SAMPLE_ALAW] = offsetof(struct spa_type_audio_format, UNKNOWN),
[PA_SAMPLE_ULAW] = offsetof(struct spa_type_audio_format, UNKNOWN),
[PA_SAMPLE_S16NE] = offsetof(struct spa_type_audio_format, S16),
[PA_SAMPLE_S16RE] = offsetof(struct spa_type_audio_format, S16_OE),
[PA_SAMPLE_FLOAT32NE] = offsetof(struct spa_type_audio_format, F32),
[PA_SAMPLE_FLOAT32RE] = offsetof(struct spa_type_audio_format, F32_OE),
[PA_SAMPLE_S32NE] = offsetof(struct spa_type_audio_format, S32),
[PA_SAMPLE_S32RE] = offsetof(struct spa_type_audio_format, S32_OE),
[PA_SAMPLE_S24NE] = offsetof(struct spa_type_audio_format, S24),
[PA_SAMPLE_S24RE] = offsetof(struct spa_type_audio_format, S24_OE),
[PA_SAMPLE_S24_32NE] = offsetof(struct spa_type_audio_format, S24_32),
[PA_SAMPLE_S24_32RE] = offsetof(struct spa_type_audio_format, S24_32_OE),
};
static inline uint32_t format_pa2id(pa_stream *s, pa_sample_format_t format)
{
if (format < 0 || format >= SPA_N_ELEMENTS(audio_formats))
return s->type.audio_format.UNKNOWN;
return *SPA_MEMBER(&s->type.audio_format, audio_formats[format], uint32_t);
}
static inline pa_sample_format_t format_id2pa(pa_stream *s, uint32_t id)
{
int i;
for (i = 0; i < SPA_N_ELEMENTS(audio_formats); i++) {
if (id == *SPA_MEMBER(&s->type.audio_format, audio_formats[i], uint32_t))
return i;
}
return PA_SAMPLE_INVALID;
}
static int dequeue_buffer(pa_stream *s)
{
struct pw_buffer *buf;
uint32_t index;
buf = pw_stream_dequeue_buffer(s->stream);
if (buf == NULL)
return -EPIPE;
spa_ringbuffer_get_write_index(&s->dequeued_ring, &index);
s->dequeued[index & MASK_BUFFERS] = buf;
spa_ringbuffer_write_update(&s->dequeued_ring, index + 1);
if (s->direction == PA_STREAM_PLAYBACK)
s->dequeued_size += buf->buffer->datas[0].maxsize;
else
s->dequeued_size += buf->buffer->datas[0].chunk->size;
return 0;
}
static void configure_buffers(pa_stream *s)
{
s->buffer_attr.maxlength = 65536;
s->buffer_attr.prebuf = s->buffer_attr.minreq;
}
static void stream_state_changed(void *data, enum pw_stream_state old,
enum pw_stream_state state, const char *error)
@ -49,6 +109,7 @@ static void stream_state_changed(void *data, enum pw_stream_state old,
case PW_STREAM_STATE_READY:
break;
case PW_STREAM_STATE_PAUSED:
configure_buffers(s);
pa_stream_set_state(s, PA_STREAM_READY);
break;
case PW_STREAM_STATE_STREAMING:
@ -56,42 +117,98 @@ static void stream_state_changed(void *data, enum pw_stream_state old,
}
}
static const struct spa_pod *get_buffers_param(pa_stream *s, pa_buffer_attr *attr, struct spa_pod_builder *b)
{
const struct spa_pod *param;
struct pw_type *t = pw_core_get_type(s->context->core);
int32_t blocks, buffers, size, maxsize, stride;
blocks = 1;
stride = pa_frame_size(&s->sample_spec);
if (attr->tlength == -1)
maxsize = 1024;
else
maxsize = (attr->tlength / stride);
if (attr->minreq == -1)
size = SPA_MIN(1024, maxsize);
else
size = SPA_MIN(attr->minreq / stride, maxsize);
if (attr->maxlength == -1)
buffers = 3;
else
buffers = SPA_CLAMP(attr->maxlength / (maxsize * stride), 3, 64);
param = spa_pod_builder_object(b,
t->param.idBuffers, t->param_buffers.Buffers,
":", t->param_buffers.buffers, "iru", buffers,
SPA_POD_PROP_MIN_MAX(3, 64),
":", t->param_buffers.blocks, "i", blocks,
":", t->param_buffers.size, "iru", size * stride,
SPA_POD_PROP_MIN_MAX(size * stride, maxsize * stride),
":", t->param_buffers.stride, "i", stride,
":", t->param_buffers.align, "i", 16);
return param;
}
static void stream_format_changed(void *data, const struct spa_pod *format)
{
pa_stream *s = data;
const struct spa_pod *params[4];
uint32_t n_params = 0;
uint8_t buffer[4096];
struct spa_pod_builder b = SPA_POD_BUILDER_INIT(buffer, sizeof(buffer));
struct spa_audio_info info = { 0 };
int res;
s->sample_spec.format = PA_SAMPLE_S16NE,
s->sample_spec.rate = 44100;
s->sample_spec.channels = 2;
spa_pod_object_parse(format,
"I", &info.media_type,
"I", &info.media_subtype);
if (info.media_type != s->type.media_type.audio ||
info.media_subtype != s->type.media_subtype.raw ||
spa_format_audio_raw_parse(format, &info.info.raw, &s->type.format_audio) < 0 ||
info.info.raw.layout != SPA_AUDIO_LAYOUT_INTERLEAVED) {
res = -EINVAL;
goto done;
}
s->sample_spec.format = format_id2pa(s, info.info.raw.format);
if (s->sample_spec.format == PA_SAMPLE_INVALID) {
res = -EINVAL;
goto done;
}
s->sample_spec.rate = info.info.raw.rate;
s->sample_spec.channels = info.info.raw.channels;
if (s->format)
pa_format_info_free(s->format);
s->format = pa_format_info_from_sample_spec(&s->sample_spec, NULL);
params[n_params++] = get_buffers_param(s, &s->buffer_attr, &b);
res = 0;
done:
pw_stream_finish_format(s->stream, res, params, n_params);
}
static void stream_process(void *data)
{
pa_stream *s = data;
struct pw_buffer *buf;
uint32_t index;
s->timing_info_valid = true;
buf = pw_stream_dequeue_buffer(s->stream);
if (buf == NULL)
if (dequeue_buffer(s) < 0 && s->dequeued_size == 0)
return;
spa_ringbuffer_get_write_index(&s->dequeued_ring, &index);
s->dequeued[index & MASK_BUFFERS] = buf;
spa_ringbuffer_write_update(&s->dequeued_ring, index + 1);
if (s->direction == PA_STREAM_PLAYBACK) {
s->dequeued_size += buf->buffer->datas[0].maxsize;
if (s->write_callback)
s->write_callback(s, s->dequeued_size, s->write_userdata);
}
else {
s->dequeued_size += buf->buffer->datas[0].chunk->size;
if (s->read_callback)
s->read_callback(s, s->dequeued_size, s->read_userdata);
}
@ -185,6 +302,9 @@ pa_stream* stream_new(pa_context *c, const char *name,
s->device_index = PA_INVALID_INDEX;
s->device_index = 0;
s->device_name = strdup("unknown");
spa_ringbuffer_init(&s->dequeued_ring);
spa_list_append(&c->streams, &s->link);
@ -343,6 +463,7 @@ int pa_stream_is_corked(pa_stream *s)
PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
PA_CHECK_VALIDITY(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
pw_log_debug("stream %p: corked %d", s, s->corked);
return s->corked;
}
@ -395,30 +516,14 @@ static void patch_buffer_attr(pa_stream *s, pa_buffer_attr *attr, pa_stream_flag
if (attr->fragsize == (uint32_t) -1)
attr->fragsize = attr->tlength; /* Pass data to the app only when the buffer is filled up once */
pw_log_info("stream %p: maxlength: %u", s, attr->maxlength);
pw_log_info("stream %p: tlength: %u", s, attr->tlength);
pw_log_info("stream %p: minreq: %u", s, attr->minreq);
pw_log_info("stream %p: prebuf: %u", s, attr->prebuf);
pw_log_info("stream %p: fragsize: %u", s, attr->fragsize);
}
static const uint32_t audio_formats[] = {
[PA_SAMPLE_U8] = offsetof(struct spa_type_audio_format, U8),
[PA_SAMPLE_ALAW] = offsetof(struct spa_type_audio_format, UNKNOWN),
[PA_SAMPLE_ULAW] = offsetof(struct spa_type_audio_format, UNKNOWN),
[PA_SAMPLE_S16NE] = offsetof(struct spa_type_audio_format, S16),
[PA_SAMPLE_S16RE] = offsetof(struct spa_type_audio_format, S16_OE),
[PA_SAMPLE_FLOAT32NE] = offsetof(struct spa_type_audio_format, F32),
[PA_SAMPLE_FLOAT32RE] = offsetof(struct spa_type_audio_format, F32_OE),
[PA_SAMPLE_S32NE] = offsetof(struct spa_type_audio_format, S32),
[PA_SAMPLE_S32RE] = offsetof(struct spa_type_audio_format, S32_OE),
[PA_SAMPLE_S24NE] = offsetof(struct spa_type_audio_format, S24),
[PA_SAMPLE_S24RE] = offsetof(struct spa_type_audio_format, S24_OE),
[PA_SAMPLE_S24_32NE] = offsetof(struct spa_type_audio_format, S24_32),
[PA_SAMPLE_S24_32RE] = offsetof(struct spa_type_audio_format, S24_32_OE),
};
static inline uint32_t get_format(pa_stream *s, pa_sample_format_t format)
{
if (format < 0 || format >= SPA_N_ELEMENTS(audio_formats))
return s->type.audio_format.UNKNOWN;
return *SPA_MEMBER(&s->type.audio_format, audio_formats[format], uint32_t);
}
static const struct spa_pod *get_param(pa_stream *s, pa_sample_spec *ss, pa_channel_map *map,
struct spa_pod_builder *b)
@ -430,7 +535,7 @@ static const struct spa_pod *get_param(pa_stream *s, pa_sample_spec *ss, pa_chan
t->param.idEnumFormat, t->spa_format,
"I", s->type.media_type.audio,
"I", s->type.media_subtype.raw,
":", s->type.format_audio.format, "I", get_format(s, ss->format),
":", s->type.format_audio.format, "I", format_pa2id(s, ss->format),
":", s->type.format_audio.layout, "i", SPA_AUDIO_LAYOUT_INTERLEAVED,
":", s->type.format_audio.channels, "i", ss->channels,
":", s->type.format_audio.rate, "i", ss->rate);
@ -481,7 +586,7 @@ static int create_stream(pa_stream_direction_t direction,
int i;
for (i = 0; i < s->n_formats; i++) {
if (pa_format_info_to_sample_spec(s->req_formats[i], &ss, NULL) < 0) {
if (pa_format_info_to_sample_spec(s->req_formats[i], &ss, &map) < 0) {
char buf[4096];
pw_log_warn("can't convert format %s",
pa_format_info_snprint(buf,4096,s->req_formats[i]));
@ -500,7 +605,7 @@ static int create_stream(pa_stream_direction_t direction,
dev = getenv("PIPEWIRE_NODE");
props = (struct pw_properties *) pw_stream_get_properties(s->stream);
pw_properties_setf(props, "node.latency", "%u", s->buffer_attr.minreq);
pw_properties_setf(props, "node.latency", "%u/44100", s->buffer_attr.minreq);
res = pw_stream_connect(s->stream,
direction == PA_STREAM_PLAYBACK ?
@ -563,6 +668,8 @@ int peek_buffer(pa_stream *s)
if (s->buffer != NULL)
return 0;
dequeue_buffer(s);
if ((avail = spa_ringbuffer_get_read_index(&s->dequeued_ring, &index)) <= 0)
return -EPIPE;
@ -585,9 +692,13 @@ int queue_buffer(pa_stream *s)
if (s->buffer == NULL)
return 0;
s->dequeued_size -= s->buffer_size;
spa_ringbuffer_read_update(&s->dequeued_ring, s->buffer_index + 1);
if (s->direction == PA_STREAM_PLAYBACK)
s->dequeued_size -= s->buffer->buffer->datas[0].maxsize;
else
s->dequeued_size -= s->buffer->buffer->datas[0].chunk->size;
pw_stream_queue_buffer(s->stream, s->buffer);
s->buffer = NULL;
return 0;
@ -598,6 +709,7 @@ int pa_stream_begin_write(
void **data,
size_t *nbytes)
{
int res;
spa_assert(s);
spa_assert(s->refcount >= 1);
@ -608,7 +720,8 @@ int pa_stream_begin_write(
PA_CHECK_VALIDITY(s->context, data, PA_ERR_INVALID);
PA_CHECK_VALIDITY(s->context, nbytes && *nbytes != 0, PA_ERR_INVALID);
if (peek_buffer(s) < 0) {
if ((res = peek_buffer(s)) < 0) {
pw_log_warn("stream %p: no buffer", s);
*data = NULL;
*nbytes = 0;
return 0;
@ -651,6 +764,8 @@ int pa_stream_write_ext_free(pa_stream *s,
int64_t offset,
pa_seek_mode_t seek)
{
int res;
spa_assert(s);
spa_assert(s->refcount >= 1);
spa_assert(data);
@ -671,16 +786,39 @@ int pa_stream_write_ext_free(pa_stream *s,
PA_CHECK_VALIDITY(s->context, !free_cb || !s->buffer, PA_ERR_INVALID);
if (s->buffer == NULL) {
pw_log_warn("Not Implemented");
void *dst;
size_t dlen;
if ((res = pa_stream_begin_write(s, &dst, &dlen)) < 0)
return res;
if (dst == NULL || dlen == 0)
return 0;
nbytes = SPA_MIN(nbytes, dlen);
memcpy(dst, data, nbytes);
data = dst;
if (free_cb)
free_cb(free_cb_data);
return PA_ERR_INVALID;
} else {
s->buffer->buffer->datas[0].chunk->offset = data - s->buffer_data;
s->buffer->buffer->datas[0].chunk->size = nbytes;
queue_buffer(s);
}
s->buffer->buffer->datas[0].chunk->offset = data - s->buffer_data;
s->buffer->buffer->datas[0].chunk->size = nbytes;
/* Update the write index in the already available latency data */
if (s->timing_info_valid) {
if (seek == PA_SEEK_ABSOLUTE) {
s->timing_info.write_index_corrupt = false;
s->timing_info.write_index = offset + (int64_t) nbytes;
} else if (seek == PA_SEEK_RELATIVE) {
if (!s->timing_info.write_index_corrupt)
s->timing_info.write_index += offset + (int64_t) nbytes;
} else
s->timing_info.write_index_corrupt = true;
}
queue_buffer(s);
return 0;
}
@ -699,10 +837,12 @@ int pa_stream_peek(pa_stream *s,
if (peek_buffer(s) < 0) {
*data = NULL;
*nbytes = 0;
pw_log_debug("stream %p: no buffer", s);
return 0;
}
*data = SPA_MEMBER(s->buffer_data, s->buffer_offset, void);
*nbytes = s->buffer_size;
pw_log_debug("stream %p: %p %zd", s, *data, *nbytes);
return 0;
}
@ -716,6 +856,7 @@ int pa_stream_drop(pa_stream *s)
PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE);
PA_CHECK_VALIDITY(s->context, s->buffer, PA_ERR_BADSTATE);
pw_log_debug("stream %p", s);
queue_buffer(s);
return 0;
@ -755,9 +896,10 @@ struct success_ack {
static void on_success(pa_operation *o, void *userdata)
{
struct success_ack *d = userdata;
pa_stream *s = o->stream;
pa_operation_done(o);
if (d->cb)
d->cb(o->stream, PA_OK, d->userdata);
d->cb(s, PA_OK, d->userdata);
}
pa_operation* pa_stream_drain(pa_stream *s, pa_stream_success_cb_t cb, void *userdata)
@ -779,6 +921,17 @@ pa_operation* pa_stream_drain(pa_stream *s, pa_stream_success_cb_t cb, void *use
return o;
}
static void on_timing_success(pa_operation *o, void *userdata)
{
struct success_ack *d = userdata;
pa_stream *s = o->stream;
pa_operation_done(o);
s->timing_info_valid = true;
if (d->cb)
d->cb(s, s->timing_info_valid, d->userdata);
}
pa_operation* pa_stream_update_timing_info(pa_stream *s, pa_stream_success_cb_t cb, void *userdata)
{
pa_operation *o;
@ -790,7 +943,7 @@ pa_operation* pa_stream_update_timing_info(pa_stream *s, pa_stream_success_cb_t
PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE);
PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
o = pa_operation_new(s->context, s, on_success, sizeof(struct success_ack));
o = pa_operation_new(s->context, s, on_timing_success, sizeof(struct success_ack));
d = o->userdata;
d->cb = cb;
d->userdata = userdata;
@ -1045,6 +1198,8 @@ int pa_stream_get_time(pa_stream *s, pa_usec_t *r_usec)
{
struct pw_time t;
pa_usec_t res;
struct timespec ts;
uint64_t now, delay;
spa_assert(s);
spa_assert(s->refcount >= 1);
@ -1055,11 +1210,20 @@ int pa_stream_get_time(pa_stream *s, pa_usec_t *r_usec)
pw_stream_get_time(s->stream, &t);
res = (t.ticks * t.rate.num * PA_USEC_PER_SEC) / t.rate.denom;
clock_gettime(CLOCK_MONOTONIC, &ts);
now = ts.tv_sec * SPA_NSEC_PER_SEC + ts.tv_nsec;
delay = (now - t.now) / PA_NSEC_PER_USEC;
if (t.rate.denom != 0)
res = delay + ((t.ticks * t.rate.num * PA_USEC_PER_SEC) / t.rate.denom);
else
res = 0;
if (r_usec)
*r_usec = res;
pw_log_debug("stream %p: %ld %ld %ld %ld %d/%d %ld", s, now, t.now, delay, t.ticks, t.rate.num, t.rate.denom, res);
return 0;
}
@ -1091,6 +1255,8 @@ const pa_timing_info* pa_stream_get_timing_info(pa_stream *s)
PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->direction != PA_STREAM_UPLOAD, PA_ERR_BADSTATE);
PA_CHECK_VALIDITY_RETURN_NULL(s->context, s->timing_info_valid, PA_ERR_NODATA);
pw_log_warn("Not Implemented");
return &s->timing_info;
}