stream: add option to map buffers

Add an option to automatically map the buffers
Cleanups to the memory mapping
This commit is contained in:
Wim Taymans 2018-03-14 11:07:14 +01:00
parent 6f095f4cc5
commit 0bf03abafb
2 changed files with 250 additions and 215 deletions

View file

@ -42,7 +42,7 @@
#define MAX_INPUTS 64
#define MAX_OUTPUTS 64
struct mem_id {
struct mem {
uint32_t id;
int fd;
uint32_t flags;
@ -51,15 +51,20 @@ struct mem_id {
void *ptr;
};
struct buffer_id {
struct spa_list link;
uint32_t id;
bool used;
struct spa_buffer *buf;
struct buffer_mem {
void *ptr;
struct pw_map_range map;
uint32_t mem_id;
};
struct buffer {
struct spa_list link;
uint32_t id;
#define BUFFER_FLAG_OUT (1 << 0)
uint32_t flags;
struct spa_buffer *buf;
struct buffer_mem *mem;
uint32_t n_mem;
struct mem_id **mem;
};
struct stream {
@ -94,8 +99,8 @@ struct stream {
struct spa_source *timeout_source;
struct pw_array mem_ids;
struct pw_array buffer_ids;
struct pw_array mems;
struct pw_array buffers;
bool in_order;
struct spa_io_buffers *io;
@ -111,95 +116,92 @@ struct stream {
};
/** \endcond */
static struct mem_id *find_mem(struct pw_stream *stream, uint32_t id)
static struct mem *find_mem(struct pw_stream *stream, uint32_t id)
{
struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this);
struct mem_id *mid;
struct mem *m;
pw_array_for_each(mid, &impl->mem_ids) {
if (mid->id == id)
return mid;
pw_array_for_each(m, &impl->mems) {
if (m->id == id)
return m;
}
return NULL;
}
static void *mem_map(struct pw_stream *stream, struct mem_id *m, uint32_t offset, uint32_t size)
static void *mem_map(struct pw_stream *stream, struct pw_map_range *range,
int fd, int prot, uint32_t offset, uint32_t size)
{
if (m->ptr == NULL) {
pw_map_range_init(&m->map, offset, size, stream->remote->core->sc_pagesize);
void *ptr;
m->ptr = mmap(NULL, m->map.size, PROT_READ|PROT_WRITE,
MAP_SHARED, m->fd, m->map.offset);
pw_map_range_init(range, offset, size, stream->remote->core->sc_pagesize);
if (m->ptr == MAP_FAILED) {
pw_log_error("stream %p: Failed to mmap memory %d %p: %m", stream, size, m);
m->ptr = NULL;
return NULL;
}
ptr = mmap(NULL, range->size, prot, MAP_SHARED, fd, range->offset);
if (ptr == MAP_FAILED) {
pw_log_error("stream %p: Failed to mmap memory %d: %m", stream, size);
return NULL;
}
return SPA_MEMBER(m->ptr, m->map.start, void);
return SPA_MEMBER(ptr, range->start, void);
}
static void mem_unmap(struct stream *impl, struct mem_id *m)
static void *mem_unmap(struct stream *impl, void *ptr, struct pw_map_range *range)
{
if (m->ptr != NULL) {
if (munmap(m->ptr, m->map.size) < 0)
if (ptr != NULL) {
if (munmap(SPA_MEMBER(ptr, -range->start, void) , range->size) < 0)
pw_log_warn("stream %p: failed to unmap: %m", impl);
m->ptr = NULL;
}
return NULL;
}
static void clear_memid(struct stream *impl, struct mem_id *mid)
static void clear_mem(struct stream *impl, struct mem *m)
{
if (mid->fd != -1) {
if (m->fd != -1) {
bool has_ref = false;
struct mem_id *m2;
struct mem *m2;
int fd;
fd = mid->fd;
mid->fd = -1;
pw_log_debug("stream %p: clear mem %d", impl, m->id);
pw_array_for_each(m2, &impl->mem_ids) {
fd = m->fd;
m->fd = -1;
m->id = SPA_ID_INVALID;
pw_array_for_each(m2, &impl->mems) {
if (m2->fd == fd) {
has_ref = true;
break;
}
}
if (!has_ref) {
mem_unmap(impl, mid);
m->ptr = mem_unmap(impl, m->ptr, &m->map);
close(fd);
}
}
}
static void clear_mems(struct pw_stream *stream)
{
struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this);
struct mem_id *mid;
pw_array_for_each(mid, &impl->mem_ids)
clear_memid(impl, mid);
impl->mem_ids.size = 0;
}
static void clear_buffers(struct pw_stream *stream)
{
struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this);
struct buffer_id *bid;
struct buffer *b;
int i;
pw_log_debug("stream %p: clear buffers", stream);
pw_array_for_each(bid, &impl->buffer_ids) {
spa_hook_list_call(&stream->listener_list, struct pw_stream_events, remove_buffer, bid->id);
if (bid->ptr != NULL)
if (munmap(bid->ptr, bid->map.size) < 0)
pw_log_warn("failed to unmap buffer: %m");
bid->ptr = NULL;
free(bid->buf);
bid->buf = NULL;
bid->used = false;
pw_array_for_each(b, &impl->buffers) {
spa_hook_list_call(&stream->listener_list, struct pw_stream_events,
remove_buffer, b->id);
for (i = 0; i < b->n_mem; i++) {
struct buffer_mem *bm = &b->mem[i];
struct mem *m;
pw_log_debug("stream %p: clear buffer mem %d", stream, bm->mem_id);
m = find_mem(stream, bm->mem_id);
if (m && --m->ref == 0)
clear_mem(impl, m);
}
b->n_mem = 0;
free(b->buf);
}
impl->buffer_ids.size = 0;
impl->buffers.size = 0;
impl->in_order = true;
spa_list_init(&impl->free);
}
@ -281,10 +283,10 @@ struct pw_stream *pw_stream_new(struct pw_remote *remote,
this->state = PW_STREAM_STATE_UNCONNECTED;
pw_array_init(&impl->mem_ids, 64);
pw_array_ensure_size(&impl->mem_ids, sizeof(struct mem_id) * 64);
pw_array_init(&impl->buffer_ids, 32);
pw_array_ensure_size(&impl->buffer_ids, sizeof(struct buffer_id) * 64);
pw_array_init(&impl->mems, 64);
pw_array_ensure_size(&impl->mems, sizeof(struct mem) * 64);
pw_array_init(&impl->buffers, 32);
pw_array_ensure_size(&impl->buffers, sizeof(struct buffer) * 64);
impl->pending_seq = SPA_ID_INVALID;
spa_list_init(&impl->free);
@ -417,11 +419,8 @@ void pw_stream_destroy(struct pw_stream *stream)
if (stream->error)
free(stream->error);
clear_buffers(stream);
pw_array_clear(&impl->buffer_ids);
clear_mems(stream);
pw_array_clear(&impl->mem_ids);
pw_array_clear(&impl->mems);
pw_array_clear(&impl->buffers);
if (stream->properties)
pw_properties_free(stream->properties);
@ -549,18 +548,18 @@ static void on_timeout(void *data, uint64_t expirations)
add_request_clock_update(stream);
}
static struct buffer_id *find_buffer(struct pw_stream *stream, uint32_t id)
static struct buffer *find_buffer(struct pw_stream *stream, uint32_t id)
{
struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this);
if (impl->in_order && pw_array_check_index(&impl->buffer_ids, id, struct buffer_id)) {
return pw_array_get_unchecked(&impl->buffer_ids, id, struct buffer_id);
if (impl->in_order && pw_array_check_index(&impl->buffers, id, struct buffer)) {
return pw_array_get_unchecked(&impl->buffers, id, struct buffer);
} else {
struct buffer_id *bid;
struct buffer *b;
pw_array_for_each(bid, &impl->buffer_ids) {
if (bid->id == id)
return bid;
pw_array_for_each(b, &impl->buffers) {
if (b->id == id)
return b;
}
}
return NULL;
@ -569,12 +568,14 @@ static struct buffer_id *find_buffer(struct pw_stream *stream, uint32_t id)
static inline void reuse_buffer(struct pw_stream *stream, uint32_t id)
{
struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this);
struct buffer_id *bid;
struct buffer *b;
if ((bid = find_buffer(stream, id)) && bid->used) {
if ((b = find_buffer(stream, id)) && SPA_FLAG_CHECK(b->flags, BUFFER_FLAG_OUT)) {
pw_log_trace("stream %p: reuse buffer %u", stream, id);
bid->used = false;
spa_list_append(&impl->free, &bid->link);
SPA_FLAG_UNSET(b->flags, BUFFER_FLAG_OUT);
spa_list_append(&impl->free, &b->link);
impl->in_new_buffer = true;
spa_hook_list_call(&stream->listener_list, struct pw_stream_events, new_buffer, id);
impl->in_new_buffer = false;
@ -588,50 +589,41 @@ static void handle_rtnode_message(struct pw_stream *stream, struct pw_client_nod
switch (PW_CLIENT_NODE_MESSAGE_TYPE(message)) {
case PW_CLIENT_NODE_MESSAGE_PROCESS_INPUT:
{
int i;
struct spa_io_buffers *io = impl->io;
struct buffer *b;
uint32_t buffer_id;
for (i = 0; i < impl->trans->area->n_input_ports; i++) {
struct spa_io_buffers *io = impl->io;
struct buffer_id *bid;
uint32_t buffer_id;
buffer_id = io->buffer_id;
buffer_id = io->buffer_id;
pw_log_trace("stream %p: process input %d %d", stream, io->status,
buffer_id);
pw_log_trace("stream %p: process input %d %d", stream, io->status,
buffer_id);
if ((b = find_buffer(stream, buffer_id)) == NULL)
return;
if ((bid = find_buffer(stream, buffer_id)) == NULL)
continue;
if (impl->client_reuse)
io->buffer_id = SPA_ID_INVALID;
if (impl->client_reuse)
io->buffer_id = SPA_ID_INVALID;
if (io->status == SPA_STATUS_HAVE_BUFFER) {
SPA_FLAG_SET(b->flags, BUFFER_FLAG_OUT);
if (io->status == SPA_STATUS_HAVE_BUFFER) {
bid->used = true;
impl->in_new_buffer = true;
spa_hook_list_call(&stream->listener_list, struct pw_stream_events,
new_buffer, buffer_id);
impl->in_new_buffer = false;
}
io->status = SPA_STATUS_NEED_BUFFER;
impl->in_new_buffer = true;
spa_hook_list_call(&stream->listener_list, struct pw_stream_events,
new_buffer, buffer_id);
impl->in_new_buffer = false;
}
io->status = SPA_STATUS_NEED_BUFFER;
send_need_input(stream);
break;
}
case PW_CLIENT_NODE_MESSAGE_PROCESS_OUTPUT:
{
int i;
struct spa_io_buffers *io = impl->io;
for (i = 0; i < impl->trans->area->n_output_ports; i++) {
struct spa_io_buffers *io = impl->io;
reuse_buffer(stream, io->buffer_id);
io->buffer_id = SPA_ID_INVALID;
if (io->buffer_id == SPA_ID_INVALID)
continue;
reuse_buffer(stream, io->buffer_id);
io->buffer_id = SPA_ID_INVALID;
}
pw_log_trace("stream %p: process output", stream);
impl->in_need_buffer = true;
spa_hook_list_call(&stream->listener_list, struct pw_stream_events, need_buffer);
@ -737,8 +729,6 @@ static void client_node_command(void *data, uint32_t seq, const struct spa_comma
add_async_complete(stream, seq, 0);
if (stream->state == PW_STREAM_STATE_PAUSED) {
int i;
pw_log_debug("stream %p: start %d %d", stream, seq, impl->direction);
pw_loop_update_io(stream->remote->core->data_loop,
@ -746,8 +736,7 @@ static void client_node_command(void *data, uint32_t seq, const struct spa_comma
SPA_IO_IN | SPA_IO_ERR | SPA_IO_HUP);
if (impl->direction == SPA_DIRECTION_INPUT) {
for (i = 0; i < impl->trans->area->max_input_ports; i++)
impl->io->status = SPA_STATUS_NEED_BUFFER;
impl->io->status = SPA_STATUS_NEED_BUFFER;
send_need_input(stream);
}
else {
@ -839,21 +828,21 @@ client_node_add_mem(void *data,
{
struct stream *impl = data;
struct pw_stream *stream = &impl->this;
struct mem_id *m;
struct mem *m;
m = find_mem(stream, mem_id);
if (m) {
pw_log_debug("update mem %u, fd %d, flags %d",
mem_id, memfd, flags);
clear_memid(impl, m);
} else {
m = pw_array_add(&impl->mem_ids, sizeof(struct mem_id));
pw_log_debug("add mem %u, fd %d, flags %d",
pw_log_warn("duplicate mem %u, fd %d, flags %d",
mem_id, memfd, flags);
return;
}
m = pw_array_add(&impl->mems, sizeof(struct mem));
pw_log_debug("add mem %u, fd %d, flags %d", mem_id, memfd, flags);
m->id = mem_id;
m->fd = memfd;
m->flags = flags;
m->ref = 0;
m->map = PW_MAP_RANGE_INIT;
m->ptr = NULL;
}
@ -868,10 +857,10 @@ client_node_port_use_buffers(void *data,
struct pw_stream *stream = &impl->this;
struct pw_core *core = stream->remote->core;
struct pw_type *t = &core->type;
struct buffer_id *bid;
struct buffer *bid;
uint32_t i, j, len;
struct spa_buffer *b;
int prot;
int res = 0, prot;
prot = PROT_READ | (direction == SPA_DIRECTION_OUTPUT ? PROT_WRITE : 0);
@ -879,74 +868,69 @@ client_node_port_use_buffers(void *data,
clear_buffers(stream);
for (i = 0; i < n_buffers; i++) {
struct buffer_mem bmem;
size_t size;
off_t offset;
struct mem *m;
struct mem_id *mid = find_mem(stream, buffers[i].mem_id);
if (mid == NULL) {
pw_log_warn("unknown memory id %u", buffers[i].mem_id);
continue;
if ((m = find_mem(stream, buffers[i].mem_id)) == NULL) {
pw_log_error("unknown memory id %u", buffers[i].mem_id);
res = -EINVAL;
goto error;
}
len = pw_array_get_len(&impl->buffer_ids, struct buffer_id);
bid = pw_array_add(&impl->buffer_ids, sizeof(struct buffer_id));
if (impl->direction == SPA_DIRECTION_OUTPUT) {
bid->used = false;
spa_list_append(&impl->free, &bid->link);
} else {
bid->used = true;
len = pw_array_get_len(&impl->buffers, struct buffer);
bid = pw_array_add(&impl->buffers, sizeof(struct buffer));
bid->flags = 0;
bmem.mem_id = m->id;
bmem.ptr = mem_map(stream, &bmem.map, m->fd, prot,
buffers[i].offset, buffers[i].size);
if (bmem.ptr == NULL) {
res = -errno;
goto error;
}
b = buffers[i].buffer;
pw_map_range_init(&bid->map, buffers[i].offset, buffers[i].size, core->sc_pagesize);
bid->ptr = mmap(NULL, bid->map.size, prot, MAP_SHARED, mid->fd, bid->map.offset);
if (bid->ptr == MAP_FAILED) {
bid->ptr = NULL;
pw_log_warn("Failed to mmap memory %d %p: %s", bid->map.size, mid,
strerror(errno));
continue;
size = sizeof(struct spa_buffer);
size += sizeof(struct buffer_mem);
for (j = 0; j < buffers[i].buffer->n_metas; j++)
size += sizeof(struct spa_meta);
for (j = 0; j < buffers[i].buffer->n_datas; j++) {
size += sizeof(struct spa_data);
size += sizeof(struct buffer_mem);
}
{
size_t size;
size = sizeof(struct spa_buffer);
size += sizeof(struct mem_id *);
for (j = 0; j < buffers[i].buffer->n_metas; j++)
size += sizeof(struct spa_meta);
for (j = 0; j < buffers[i].buffer->n_datas; j++) {
size += sizeof(struct spa_data);
size += sizeof(struct mem_id *);
}
b = bid->buf = malloc(size);
memcpy(b, buffers[i].buffer, sizeof(struct spa_buffer));
b->metas = SPA_MEMBER(b, sizeof(struct spa_buffer), struct spa_meta);
b->datas = SPA_MEMBER(b->metas, sizeof(struct spa_meta) * b->n_metas,
struct spa_data);
bid->mem = SPA_MEMBER(b->datas, sizeof(struct spa_data) * b->n_datas,
struct mem_id*);
bid->n_mem = 0;
mid->ref++;
bid->mem[bid->n_mem++] = mid;
b = bid->buf = malloc(size);
if (b == NULL) {
res = -ENOMEM;
goto error;
}
memcpy(b, buffers[i].buffer, sizeof(struct spa_buffer));
b->metas = SPA_MEMBER(b, sizeof(struct spa_buffer), struct spa_meta);
b->datas = SPA_MEMBER(b->metas, sizeof(struct spa_meta) * b->n_metas,
struct spa_data);
bid->mem = SPA_MEMBER(b->datas, sizeof(struct spa_data) * b->n_datas,
struct buffer_mem);
bid->n_mem = 0;
bid->id = b->id;
bid->mem[bid->n_mem++] = bmem;
m->ref++;
if (bid->id != len) {
pw_log_warn("unexpected id %u found, expected %u", bid->id, len);
impl->in_order = false;
}
pw_log_debug("add buffer %d %d %u %u", mid->id,
bid->id, bid->map.offset, bid->map.size);
pw_log_debug("add buffer %d %d %u %u", m->id,
bid->id, bmem.map.offset, bmem.map.size);
offset = bid->map.start;
offset = 0;
for (j = 0; j < b->n_metas; j++) {
struct spa_meta *m = &b->metas[j];
memcpy(m, &buffers[i].buffer->metas[j], sizeof(struct spa_meta));
m->data = SPA_MEMBER(bid->ptr, offset, void);
m->data = SPA_MEMBER(bmem.ptr, offset, void);
offset += m->size;
}
@ -955,36 +939,68 @@ client_node_port_use_buffers(void *data,
memcpy(d, &buffers[i].buffer->datas[j], sizeof(struct spa_data));
d->chunk =
SPA_MEMBER(bid->ptr, offset + sizeof(struct spa_chunk) * j,
SPA_MEMBER(bmem.ptr, offset + sizeof(struct spa_chunk) * j,
struct spa_chunk);
if (d->type == t->data.MemFd || d->type == t->data.DmaBuf) {
struct mem_id *bmid = find_mem(stream, SPA_PTR_TO_UINT32(d->data));
d->data = NULL;
d->fd = bmid->fd;
bmid->ref++;
bid->mem[bid->n_mem++] = bmid;
pw_log_debug(" data %d %u -> fd %d", j, bmid->id, bmid->fd);
uint32_t mem_id = SPA_PTR_TO_UINT32(d->data);
struct mem *bm = find_mem(stream, mem_id);
struct buffer_mem bm2;
if (bm == NULL) {
pw_log_error("unknown memory id %u", mem_id);
continue;
}
d->fd = bm->fd;
bm->ref++;
bm2.mem_id = bm->id;
if (SPA_FLAG_CHECK(impl->flags, PW_STREAM_FLAG_MAP_BUFFERS))
bm2.ptr = mem_map(stream, &bm2.map, d->fd, prot,
d->mapoffset, d->maxsize);
else
bm2.ptr = NULL;
d->data = bm2.ptr;
bid->mem[bid->n_mem++] = bm2;
pw_log_debug(" data %d %u -> fd %d", j, bm->id, bm->fd);
} else if (d->type == t->data.MemPtr) {
d->data = SPA_MEMBER(bid->ptr,
bid->map.start + SPA_PTR_TO_INT(d->data), void);
int offs = SPA_PTR_TO_INT(d->data);
d->data = SPA_MEMBER(bmem.ptr, offs, void);
d->fd = -1;
pw_log_debug(" data %d %u -> mem %p", j, bid->id, d->data);
} else {
pw_log_warn("unknown buffer data type %d", d->type);
}
}
spa_hook_list_call(&stream->listener_list, struct pw_stream_events, add_buffer, bid->id);
}
add_async_complete(stream, seq, 0);
if (impl->direction == SPA_DIRECTION_OUTPUT) {
SPA_FLAG_UNSET(bid->flags, BUFFER_FLAG_OUT);
spa_list_append(&impl->free, &bid->link);
} else {
SPA_FLAG_SET(bid->flags, BUFFER_FLAG_OUT);
}
spa_hook_list_call(&stream->listener_list, struct pw_stream_events,
add_buffer, bid->id);
}
if (n_buffers)
stream_set_state(stream, PW_STREAM_STATE_PAUSED, NULL);
else {
clear_mems(stream);
stream_set_state(stream, PW_STREAM_STATE_READY, NULL);
}
exit:
add_async_complete(stream, seq, res);
return;
error:
clear_buffers(stream);
goto exit;
}
static void
@ -996,6 +1012,21 @@ client_node_port_command(void *data,
pw_log_warn("port command not supported");
}
static void clean_transport(struct pw_stream *stream)
{
struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this);
if (impl->trans == NULL)
return;
unhandle_socket(stream);
clear_buffers(stream);
pw_client_node_transport_destroy(impl->trans);
impl->trans = NULL;
}
static void client_node_transport(void *data, uint32_t node_id,
int readfd, int writefd,
struct pw_client_node_transport *transport)
@ -1005,8 +1036,8 @@ static void client_node_transport(void *data, uint32_t node_id,
stream->node_id = node_id;
if (impl->trans)
pw_client_node_transport_destroy(impl->trans);
clean_transport(stream);
impl->trans = transport;
pw_log_info("stream %p: create client transport %p with fds %d %d for node %u",
@ -1030,7 +1061,7 @@ static void client_node_port_set_io(void *data,
struct pw_stream *stream = &impl->this;
struct pw_core *core = stream->remote->core;
struct pw_type *t = &core->type;
struct mem_id *m;
struct mem *m;
void *ptr;
int res;
@ -1045,10 +1076,15 @@ static void client_node_port_set_io(void *data,
res = -EINVAL;
goto exit;
}
if ((ptr = mem_map(stream, m, offset, size)) == NULL) {
res = -errno;
goto exit;
if (m->ptr == NULL) {
m->ptr = mem_map(stream, &m->map, m->fd,
PROT_READ|PROT_WRITE, offset, size);
if (m->ptr == NULL) {
res = -errno;
goto exit;
}
}
ptr = m->ptr;
}
if (id == t->io.Buffers) {
@ -1129,8 +1165,12 @@ pw_stream_connect(struct pw_stream *stream,
if (impl->node_proxy == NULL)
return -ENOMEM;
pw_client_node_proxy_add_listener(impl->node_proxy, &impl->node_listener, &client_node_events, impl);
pw_proxy_add_listener((struct pw_proxy*)impl->node_proxy, &impl->proxy_listener, &proxy_events, impl);
pw_client_node_proxy_add_listener(impl->node_proxy,
&impl->node_listener,
&client_node_events, impl);
pw_proxy_add_listener((struct pw_proxy*)impl->node_proxy,
&impl->proxy_listener,
&proxy_events, impl);
do_node_init(stream);
@ -1160,7 +1200,6 @@ pw_stream_finish_format(struct pw_stream *stream,
if (!impl->format) {
clear_buffers(stream);
clear_mems(stream);
}
}
add_async_complete(stream, impl->pending_seq, res);
@ -1174,7 +1213,7 @@ int pw_stream_disconnect(struct pw_stream *stream)
impl->disconnecting = true;
unhandle_socket(stream);
clean_transport(stream);
if (impl->node_proxy) {
pw_client_node_proxy_destroy(impl->node_proxy);
@ -1213,47 +1252,43 @@ int pw_stream_get_time(struct pw_stream *stream, struct pw_time *time)
uint32_t pw_stream_get_empty_buffer(struct pw_stream *stream)
{
struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this);
struct buffer_id *bid;
struct buffer *b;
if (spa_list_is_empty(&impl->free))
return SPA_ID_INVALID;
bid = spa_list_first(&impl->free, struct buffer_id, link);
b = spa_list_first(&impl->free, struct buffer, link);
return bid->id;
return b->id;
}
int pw_stream_recycle_buffer(struct pw_stream *stream, uint32_t id)
{
struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this);
struct buffer_id *bid;
struct buffer *b;
if ((bid = find_buffer(stream, id)) == NULL || !bid->used)
if ((b = find_buffer(stream, id)) == NULL ||
!SPA_FLAG_CHECK(b->flags, BUFFER_FLAG_OUT))
return -EINVAL;
bid->used = false;
spa_list_append(&impl->free, &bid->link);
SPA_FLAG_UNSET(b->flags, BUFFER_FLAG_OUT);
spa_list_append(&impl->free, &b->link);
if (impl->in_new_buffer) {
int i;
for (i = 0; i < impl->trans->area->n_input_ports; i++) {
struct spa_io_buffers *io = impl->io;
io->buffer_id = id;
}
struct spa_io_buffers *io = impl->io;
io->buffer_id = id;
} else {
send_reuse_buffer(stream, id);
}
return 0;
}
struct spa_buffer *pw_stream_peek_buffer(struct pw_stream *stream, uint32_t id)
{
struct buffer_id *bid;
struct buffer *b;
if ((bid = find_buffer(stream, id)))
return bid->buf;
if ((b = find_buffer(stream, id)))
return b->buf;
return NULL;
}
@ -1261,7 +1296,7 @@ struct spa_buffer *pw_stream_peek_buffer(struct pw_stream *stream, uint32_t id)
int pw_stream_send_buffer(struct pw_stream *stream, uint32_t id)
{
struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this);
struct buffer_id *bid;
struct buffer *b;
if (impl->io->buffer_id != SPA_ID_INVALID) {
pw_log_debug("can't send %u, pending buffer %u", id,
@ -1269,14 +1304,13 @@ int pw_stream_send_buffer(struct pw_stream *stream, uint32_t id)
return -EIO;
}
if ((bid = find_buffer(stream, id)) && !bid->used) {
bid->used = true;
spa_list_remove(&bid->link);
if ((b = find_buffer(stream, id)) && !SPA_FLAG_CHECK(b->flags, BUFFER_FLAG_OUT)) {
SPA_FLAG_SET(b->flags, BUFFER_FLAG_OUT);
spa_list_remove(&b->link);
impl->io->buffer_id = id;
impl->io->status = SPA_STATUS_HAVE_BUFFER;
pw_log_trace("stream %p: send buffer %d", stream, id);
if (!impl->in_need_buffer)
send_have_output(stream);
send_have_output(stream);
} else {
pw_log_debug("stream %p: output %u was used", stream, id);
}

View file

@ -215,6 +215,7 @@ enum pw_stream_flags {
PW_STREAM_FLAG_CLOCK_UPDATE = (1 << 1), /**< request periodic clock updates for
* this stream */
PW_STREAM_FLAG_INACTIVE = (1 << 2), /**< start the stream inactive */
PW_STREAM_FLAG_MAP_BUFFERS = (1 << 3), /**< mmap the buffers */
};
/** A time structure \memberof pw_stream */