stream: work on pending buffers

This commit is contained in:
Wim Taymans 2018-06-26 16:57:49 +02:00
parent e29a35f0ca
commit 11ee416bb0
2 changed files with 64 additions and 21 deletions

View file

@ -330,6 +330,7 @@ struct pa_stream {
struct pw_buffer *dequeued[MAX_BUFFERS];
struct spa_ringbuffer dequeued_ring;
size_t dequeued_size;
struct spa_list pending;
struct pw_buffer *buffer;
uint32_t buffer_index;

View file

@ -25,10 +25,22 @@
#include <pulse/stream.h>
#include <pulse/timeval.h>
#include <pulse/xmalloc.h>
#include <pipewire/stream.h>
#include "internal.h"
struct pending_data {
struct spa_list link;
const void *data;
size_t nbytes;
size_t offset;
pa_free_cb_t free_cb;
void *free_cb_data;
};
static const uint32_t audio_formats[] = {
[PA_SAMPLE_U8] = offsetof(struct spa_type_audio_format, U8),
[PA_SAMPLE_ALAW] = offsetof(struct spa_type_audio_format, UNKNOWN),
@ -313,6 +325,7 @@ pa_stream* stream_new(pa_context *c, const char *name,
s->refcount = 1;
s->context = c;
init_type(&s->type, pw_core_get_type(c->core)->map);
spa_list_init(&s->pending);
pw_stream_add_listener(s->stream, &s->stream_listener, &stream_events, s);
@ -760,6 +773,41 @@ int pa_stream_cancel_write(pa_stream *s)
return 0;
}
static void flush_pending(pa_stream *s)
{
struct pending_data *p;
void *data;
size_t nbytes;
bool flush;
while(!spa_list_is_empty(&s->pending)) {
p = spa_list_first(&s->pending, struct pending_data, link);
pa_stream_begin_write(s, &data, &nbytes);
if (data == NULL || nbytes == 0)
break;
nbytes = SPA_MIN(nbytes, p->nbytes - p->offset);
memcpy(data, p->data + p->offset, nbytes);
p->offset += nbytes;
s->buffer_offset += nbytes;
flush = p->offset >= p->nbytes;
if (flush) {
spa_list_remove(&p->link);
if (p->free_cb)
p->free_cb(p->free_cb_data);
pa_xfree(p);
}
if (flush || s->buffer_offset >= s->buffer_size) {
s->buffer->buffer->datas[0].chunk->size = s->buffer_offset;
queue_buffer(s);
}
}
}
int pa_stream_write(pa_stream *s,
const void *data,
size_t nbytes,
@ -778,8 +826,6 @@ int pa_stream_write_ext_free(pa_stream *s,
int64_t offset,
pa_seek_mode_t seek)
{
int res;
spa_assert(s);
spa_assert(s->refcount >= 1);
spa_assert(data);
@ -800,25 +846,23 @@ int pa_stream_write_ext_free(pa_stream *s,
PA_CHECK_VALIDITY(s->context, !free_cb || !s->buffer, PA_ERR_INVALID);
if (s->buffer == NULL) {
void *dst;
size_t dlen;
struct pending_data *p;
if ((res = pa_stream_begin_write(s, &dst, &dlen)) < 0)
return res;
p = pa_xmalloc(sizeof(struct pending_data));
p->data = data;
p->nbytes = nbytes;
p->offset = 0;
p->free_cb = free_cb;
p->free_cb_data = free_cb_data;
spa_list_append(&s->pending, &p->link);
if (dst == NULL || dlen == 0)
return 0;
nbytes = SPA_MIN(nbytes, dlen);
memcpy(dst, data, nbytes);
data = dst;
if (free_cb)
free_cb(free_cb_data);
flush_pending(s);
}
else {
s->buffer->buffer->datas[0].chunk->offset = data - s->buffer_data;
s->buffer->buffer->datas[0].chunk->size = nbytes;
queue_buffer(s);
}
s->buffer->buffer->datas[0].chunk->offset = data - s->buffer_data;
s->buffer->buffer->datas[0].chunk->size = nbytes;
/* Update the write index in the already available latency data */
if (s->timing_info_valid) {
@ -831,8 +875,6 @@ int pa_stream_write_ext_free(pa_stream *s,
} else
s->timing_info.write_index_corrupt = true;
}
queue_buffer(s);
return 0;
}
@ -1116,7 +1158,7 @@ pa_operation* pa_stream_cork(pa_stream *s, int b, pa_stream_success_cb_t cb, voi
s->corked = b;
pw_log_warn("Not Implemented");
pw_log_warn("Not Implemented %d", b);
o = pa_operation_new(s->context, s, on_success, sizeof(struct success_ack));
d = o->userdata;
d->cb = cb;