diff --git a/pipewire-pulseaudio/src/internal.h b/pipewire-pulseaudio/src/internal.h index e3eb95181..66e0e4cf3 100644 --- a/pipewire-pulseaudio/src/internal.h +++ b/pipewire-pulseaudio/src/internal.h @@ -401,9 +401,6 @@ struct pa_mem { void *user_data; }; -#define MAX_BUFFERS 64u -#define MASK_BUFFERS (MAX_BUFFERS-1) - struct pa_stream { struct spa_list link; int refcount; diff --git a/pipewire-pulseaudio/src/stream.c b/pipewire-pulseaudio/src/stream.c index 02c1ca435..0f39ae808 100644 --- a/pipewire-pulseaudio/src/stream.c +++ b/pipewire-pulseaudio/src/stream.c @@ -33,14 +33,17 @@ #include "core-format.h" #include "internal.h" -#define MIN_QUEUED 1 - -#define MAX_SIZE (4*1024*1024) -#define BLOCK_SIZE (64*1024) +#define MIN_BUFFERS 8u +#define MAX_BUFFERS 64u +#define MAX_BUFFER_SAMPLES (8*1024u) +#define MAX_SIZE (4*1024*1024u) static void dump_buffer_attr(pa_stream *s, pa_buffer_attr *attr) { + char b[1024]; + pw_log_debug("stream %p: sample: %s", s, pa_sample_spec_snprint(b, sizeof(b), &s->sample_spec)); + pw_log_debug("stream %p: stride: %zu", s, pa_frame_size(&s->sample_spec)); pw_log_debug("stream %p: maxlength: %u", s, attr->maxlength); pw_log_debug("stream %p: tlength: %u", s, attr->tlength); pw_log_debug("stream %p: minreq: %u", s, attr->minreq); @@ -48,15 +51,6 @@ static void dump_buffer_attr(pa_stream *s, pa_buffer_attr *attr) pw_log_debug("stream %p: fragsize: %u", s, attr->fragsize); } -static void configure_buffers(pa_stream *s) -{ - s->buffer_attr.maxlength = MAX_SIZE; - if (s->buffer_attr.prebuf == (uint32_t)-1) - s->buffer_attr.prebuf = s->buffer_attr.minreq; - s->buffer_attr.fragsize = s->buffer_attr.minreq; - dump_buffer_attr(s, &s->buffer_attr); -} - static void configure_device(pa_stream *s) { struct global *g; @@ -131,7 +125,6 @@ static void stream_state_changed(void *data, enum pw_stream_state old, break; case PW_STREAM_STATE_STREAMING: configure_device(s); - configure_buffers(s); pa_stream_set_state(s, PA_STREAM_READY); if (s->suspended) { s->suspended = false; @@ -150,32 +143,19 @@ static const struct spa_pod *get_buffers_param(pa_stream *s, pa_buffer_attr *att blocks = 1; stride = pa_frame_size(&s->sample_spec); - if (attr->tlength == (uint32_t)-1 || attr->tlength == 0) - maxsize = 1024; - else - maxsize = (attr->tlength / stride); - - if (attr->minreq == (uint32_t)-1 || attr->minreq == 0) - size = maxsize; - else - size = SPA_MIN(attr->minreq / stride, maxsize); - - if (attr->maxlength == (uint32_t)-1) - buffers = 3; - else - buffers = SPA_CLAMP(attr->maxlength / (size * stride), 3u, MAX_BUFFERS); + maxsize = attr->tlength; + size = attr->minreq; + buffers = SPA_CLAMP(maxsize / size, MIN_BUFFERS, MAX_BUFFERS); pw_log_debug("stream %p: stride %d maxsize %d size %u buffers %d", s, stride, maxsize, size, buffers); param = spa_pod_builder_add_object(b, SPA_TYPE_OBJECT_ParamBuffers, SPA_PARAM_Buffers, - SPA_PARAM_BUFFERS_buffers, SPA_POD_CHOICE_RANGE_Int(buffers, buffers, MAX_BUFFERS), + SPA_PARAM_BUFFERS_buffers, SPA_POD_CHOICE_RANGE_Int(buffers, MIN_BUFFERS, MAX_BUFFERS), SPA_PARAM_BUFFERS_blocks, SPA_POD_Int(blocks), SPA_PARAM_BUFFERS_size, SPA_POD_CHOICE_RANGE_Int( - size * stride, - size * stride, - maxsize * stride), + size, size, maxsize), SPA_PARAM_BUFFERS_stride, SPA_POD_Int(stride), SPA_PARAM_BUFFERS_align, SPA_POD_Int(16)); return param; @@ -184,6 +164,7 @@ static const struct spa_pod *get_buffers_param(pa_stream *s, pa_buffer_attr *att static void patch_buffer_attr(pa_stream *s, pa_buffer_attr *attr, pa_stream_flags_t *flags) { const char *e, *str; char buf[100]; + uint32_t stride; pa_assert(s); pa_assert(attr); @@ -228,21 +209,30 @@ static void patch_buffer_attr(pa_stream *s, pa_buffer_attr *attr, pa_stream_flag *flags |= PA_STREAM_ADJUST_LATENCY; } } + dump_buffer_attr(s, attr); - if (attr->maxlength == (uint32_t) -1) + stride = pa_frame_size(&s->sample_spec); + if (attr->maxlength == (uint32_t) -1 || attr->maxlength == 0) attr->maxlength = MAX_SIZE; /* 4MB is the maximum queue length PulseAudio <= 0.9.9 supported. */ - if (attr->tlength == (uint32_t) -1) - attr->tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, &s->sample_spec); /* 250ms of buffering */ + if (attr->tlength == (uint32_t) -1 || attr->tlength == 0) + attr->tlength = (uint32_t) pa_usec_to_bytes(2*PA_USEC_PER_SEC, &s->sample_spec); + attr->tlength = SPA_MIN(attr->tlength, attr->maxlength); - if (attr->minreq == (uint32_t) -1) - attr->minreq = attr->tlength; /* Ask for more data when there are only 200ms left in the playback buffer */ + if (attr->minreq == (uint32_t) -1 || attr->minreq == 0) + attr->minreq = pa_usec_to_bytes(25*PA_USEC_PER_MSEC, &s->sample_spec); + attr->minreq = SPA_MIN(attr->minreq, attr->tlength / MIN_BUFFERS); + attr->minreq = SPA_MAX(attr->minreq, stride); - if (attr->prebuf == (uint32_t) -1) - attr->prebuf = attr->tlength; /* Start to play only when the playback is fully filled up once */ + if (attr->fragsize == (uint32_t) -1 || attr->fragsize == 0) + attr->fragsize = pa_usec_to_bytes(25*PA_USEC_PER_MSEC, &s->sample_spec); + attr->fragsize = SPA_MIN(attr->fragsize, attr->tlength / MIN_BUFFERS); + attr->fragsize = SPA_MAX(attr->fragsize, stride); - if (attr->fragsize == (uint32_t) -1) - attr->fragsize = attr->tlength; /* Pass data to the app only when the buffer is filled up once */ + if (attr->prebuf == (uint32_t) -1 || attr->prebuf == 0) + attr->prebuf = attr->tlength - attr->minreq; + attr->prebuf = SPA_MIN(attr->prebuf, attr->tlength - attr->minreq); + attr->prebuf = SPA_MAX(attr->prebuf, stride); dump_buffer_attr(s, attr); } @@ -355,7 +345,6 @@ static void update_timing_info(pa_stream *s) ti->configured_source_usec = delay; ti->write_index = pos; } - ti->since_underrun = 0; s->timing_info_valid = true; s->queued_bytes = pwt.queued; @@ -449,20 +438,25 @@ static void stream_process(void *data) if (s->direction == PA_STREAM_PLAYBACK) { pa_timing_info *i = &s->timing_info; - uint64_t queued, writable; + uint64_t queued, writable, required; queue_output(s); queued = i->write_index - SPA_MIN(i->read_index, i->write_index); writable = s->maxblock - SPA_MIN(queued, s->maxblock); + required = SPA_MIN(s->maxblock, s->buffer_attr.minreq); - if (s->write_callback && s->state == PA_STREAM_READY && writable > 0) + if (s->write_callback && s->state == PA_STREAM_READY && writable >= required) s->write_callback(s, writable, s->write_userdata); } else { + uint64_t required; + pull_input(s); - if (s->read_callback && s->ready_bytes > 0 && s->state == PA_STREAM_READY) + required = SPA_MIN(s->maxblock, s->buffer_attr.fragsize); + + if (s->read_callback && s->ready_bytes > required && s->state == PA_STREAM_READY) s->read_callback(s, s->ready_bytes, s->read_userdata); } } @@ -554,23 +548,6 @@ static pa_stream* stream_new(pa_context *c, const char *name, s->direct_on_input = PA_INVALID_INDEX; s->stream_index = PA_INVALID_INDEX; - - s->buffer_attr.maxlength = (uint32_t) -1; - if (ss) - s->buffer_attr.tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, ss); /* 250ms of buffering */ - else { - /* FIXME: We assume a worst-case compressed format corresponding to - * 48000 Hz, 2 ch, S16 PCM, but this can very well be incorrect */ - pa_sample_spec tmp_ss = { - .format = PA_SAMPLE_S16NE, - .rate = 48000, - .channels = 2, - }; - s->buffer_attr.tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, &tmp_ss); /* 250ms of buffering */ - } - s->buffer_attr.minreq = (uint32_t) -1; - s->buffer_attr.prebuf = (uint32_t) -1; - s->buffer_attr.fragsize = (uint32_t) -1; s->maxblock = INT_MAX; s->device_index = PA_INVALID_INDEX; @@ -802,10 +779,9 @@ static int create_stream(pa_stream_direction_t direction, int res; enum pw_stream_flags fl; const struct spa_pod *params[16]; - uint32_t i, n_params = 0; - uint8_t buffer[4096]; - struct spa_pod_builder b = SPA_POD_BUILDER_INIT(buffer, sizeof(buffer)); - uint32_t sample_rate = 0, stride = 0, latency_num; + uint32_t i, n_params = 0, stride; + uint8_t buffer[4096]; + struct spa_pod_builder b = SPA_POD_BUILDER_INIT(buffer, sizeof(buffer)); const char *str; uint32_t devid, n_items; struct global *g; @@ -886,16 +862,18 @@ static int create_stream(pa_stream_direction_t direction, monitor = (flags & PA_STREAM_PEAK_DETECT); no_remix = (flags & PA_STREAM_NO_REMIX_CHANNELS); + if (attr) + s->buffer_attr = *attr; + if (pa_sample_spec_valid(&s->sample_spec)) { params[n_params++] = pa_format_build_param(&b, SPA_PARAM_EnumFormat, &s->sample_spec, &s->channel_map); - sample_rate = s->sample_spec.rate; - stride = pa_frame_size(&s->sample_spec); } else { pa_sample_spec ss; pa_channel_map chmap; int i; + uint32_t sample_rate = 0; for (i = 0; i < s->n_formats; i++) { if ((res = pa_format_info_to_sample_spec(s->req_formats[i], &ss, NULL)) < 0) { @@ -911,17 +889,18 @@ static int create_stream(pa_stream_direction_t direction, &ss, &chmap); if (ss.rate > sample_rate) { sample_rate = ss.rate; - stride = pa_frame_size(&ss); + s->sample_spec = ss; } } + if (sample_rate == 0) { + s->sample_spec.format = PA_SAMPLE_S16NE; + s->sample_spec.rate = 48000; + s->sample_spec.channels = 2; + } } - if (sample_rate == 0) { - sample_rate = 48000; - stride = sizeof(int16_t) * 2; - } + if (!pa_sample_spec_valid(&s->sample_spec)) + return -EINVAL; - if (attr) - s->buffer_attr = *attr; patch_buffer_attr(s, &s->buffer_attr, &flags); if (direction == PA_STREAM_RECORD) @@ -972,8 +951,8 @@ static int create_stream(pa_stream_direction_t direction, else str = "Music"; - latency_num = s->buffer_attr.minreq / stride; - sprintf(latency, "%u/%u", SPA_MAX(latency_num, 1u), sample_rate); + stride = pa_frame_size(&s->sample_spec); + sprintf(latency, "%u/%u", s->buffer_attr.minreq / stride, s->sample_spec.rate); n_items = 0; items[n_items++] = SPA_DICT_ITEM_INIT(PW_KEY_NODE_LATENCY, latency); items[n_items++] = SPA_DICT_ITEM_INIT(PW_KEY_MEDIA_TYPE, "Audio"); @@ -1180,6 +1159,7 @@ int pa_stream_write_ext_free(pa_stream *s, free_cb(free_cb_data); s->timing_info.write_index += nbytes; + s->timing_info.since_underrun += nbytes; pw_log_trace("stream %p: written %zd bytes", s, nbytes); return 0; @@ -1253,7 +1233,7 @@ SPA_EXPORT size_t pa_stream_writable_size(PA_CONST pa_stream *s) { const pa_timing_info *i; - uint64_t now, then, queued, writable, elapsed; + uint64_t now, then, queued, writable, elapsed, required; struct timespec ts; spa_assert(s); @@ -1279,13 +1259,21 @@ size_t pa_stream_writable_size(PA_CONST pa_stream *s) queued -= SPA_MIN(queued, elapsed); writable = s->maxblock - SPA_MIN(queued, s->maxblock); - pw_log_trace("stream %p: %"PRIu64, s, writable); + required = SPA_MIN(s->maxblock, s->buffer_attr.minreq); + + pw_log_debug("stream %p: %"PRIu64" minreq:%u maxblock:%zu", s, + writable, s->buffer_attr.minreq, s->maxblock); + if (writable < required) + writable = 0; + return writable; } SPA_EXPORT size_t pa_stream_readable_size(PA_CONST pa_stream *s) { + uint64_t readable, required; + spa_assert(s); spa_assert(s->refcount >= 1); @@ -1294,8 +1282,13 @@ size_t pa_stream_readable_size(PA_CONST pa_stream *s) PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE, (size_t) -1); - pw_log_trace("stream %p: %zd", s, s->ready_bytes); - return s->ready_bytes; + readable = s->ready_bytes; + required = SPA_MIN(s->maxblock, s->buffer_attr.fragsize); + pw_log_trace("stream %p: %zd %zd", s, readable, required); + if (readable < required) + readable = 0; + + return readable; } struct success_ack {