diff --git a/src/internal.h b/src/internal.h index ed702f946..fb96bb108 100644 --- a/src/internal.h +++ b/src/internal.h @@ -330,6 +330,7 @@ struct pa_stream { struct pw_buffer *dequeued[MAX_BUFFERS]; struct spa_ringbuffer dequeued_ring; size_t dequeued_size; + struct spa_list pending; struct pw_buffer *buffer; uint32_t buffer_index; diff --git a/src/stream.c b/src/stream.c index bef1cc2db..ea44a27ed 100644 --- a/src/stream.c +++ b/src/stream.c @@ -25,10 +25,22 @@ #include #include +#include #include #include "internal.h" +struct pending_data { + struct spa_list link; + + const void *data; + size_t nbytes; + size_t offset; + + pa_free_cb_t free_cb; + void *free_cb_data; +}; + static const uint32_t audio_formats[] = { [PA_SAMPLE_U8] = offsetof(struct spa_type_audio_format, U8), [PA_SAMPLE_ALAW] = offsetof(struct spa_type_audio_format, UNKNOWN), @@ -313,6 +325,7 @@ pa_stream* stream_new(pa_context *c, const char *name, s->refcount = 1; s->context = c; init_type(&s->type, pw_core_get_type(c->core)->map); + spa_list_init(&s->pending); pw_stream_add_listener(s->stream, &s->stream_listener, &stream_events, s); @@ -760,6 +773,41 @@ int pa_stream_cancel_write(pa_stream *s) return 0; } +static void flush_pending(pa_stream *s) +{ + struct pending_data *p; + void *data; + size_t nbytes; + bool flush; + + while(!spa_list_is_empty(&s->pending)) { + p = spa_list_first(&s->pending, struct pending_data, link); + + pa_stream_begin_write(s, &data, &nbytes); + if (data == NULL || nbytes == 0) + break; + + nbytes = SPA_MIN(nbytes, p->nbytes - p->offset); + memcpy(data, p->data + p->offset, nbytes); + + p->offset += nbytes; + s->buffer_offset += nbytes; + + flush = p->offset >= p->nbytes; + + if (flush) { + spa_list_remove(&p->link); + if (p->free_cb) + p->free_cb(p->free_cb_data); + pa_xfree(p); + } + if (flush || s->buffer_offset >= s->buffer_size) { + s->buffer->buffer->datas[0].chunk->size = s->buffer_offset; + queue_buffer(s); + } + } +} + int pa_stream_write(pa_stream *s, const void *data, size_t nbytes, @@ -778,8 +826,6 @@ int pa_stream_write_ext_free(pa_stream *s, int64_t offset, pa_seek_mode_t seek) { - int res; - spa_assert(s); spa_assert(s->refcount >= 1); spa_assert(data); @@ -800,25 +846,23 @@ int pa_stream_write_ext_free(pa_stream *s, PA_CHECK_VALIDITY(s->context, !free_cb || !s->buffer, PA_ERR_INVALID); if (s->buffer == NULL) { - void *dst; - size_t dlen; + struct pending_data *p; - if ((res = pa_stream_begin_write(s, &dst, &dlen)) < 0) - return res; + p = pa_xmalloc(sizeof(struct pending_data)); + p->data = data; + p->nbytes = nbytes; + p->offset = 0; + p->free_cb = free_cb; + p->free_cb_data = free_cb_data; + spa_list_append(&s->pending, &p->link); - if (dst == NULL || dlen == 0) - return 0; - - nbytes = SPA_MIN(nbytes, dlen); - memcpy(dst, data, nbytes); - data = dst; - - if (free_cb) - free_cb(free_cb_data); + flush_pending(s); + } + else { + s->buffer->buffer->datas[0].chunk->offset = data - s->buffer_data; + s->buffer->buffer->datas[0].chunk->size = nbytes; + queue_buffer(s); } - - s->buffer->buffer->datas[0].chunk->offset = data - s->buffer_data; - s->buffer->buffer->datas[0].chunk->size = nbytes; /* Update the write index in the already available latency data */ if (s->timing_info_valid) { @@ -831,8 +875,6 @@ int pa_stream_write_ext_free(pa_stream *s, } else s->timing_info.write_index_corrupt = true; } - queue_buffer(s); - return 0; } @@ -1116,7 +1158,7 @@ pa_operation* pa_stream_cork(pa_stream *s, int b, pa_stream_success_cb_t cb, voi s->corked = b; - pw_log_warn("Not Implemented"); + pw_log_warn("Not Implemented %d", b); o = pa_operation_new(s->context, s, on_success, sizeof(struct success_ack)); d = o->userdata; d->cb = cb;