diff --git a/pipewire-pulseaudio/src/internal.h b/pipewire-pulseaudio/src/internal.h index 9fd56719e..e13d7b5ba 100644 --- a/pipewire-pulseaudio/src/internal.h +++ b/pipewire-pulseaudio/src/internal.h @@ -325,6 +325,14 @@ struct global *pa_context_find_global(pa_context *c, uint32_t id); struct global *pa_context_find_global_by_name(pa_context *c, uint32_t mask, const char *name); struct global *pa_context_find_linked(pa_context *c, uint32_t id); +struct pa_mem { + struct spa_list link; + void *data; + size_t maxsize; + size_t size; + size_t offset; +}; + #define MAX_BUFFERS 64u #define MASK_BUFFERS (MAX_BUFFERS-1) @@ -357,7 +365,7 @@ struct pa_stream { char *device_name; pa_timing_info timing_info; - int64_t ticks_base; + uint64_t ticks_base; uint32_t direct_on_input; @@ -389,17 +397,16 @@ struct pa_stream { pa_stream_notify_cb_t buffer_attr_callback; void *buffer_attr_userdata; - struct pw_buffer *dequeued[MAX_BUFFERS]; - struct spa_ringbuffer dequeued_ring; - size_t dequeued_size; size_t maxsize; - struct spa_list pending; + size_t maxblock; + size_t requested_bytes; - struct pw_buffer *buffer; - uint32_t buffer_index; - void *buffer_data; - uint32_t buffer_size; - uint32_t buffer_offset; + struct pa_mem *mem; /* current mem for playback */ + struct spa_list free; /* free to fill */ + struct spa_list ready; /* ready for playback */ + size_t ready_bytes; + + struct pw_buffer *buffer; /* currently reading for capture */ uint32_t n_channel_volumes; float channel_volumes[SPA_AUDIO_MAX_CHANNELS]; diff --git a/pipewire-pulseaudio/src/stream.c b/pipewire-pulseaudio/src/stream.c index 3047a1a5d..4ecce803b 100644 --- a/pipewire-pulseaudio/src/stream.c +++ b/pipewire-pulseaudio/src/stream.c @@ -36,6 +36,7 @@ #define MIN_QUEUED 1 #define MAX_SIZE (4*1024*1024) +#define BLOCK_SIZE (64*1024) static const uint32_t audio_formats[] = { [PA_SAMPLE_U8] = SPA_AUDIO_FORMAT_U8, @@ -149,26 +150,6 @@ static inline pa_channel_position_t channel_id2pa(pa_stream *s, uint32_t id) return PA_CHANNEL_POSITION_INVALID; } -static inline int dequeue_buffer(pa_stream *s) -{ - struct pw_buffer *buf; - uint32_t index; - - buf = pw_stream_dequeue_buffer(s->stream); - if (buf == NULL) - return -EPIPE; - - spa_ringbuffer_get_write_index(&s->dequeued_ring, &index); - s->dequeued[index & MASK_BUFFERS] = buf; - if (s->direction == PA_STREAM_PLAYBACK) - s->dequeued_size += buf->buffer->datas[0].maxsize; - else - s->dequeued_size += buf->buffer->datas[0].chunk->size; - spa_ringbuffer_write_update(&s->dequeued_ring, index + 1); - - return 0; -} - static void dump_buffer_attr(pa_stream *s, pa_buffer_attr *attr) { pw_log_info("stream %p: maxlength: %u", s, attr->maxlength); @@ -180,7 +161,7 @@ static void dump_buffer_attr(pa_stream *s, pa_buffer_attr *attr) static void configure_buffers(pa_stream *s) { - s->buffer_attr.maxlength = s->maxsize; + s->buffer_attr.maxlength = MAX_SIZE; if (s->buffer_attr.prebuf == (uint32_t)-1) s->buffer_attr.prebuf = s->buffer_attr.minreq; s->buffer_attr.fragsize = s->buffer_attr.minreq; @@ -342,7 +323,7 @@ static void patch_buffer_attr(pa_stream *s, pa_buffer_attr *attr, pa_stream_flag } if (attr->maxlength == (uint32_t) -1) - attr->maxlength = 4*1024*1024; /* 4MB is the maximum queue length PulseAudio <= 0.9.9 supported. */ + attr->maxlength = MAX_SIZE; /* 4MB is the maximum queue length PulseAudio <= 0.9.9 supported. */ if (attr->tlength == (uint32_t) -1) attr->tlength = (uint32_t) pa_usec_to_bytes(250*PA_USEC_PER_MSEC, &s->sample_spec); /* 250ms of buffering */ @@ -432,11 +413,18 @@ static void stream_add_buffer(void *data, struct pw_buffer *buffer) { pa_stream *s = data; s->maxsize += buffer->buffer->datas[0].maxsize; + s->maxblock = SPA_MIN(buffer->buffer->datas[0].maxsize, s->maxblock); } + static void stream_remove_buffer(void *data, struct pw_buffer *buffer) { pa_stream *s = data; + struct pa_mem *m = buffer->user_data; s->maxsize -= buffer->buffer->datas[0].maxsize; + s->maxblock = INT_MAX; + if (m != NULL) + spa_list_append(&s->free, &m->link); + buffer->user_data = NULL; } static void update_timing_info(pa_stream *s) @@ -449,8 +437,6 @@ static void update_timing_info(pa_stream *s) pw_stream_get_time(s->stream, &pwt); s->timing_info_valid = false; - pw_log_debug("stream %p: %"PRIu64" rate:%d", s, pwt.queued, pwt.rate.denom); - pa_timeval_store(&ti->timestamp, pwt.now / SPA_NSEC_PER_USEC); ti->synchronized_clocks = true; ti->transport_usec = 0; @@ -459,9 +445,12 @@ static void update_timing_info(pa_stream *s) ti->read_index_corrupt = false; if (pwt.rate.denom > 0) { - if (s->ticks_base == -1) + if (s->ticks_base == (uint64_t)-1) s->ticks_base = pwt.ticks + pwt.delay; - index = ((pwt.ticks + pwt.delay - s->ticks_base) * s->sample_spec.rate / pwt.rate.denom) * stride; + if (pwt.ticks > s->ticks_base) + index = ((pwt.ticks - s->ticks_base) * s->sample_spec.rate / pwt.rate.denom) * stride; + else + index = 0; delay = pwt.delay * SPA_USEC_PER_SEC / pwt.rate.denom; s->have_time = true; } else { @@ -479,25 +468,58 @@ static void update_timing_info(pa_stream *s) ti->configured_source_usec = 0; ti->since_underrun = 0; s->timing_info_valid = true; + + pw_log_trace("stream %p: %"PRIu64" rate:%d delay:%"PRIi64, s, pwt.queued, pwt.rate.denom, delay); + +} + +static void push_output(pa_stream *s) +{ + struct pa_mem *m, *t, *old; + struct pw_buffer *buf; + + spa_list_for_each_safe(m, t, &s->ready, link) { + buf = pw_stream_dequeue_buffer(s->stream); + if (buf == NULL) + break; + + if ((old = buf->user_data) != NULL) + spa_list_append(&s->free, &old->link); + + spa_list_remove(&m->link); + s->ready_bytes -= m->size; + + buf->buffer->datas[0].maxsize = m->maxsize; + buf->buffer->datas[0].data = m->data; + buf->buffer->datas[0].chunk->offset = m->offset; + buf->buffer->datas[0].chunk->size = m->size; + buf->user_data = m; + + pw_stream_queue_buffer(s->stream, buf); + } } static void stream_process(void *data) { pa_stream *s = data; - while (dequeue_buffer(s) == 0); - - pw_log_trace("stream %p: %"PRIu64, s, s->dequeued_size); - if (s->dequeued_size <= 0) - return; + pw_log_trace("stream %p:", s); + update_timing_info(s); if (s->direction == PA_STREAM_PLAYBACK) { - if (s->write_callback) - s->write_callback(s, s->dequeued_size, s->write_userdata); + if (s->ready_bytes < s->buffer_attr.tlength) + s->requested_bytes = s->buffer_attr.tlength - s->ready_bytes; + else + s->requested_bytes = 0; + + if (s->write_callback && s->requested_bytes) + s->write_callback(s, s->requested_bytes, s->write_userdata); + + push_output(s); } else { if (s->read_callback) - s->read_callback(s, s->dequeued_size, s->read_userdata); + s->read_callback(s, s->requested_bytes, s->read_userdata); } } @@ -562,7 +584,8 @@ static pa_stream* stream_new(pa_context *c, const char *name, s->refcount = 1; s->context = c; - spa_list_init(&s->pending); + spa_list_init(&s->free); + spa_list_init(&s->ready); s->direction = PA_STREAM_NODIRECTION; s->state = PA_STREAM_UNCONNECTED; @@ -609,12 +632,12 @@ static pa_stream* stream_new(pa_context *c, const char *name, s->buffer_attr.minreq = (uint32_t) -1; s->buffer_attr.prebuf = (uint32_t) -1; s->buffer_attr.fragsize = (uint32_t) -1; + s->maxblock = INT_MAX; + s->requested_bytes = s->buffer_attr.tlength; s->device_index = PA_INVALID_INDEX; s->device_name = NULL; - spa_ringbuffer_init(&s->dequeued_ring); - spa_list_append(&c->streams, &s->link); pa_stream_ref(s); @@ -1061,48 +1084,38 @@ int pa_stream_disconnect(pa_stream *s) return 0; } -int peek_buffer(pa_stream *s) +struct pa_mem *get_mem(pa_stream *s, size_t len) { - int32_t avail; - uint32_t index; + struct pa_mem *m; + if (s->mem != NULL) + return s->mem; - if (s->buffer != NULL) - return 0; + if (spa_list_is_empty(&s->free)) { + if (len > s->maxblock) + len = s->maxblock; + m = calloc(1, sizeof(struct pa_mem) + len); + if (m == NULL) + return NULL; - if ((avail = spa_ringbuffer_get_read_index(&s->dequeued_ring, &index)) < MIN_QUEUED) - return -EPIPE; - - s->buffer = s->dequeued[index & MASK_BUFFERS]; - s->buffer_index = index; - s->buffer_data = s->buffer->buffer->datas[0].data; - if (s->direction == PA_STREAM_RECORD) { - s->buffer_size = s->buffer->buffer->datas[0].chunk->size; - s->buffer_offset = s->buffer->buffer->datas[0].chunk->offset; + m->data = SPA_MEMBER(m, sizeof(struct pa_mem), void); + m->maxsize = len; + spa_list_append(&s->free, &m->link); } - else { - s->buffer_size = s->buffer->buffer->datas[0].maxsize; - } - return 0; + m = spa_list_first(&s->free, struct pa_mem, link); + spa_list_remove(&m->link); + m->offset = 0; + m->size = 0; + return m; } -int queue_buffer(pa_stream *s) +int release_mem(pa_stream *s) { - if (s->buffer == NULL) - return 0; - - if (s->direction == PA_STREAM_PLAYBACK) - s->dequeued_size -= s->buffer->buffer->datas[0].maxsize; - else - s->dequeued_size -= s->buffer->buffer->datas[0].chunk->size; - spa_ringbuffer_read_update(&s->dequeued_ring, s->buffer_index + 1); - - s->buffer->size = s->buffer->buffer->datas[0].chunk->size; - pw_log_trace("%p %"PRIu64"/%d", s->buffer, s->buffer->size, - s->buffer->buffer->datas[0].chunk->offset); - - pw_stream_queue_buffer(s->stream, s->buffer); - s->buffer = NULL; - s->buffer_offset = 0; + if (s->mem == NULL) + return -EINVAL; + spa_list_append(&s->ready, &s->mem->link); + s->ready_bytes += s->mem->size; + s->mem = NULL; + push_output(s); return 0; } @@ -1112,7 +1125,7 @@ int pa_stream_begin_write( void **data, size_t *nbytes) { - int res; + size_t max; spa_assert(s); spa_assert(s->refcount >= 1); @@ -1123,17 +1136,17 @@ int pa_stream_begin_write( PA_CHECK_VALIDITY(s->context, data, PA_ERR_INVALID); PA_CHECK_VALIDITY(s->context, nbytes && *nbytes != 0, PA_ERR_INVALID); - pw_log_trace("peek buffer %p %zd %d %d", *data, *nbytes, s->buffer_size, s->buffer_offset); - - if ((res = peek_buffer(s)) < 0) { + s->mem = get_mem(s, *nbytes); + if (s->mem == NULL) { *data = NULL; *nbytes = 0; - } else { - size_t max = s->buffer_size - s->buffer_offset; - *data = SPA_MEMBER(s->buffer_data, s->buffer_offset, void); - *nbytes = *nbytes != (size_t)-1 ? SPA_MIN(*nbytes, max) : max; + return -errno; } - pw_log_trace("peek buffer %p %zd %p %d", *data, *nbytes, s->buffer, res); + max = s->mem->maxsize - s->mem->size; + *data = SPA_MEMBER(s->mem->data, s->mem->offset, void); + *nbytes = *nbytes != (size_t)-1 ? SPA_MIN(*nbytes, max) : max; + + pw_log_trace("peek buffer %p %zd %p", *data, *nbytes, s->mem); return 0; } @@ -1148,8 +1161,13 @@ int pa_stream_cancel_write(pa_stream *s) PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || s->direction == PA_STREAM_UPLOAD, PA_ERR_BADSTATE); - pw_log_debug("cancel %p %p %d", s->buffer, s->buffer_data, s->buffer_size); - s->buffer = NULL; + if (s->mem == NULL) + return 0; + + pw_log_trace("cancel %p %p %zd", s->mem, s->mem->data, s->mem->size); + + spa_list_prepend(&s->free, &s->mem->link); + s->mem = NULL; return 0; } @@ -1185,54 +1203,50 @@ int pa_stream_write_ext_free(pa_stream *s, PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_PLAYBACK || (seek == PA_SEEK_RELATIVE && offset == 0), PA_ERR_INVALID); PA_CHECK_VALIDITY(s->context, - !s->buffer || - ((data >= s->buffer_data) && - ((const char*) data + nbytes <= (const char*) s->buffer_data + s->buffer_size)), + s->mem == NULL || + ((data >= s->mem->data) && + ((const char*) data + nbytes <= (const char*) s->mem->data + s->mem->maxsize)), PA_ERR_INVALID); PA_CHECK_VALIDITY(s->context, offset % pa_frame_size(&s->sample_spec) == 0, PA_ERR_INVALID); PA_CHECK_VALIDITY(s->context, nbytes % pa_frame_size(&s->sample_spec) == 0, PA_ERR_INVALID); PA_CHECK_VALIDITY(s->context, !free_cb || !s->buffer, PA_ERR_INVALID); - if (s->buffer == NULL) { + pw_log_trace("stream %p: write %zd bytes", s, nbytes); + + if (s->mem == NULL) { void *dst; const void *src = data; size_t towrite = nbytes, dsize; - pw_log_debug("stream %p: write %zd bytes", s, nbytes); - while (towrite > 0) { dsize = towrite; if (pa_stream_begin_write(s, &dst, &dsize) < 0 || dst == NULL || dsize == 0) { - pw_log_debug("stream %p: out of buffers, wanted %zd bytes", s, nbytes); + pw_log_error("stream %p: out of buffers, wanted %zd bytes", s, nbytes); break; } memcpy(dst, src, dsize); - s->buffer_offset += dsize; + s->mem->size += dsize; + + if (s->mem->size >= s->mem->maxsize || towrite == dsize) + release_mem(s); - if (s->buffer_offset >= s->buffer_size || towrite == dsize) { - s->buffer->buffer->datas[0].chunk->offset = 0; - s->buffer->buffer->datas[0].chunk->size = s->buffer_offset; - queue_buffer(s); - } towrite -= dsize; src = SPA_MEMBER(src, dsize, void); } if (free_cb) free_cb(free_cb_data); - - s->buffer = NULL; } else { - s->buffer->buffer->datas[0].chunk->offset = SPA_PTRDIFF(data, s->buffer_data); - s->buffer->buffer->datas[0].chunk->size = nbytes; - queue_buffer(s); + s->mem->offset = SPA_PTRDIFF(data, s->mem->data); + s->mem->size = nbytes; + release_mem(s); } s->timing_info.write_index += nbytes; - pw_log_debug("stream %p: written %zd bytes", s, nbytes); + pw_log_trace("stream %p: written %zd bytes", s, nbytes); return 0; } @@ -1250,15 +1264,20 @@ int pa_stream_peek(pa_stream *s, PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE); PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE); - if (peek_buffer(s) < 0) { + if (s->buffer == NULL) + s->buffer = pw_stream_dequeue_buffer(s->stream); + if (s->buffer == NULL) { + pw_log_error("stream %p: no buffer: %m", s); *data = NULL; *nbytes = 0; - pw_log_debug("stream %p: no buffer", s); return 0; } - *data = SPA_MEMBER(s->buffer_data, s->buffer_offset, void); - *nbytes = s->buffer_size; - pw_log_trace("stream %p: %p %zd %f", s, *data, *nbytes, *(float*)*data); + + *data = SPA_MEMBER(s->buffer->buffer->datas[0].data, + s->buffer->buffer->datas[0].chunk->offset, void); + *nbytes = s->buffer->buffer->datas[0].chunk->size; + + pw_log_trace("stream %p: %p %zd", s, *data, *nbytes); return 0; } @@ -1274,7 +1293,8 @@ int pa_stream_drop(pa_stream *s) PA_CHECK_VALIDITY(s->context, s->buffer, PA_ERR_BADSTATE); pw_log_trace("stream %p", s); - queue_buffer(s); + pw_stream_queue_buffer(s->stream, s->buffer); + s->buffer = NULL; return 0; } @@ -1290,8 +1310,8 @@ size_t pa_stream_writable_size(PA_CONST pa_stream *s) PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction != PA_STREAM_RECORD, PA_ERR_BADSTATE, (size_t) -1); - pw_log_trace("stream %p: %zd", s, s->dequeued_size); - return s->dequeued_size; + pw_log_trace("stream %p: %zd", s, s->requested_bytes); + return s->requested_bytes; } SPA_EXPORT @@ -1305,7 +1325,7 @@ size_t pa_stream_readable_size(PA_CONST pa_stream *s) PA_CHECK_VALIDITY_RETURN_ANY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE, (size_t) -1); - return s->dequeued_size; + return s->requested_bytes; } struct success_ack { @@ -1657,7 +1677,7 @@ int pa_stream_get_time(pa_stream *s, pa_usec_t *r_usec) { pa_usec_t res; struct timespec ts; - uint64_t now, delay, read_time; + uint64_t now, delay, time; pa_timing_info *i; spa_assert(s); @@ -1676,16 +1696,19 @@ int pa_stream_get_time(pa_stream *s, pa_usec_t *r_usec) else delay = 0; - read_time = pa_bytes_to_usec((uint64_t) i->read_index, &s->sample_spec); + if (s->direction == PA_STREAM_PLAYBACK) + time = pa_bytes_to_usec((uint64_t) i->read_index, &s->sample_spec) + i->sink_usec; + else + time = pa_bytes_to_usec((uint64_t) i->write_index, &s->sample_spec) + i->source_usec; - res = delay + read_time; + res = delay + time; if (r_usec) *r_usec = res; - pw_log_trace("stream %p: now:%"PRIu64" delay:%"PRIu64" read_time:%"PRIu64 + pw_log_trace("stream %p: now:%"PRIu64" delay:%"PRIu64" time:%"PRIu64 " write-index:%"PRIi64" read_index:%"PRIi64" diff:%"PRIi64" res:%"PRIu64, - s, now, delay, read_time, + s, now, delay, time, i->write_index, i->read_index, i->write_index - i->read_index, res);