diff --git a/pipewire-pulseaudio/src/internal.h b/pipewire-pulseaudio/src/internal.h index e13d7b5ba..7df4b7846 100644 --- a/pipewire-pulseaudio/src/internal.h +++ b/pipewire-pulseaudio/src/internal.h @@ -331,6 +331,7 @@ struct pa_mem { size_t maxsize; size_t size; size_t offset; + void *user_data; }; #define MAX_BUFFERS 64u diff --git a/pipewire-pulseaudio/src/stream.c b/pipewire-pulseaudio/src/stream.c index abdc587af..0387de131 100644 --- a/pipewire-pulseaudio/src/stream.c +++ b/pipewire-pulseaudio/src/stream.c @@ -432,7 +432,7 @@ static void update_timing_info(pa_stream *s) struct pw_time pwt; pa_timing_info *ti = &s->timing_info; size_t stride = pa_frame_size(&s->sample_spec); - int64_t delay, index; + int64_t delay, pos; pw_stream_get_time(s->stream, &pwt); s->timing_info_valid = false; @@ -448,32 +448,35 @@ static void update_timing_info(pa_stream *s) if (s->ticks_base == (uint64_t)-1) s->ticks_base = pwt.ticks + pwt.delay; if (pwt.ticks > s->ticks_base) - index = ((pwt.ticks - s->ticks_base) * s->sample_spec.rate / pwt.rate.denom) * stride; + pos = ((pwt.ticks - s->ticks_base) * s->sample_spec.rate / pwt.rate.denom) * stride; else - index = 0; + pos = 0; delay = pwt.delay * SPA_USEC_PER_SEC / pwt.rate.denom; s->have_time = true; } else { - index = delay = 0; + pos = delay = 0; s->have_time = false; } if (s->direction == PA_STREAM_PLAYBACK) { ti->sink_usec = delay; ti->configured_sink_usec = delay; - ti->read_index = index; + ti->read_index = pos; } else { ti->source_usec = delay; ti->configured_source_usec = delay; - ti->write_index = index; + ti->write_index = pos; } ti->since_underrun = 0; s->timing_info_valid = true; - pw_log_debug("stream %p: %"PRIu64" rate:%d delay:%"PRIi64 " read:%"PRIu64" write:%"PRIu64, - s, pwt.queued, pwt.rate.denom, delay, ti->read_index, ti->write_index); + pw_log_debug("stream %p: %"PRIu64" rate:%d/%d ticks:%"PRIu64" pos:%"PRIu64" delay:%"PRIi64 " read:%"PRIu64 + " write:%"PRIu64" diff:%"PRIi64, + s, pwt.queued, s->sample_spec.rate, pwt.rate.denom, pwt.ticks, pos, delay, + ti->read_index, ti->write_index, + ti->read_index - ti->write_index); } -static void push_output(pa_stream *s) +static void queue_output(pa_stream *s) { struct pa_mem *m, *t, *old; struct pw_buffer *buf; @@ -499,6 +502,47 @@ static void push_output(pa_stream *s) } } +struct pa_mem *alloc_mem(pa_stream *s, size_t len) +{ + struct pa_mem *m; + if (spa_list_is_empty(&s->free)) { + if (len > s->maxblock) + len = s->maxblock; + m = calloc(1, sizeof(struct pa_mem) + len); + if (m == NULL) + return NULL; + m->data = SPA_MEMBER(m, sizeof(struct pa_mem), void); + m->maxsize = len; + } else { + m = spa_list_first(&s->free, struct pa_mem, link); + spa_list_remove(&m->link); + } + return m; +} + +static void pull_input(pa_stream *s) +{ + struct pw_buffer *buf; + struct pa_mem *m; + + while ((buf = pw_stream_dequeue_buffer(s->stream)) != NULL) { + if ((m = alloc_mem(s, 0)) == NULL) { + pw_log_error("stream %p: Can't alloc mem: %m", s); + pw_stream_queue_buffer(s->stream, buf); + continue; + } + m->data = buf->buffer->datas[0].data; + m->maxsize = buf->buffer->datas[0].maxsize; + m->offset = buf->buffer->datas[0].chunk->offset; + m->size = buf->buffer->datas[0].chunk->size; + m->user_data = buf; + buf->user_data = m; + + spa_list_append(&s->ready, &m->link); + s->ready_bytes += m->size; + } +} + static void stream_process(void *data) { pa_stream *s = data; @@ -507,8 +551,8 @@ static void stream_process(void *data) update_timing_info(s); if (s->direction == PA_STREAM_PLAYBACK) { - s->requested_bytes += s->buffer_attr.tlength; - if (s->ready_bytes < s->buffer_attr.tlength) + s->requested_bytes += s->maxblock; + if (s->ready_bytes < s->maxblock) s->requested_bytes -= s->ready_bytes; else s->requested_bytes = 0; @@ -516,10 +560,13 @@ static void stream_process(void *data) if (s->write_callback && s->requested_bytes > 0) s->write_callback(s, s->requested_bytes, s->write_userdata); - push_output(s); + queue_output(s); } else { - if (s->read_callback) + pull_input(s); + + s->requested_bytes = s->ready_bytes; + if (s->read_callback && s->requested_bytes > 0) s->read_callback(s, s->requested_bytes, s->read_userdata); } } @@ -1091,49 +1138,12 @@ int pa_stream_disconnect(pa_stream *s) return 0; } -struct pa_mem *get_mem(pa_stream *s, size_t len) -{ - struct pa_mem *m; - if (s->mem != NULL) - return s->mem; - - if (spa_list_is_empty(&s->free)) { - if (len > s->maxblock) - len = s->maxblock; - m = calloc(1, sizeof(struct pa_mem) + len); - if (m == NULL) - return NULL; - - m->data = SPA_MEMBER(m, sizeof(struct pa_mem), void); - m->maxsize = len; - spa_list_append(&s->free, &m->link); - } - m = spa_list_first(&s->free, struct pa_mem, link); - spa_list_remove(&m->link); - m->offset = 0; - m->size = 0; - return m; -} - -int release_mem(pa_stream *s) -{ - if (s->mem == NULL) - return -EINVAL; - spa_list_append(&s->ready, &s->mem->link); - s->ready_bytes += s->mem->size; - s->mem = NULL; - push_output(s); - return 0; -} - SPA_EXPORT int pa_stream_begin_write( pa_stream *s, void **data, size_t *nbytes) { - size_t max; - spa_assert(s); spa_assert(s->refcount >= 1); @@ -1143,15 +1153,16 @@ int pa_stream_begin_write( PA_CHECK_VALIDITY(s->context, data, PA_ERR_INVALID); PA_CHECK_VALIDITY(s->context, nbytes && *nbytes != 0, PA_ERR_INVALID); - s->mem = get_mem(s, *nbytes); + if (s->mem == NULL) + s->mem = alloc_mem(s, *nbytes); if (s->mem == NULL) { *data = NULL; *nbytes = 0; return -errno; } - max = s->mem->maxsize - s->mem->size; - *data = SPA_MEMBER(s->mem->data, s->mem->offset, void); - *nbytes = *nbytes != (size_t)-1 ? SPA_MIN(*nbytes, max) : max; + s->mem->offset = s->mem->size = 0; + *data = s->mem->data; + *nbytes = *nbytes != (size_t)-1 ? SPA_MIN(*nbytes, s->mem->maxsize) : s->mem->maxsize; pw_log_trace("peek buffer %p %zd %p", *data, *nbytes, s->mem); @@ -1199,6 +1210,9 @@ int pa_stream_write_ext_free(pa_stream *s, int64_t offset, pa_seek_mode_t seek) { + const void *src = data; + size_t towrite; + spa_assert(s); spa_assert(s->refcount >= 1); spa_assert(data); @@ -1220,38 +1234,33 @@ int pa_stream_write_ext_free(pa_stream *s, pw_log_trace("stream %p: write %zd bytes", s, nbytes); - if (s->mem == NULL) { - void *dst; - const void *src = data; - size_t towrite = nbytes, dsize; - - while (towrite > 0) { - dsize = towrite; - + towrite = nbytes; + while (towrite > 0) { + size_t dsize = towrite; + if (s->mem == NULL) { + void *dst; if (pa_stream_begin_write(s, &dst, &dsize) < 0 || dst == NULL || dsize == 0) { pw_log_error("stream %p: out of buffers, wanted %zd bytes", s, nbytes); break; } - memcpy(dst, src, dsize); - - s->mem->size += dsize; - - if (s->mem->size >= s->mem->maxsize || towrite == dsize) - release_mem(s); - - towrite -= dsize; src = SPA_MEMBER(src, dsize, void); + } else { + s->mem->offset = SPA_PTRDIFF(src, s->mem->data); + } + towrite -= dsize; + s->mem->size = dsize; + if (s->mem->size >= s->mem->maxsize || towrite == 0) { + spa_list_append(&s->ready, &s->mem->link); + s->ready_bytes += s->mem->size; + s->mem = NULL; + queue_output(s); } - if (free_cb) - free_cb(free_cb_data); - } - else { - s->mem->offset = SPA_PTRDIFF(data, s->mem->data); - s->mem->size = nbytes; - release_mem(s); } + if (free_cb) + free_cb(free_cb_data); + s->timing_info.write_index += nbytes; s->requested_bytes = s->requested_bytes > nbytes ? s->requested_bytes - nbytes : 0; pw_log_trace("stream %p: written %zd bytes", s, nbytes); @@ -1272,18 +1281,16 @@ int pa_stream_peek(pa_stream *s, PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE); PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE); - if (s->buffer == NULL) - s->buffer = pw_stream_dequeue_buffer(s->stream); - if (s->buffer == NULL) { + if (spa_list_is_empty(&s->ready)) { pw_log_error("stream %p: no buffer: %m", s); *data = NULL; *nbytes = 0; return 0; } + s->mem = spa_list_first(&s->ready, struct pa_mem, link); - *data = SPA_MEMBER(s->buffer->buffer->datas[0].data, - s->buffer->buffer->datas[0].chunk->offset, void); - *nbytes = s->buffer->buffer->datas[0].chunk->size; + *data = SPA_MEMBER(s->mem->data, s->mem->offset, void); + *nbytes = s->mem->size; pw_log_trace("stream %p: %p %zd", s, *data, *nbytes); @@ -1293,16 +1300,31 @@ int pa_stream_peek(pa_stream *s, SPA_EXPORT int pa_stream_drop(pa_stream *s) { + size_t nbytes; + struct pw_buffer *buf; + spa_assert(s); spa_assert(s->refcount >= 1); PA_CHECK_VALIDITY(s->context, s->state == PA_STREAM_READY, PA_ERR_BADSTATE); PA_CHECK_VALIDITY(s->context, s->direction == PA_STREAM_RECORD, PA_ERR_BADSTATE); - PA_CHECK_VALIDITY(s->context, s->buffer, PA_ERR_BADSTATE); + PA_CHECK_VALIDITY(s->context, s->mem, PA_ERR_BADSTATE); - pw_log_trace("stream %p", s); - pw_stream_queue_buffer(s->stream, s->buffer); - s->buffer = NULL; + nbytes = s->mem->size; + + pw_log_trace("stream %p %zd", s, nbytes); + spa_list_remove(&s->mem->link); + + s->timing_info.read_index += nbytes; + s->ready_bytes -= nbytes; + s->requested_bytes = s->ready_bytes; + + buf = s->mem->user_data; + pw_stream_queue_buffer(s->stream, buf); + buf->user_data = NULL; + + spa_list_append(&s->free, &s->mem->link); + s->mem = NULL; return 0; } @@ -1722,7 +1744,7 @@ int pa_stream_get_time(pa_stream *s, pa_usec_t *r_usec) if (r_usec) *r_usec = res; - pw_log_debug("stream %p: now:%"PRIu64" diff:%"PRIi64 + pw_log_trace("stream %p: now:%"PRIu64" diff:%"PRIi64 " write-index:%"PRIi64" read_index:%"PRIi64" res:%"PRIu64, s, now, now - res, i->write_index, i->read_index, res);