diff --git a/src/pipewire/stream.c b/src/pipewire/stream.c index b05ffbebc..5866bb9e6 100644 --- a/src/pipewire/stream.c +++ b/src/pipewire/stream.c @@ -42,7 +42,7 @@ #define MAX_INPUTS 64 #define MAX_OUTPUTS 64 -struct mem_id { +struct mem { uint32_t id; int fd; uint32_t flags; @@ -51,15 +51,20 @@ struct mem_id { void *ptr; }; -struct buffer_id { - struct spa_list link; - uint32_t id; - bool used; - struct spa_buffer *buf; +struct buffer_mem { void *ptr; struct pw_map_range map; + uint32_t mem_id; +}; + +struct buffer { + struct spa_list link; + uint32_t id; +#define BUFFER_FLAG_OUT (1 << 0) + uint32_t flags; + struct spa_buffer *buf; + struct buffer_mem *mem; uint32_t n_mem; - struct mem_id **mem; }; struct stream { @@ -94,8 +99,8 @@ struct stream { struct spa_source *timeout_source; - struct pw_array mem_ids; - struct pw_array buffer_ids; + struct pw_array mems; + struct pw_array buffers; bool in_order; struct spa_io_buffers *io; @@ -111,95 +116,92 @@ struct stream { }; /** \endcond */ -static struct mem_id *find_mem(struct pw_stream *stream, uint32_t id) +static struct mem *find_mem(struct pw_stream *stream, uint32_t id) { struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this); - struct mem_id *mid; + struct mem *m; - pw_array_for_each(mid, &impl->mem_ids) { - if (mid->id == id) - return mid; + pw_array_for_each(m, &impl->mems) { + if (m->id == id) + return m; } return NULL; } -static void *mem_map(struct pw_stream *stream, struct mem_id *m, uint32_t offset, uint32_t size) +static void *mem_map(struct pw_stream *stream, struct pw_map_range *range, + int fd, int prot, uint32_t offset, uint32_t size) { - if (m->ptr == NULL) { - pw_map_range_init(&m->map, offset, size, stream->remote->core->sc_pagesize); + void *ptr; - m->ptr = mmap(NULL, m->map.size, PROT_READ|PROT_WRITE, - MAP_SHARED, m->fd, m->map.offset); + pw_map_range_init(range, offset, size, stream->remote->core->sc_pagesize); - if (m->ptr == MAP_FAILED) { - pw_log_error("stream %p: Failed to mmap memory %d %p: %m", stream, size, m); - m->ptr = NULL; - return NULL; - } + ptr = mmap(NULL, range->size, prot, MAP_SHARED, fd, range->offset); + if (ptr == MAP_FAILED) { + pw_log_error("stream %p: Failed to mmap memory %d: %m", stream, size); + return NULL; } - return SPA_MEMBER(m->ptr, m->map.start, void); + return SPA_MEMBER(ptr, range->start, void); } -static void mem_unmap(struct stream *impl, struct mem_id *m) +static void *mem_unmap(struct stream *impl, void *ptr, struct pw_map_range *range) { - if (m->ptr != NULL) { - if (munmap(m->ptr, m->map.size) < 0) + if (ptr != NULL) { + if (munmap(SPA_MEMBER(ptr, -range->start, void) , range->size) < 0) pw_log_warn("stream %p: failed to unmap: %m", impl); - m->ptr = NULL; } + return NULL; } -static void clear_memid(struct stream *impl, struct mem_id *mid) +static void clear_mem(struct stream *impl, struct mem *m) { - if (mid->fd != -1) { + if (m->fd != -1) { bool has_ref = false; - struct mem_id *m2; + struct mem *m2; int fd; - fd = mid->fd; - mid->fd = -1; + pw_log_debug("stream %p: clear mem %d", impl, m->id); - pw_array_for_each(m2, &impl->mem_ids) { + fd = m->fd; + m->fd = -1; + m->id = SPA_ID_INVALID; + + pw_array_for_each(m2, &impl->mems) { if (m2->fd == fd) { has_ref = true; break; } } if (!has_ref) { - mem_unmap(impl, mid); + m->ptr = mem_unmap(impl, m->ptr, &m->map); close(fd); } } } -static void clear_mems(struct pw_stream *stream) -{ - struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this); - struct mem_id *mid; - - pw_array_for_each(mid, &impl->mem_ids) - clear_memid(impl, mid); - impl->mem_ids.size = 0; -} - static void clear_buffers(struct pw_stream *stream) { struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this); - struct buffer_id *bid; + struct buffer *b; + int i; pw_log_debug("stream %p: clear buffers", stream); - pw_array_for_each(bid, &impl->buffer_ids) { - spa_hook_list_call(&stream->listener_list, struct pw_stream_events, remove_buffer, bid->id); - if (bid->ptr != NULL) - if (munmap(bid->ptr, bid->map.size) < 0) - pw_log_warn("failed to unmap buffer: %m"); - bid->ptr = NULL; - free(bid->buf); - bid->buf = NULL; - bid->used = false; + pw_array_for_each(b, &impl->buffers) { + spa_hook_list_call(&stream->listener_list, struct pw_stream_events, + remove_buffer, b->id); + + for (i = 0; i < b->n_mem; i++) { + struct buffer_mem *bm = &b->mem[i]; + struct mem *m; + pw_log_debug("stream %p: clear buffer mem %d", stream, bm->mem_id); + m = find_mem(stream, bm->mem_id); + if (m && --m->ref == 0) + clear_mem(impl, m); + } + b->n_mem = 0; + free(b->buf); } - impl->buffer_ids.size = 0; + impl->buffers.size = 0; impl->in_order = true; spa_list_init(&impl->free); } @@ -281,10 +283,10 @@ struct pw_stream *pw_stream_new(struct pw_remote *remote, this->state = PW_STREAM_STATE_UNCONNECTED; - pw_array_init(&impl->mem_ids, 64); - pw_array_ensure_size(&impl->mem_ids, sizeof(struct mem_id) * 64); - pw_array_init(&impl->buffer_ids, 32); - pw_array_ensure_size(&impl->buffer_ids, sizeof(struct buffer_id) * 64); + pw_array_init(&impl->mems, 64); + pw_array_ensure_size(&impl->mems, sizeof(struct mem) * 64); + pw_array_init(&impl->buffers, 32); + pw_array_ensure_size(&impl->buffers, sizeof(struct buffer) * 64); impl->pending_seq = SPA_ID_INVALID; spa_list_init(&impl->free); @@ -417,11 +419,8 @@ void pw_stream_destroy(struct pw_stream *stream) if (stream->error) free(stream->error); - clear_buffers(stream); - pw_array_clear(&impl->buffer_ids); - - clear_mems(stream); - pw_array_clear(&impl->mem_ids); + pw_array_clear(&impl->mems); + pw_array_clear(&impl->buffers); if (stream->properties) pw_properties_free(stream->properties); @@ -549,18 +548,18 @@ static void on_timeout(void *data, uint64_t expirations) add_request_clock_update(stream); } -static struct buffer_id *find_buffer(struct pw_stream *stream, uint32_t id) +static struct buffer *find_buffer(struct pw_stream *stream, uint32_t id) { struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this); - if (impl->in_order && pw_array_check_index(&impl->buffer_ids, id, struct buffer_id)) { - return pw_array_get_unchecked(&impl->buffer_ids, id, struct buffer_id); + if (impl->in_order && pw_array_check_index(&impl->buffers, id, struct buffer)) { + return pw_array_get_unchecked(&impl->buffers, id, struct buffer); } else { - struct buffer_id *bid; + struct buffer *b; - pw_array_for_each(bid, &impl->buffer_ids) { - if (bid->id == id) - return bid; + pw_array_for_each(b, &impl->buffers) { + if (b->id == id) + return b; } } return NULL; @@ -569,12 +568,14 @@ static struct buffer_id *find_buffer(struct pw_stream *stream, uint32_t id) static inline void reuse_buffer(struct pw_stream *stream, uint32_t id) { struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this); - struct buffer_id *bid; + struct buffer *b; - if ((bid = find_buffer(stream, id)) && bid->used) { + if ((b = find_buffer(stream, id)) && SPA_FLAG_CHECK(b->flags, BUFFER_FLAG_OUT)) { pw_log_trace("stream %p: reuse buffer %u", stream, id); - bid->used = false; - spa_list_append(&impl->free, &bid->link); + + SPA_FLAG_UNSET(b->flags, BUFFER_FLAG_OUT); + spa_list_append(&impl->free, &b->link); + impl->in_new_buffer = true; spa_hook_list_call(&stream->listener_list, struct pw_stream_events, new_buffer, id); impl->in_new_buffer = false; @@ -588,50 +589,41 @@ static void handle_rtnode_message(struct pw_stream *stream, struct pw_client_nod switch (PW_CLIENT_NODE_MESSAGE_TYPE(message)) { case PW_CLIENT_NODE_MESSAGE_PROCESS_INPUT: { - int i; + struct spa_io_buffers *io = impl->io; + struct buffer *b; + uint32_t buffer_id; - for (i = 0; i < impl->trans->area->n_input_ports; i++) { - struct spa_io_buffers *io = impl->io; - struct buffer_id *bid; - uint32_t buffer_id; + buffer_id = io->buffer_id; - buffer_id = io->buffer_id; + pw_log_trace("stream %p: process input %d %d", stream, io->status, + buffer_id); - pw_log_trace("stream %p: process input %d %d", stream, io->status, - buffer_id); + if ((b = find_buffer(stream, buffer_id)) == NULL) + return; - if ((bid = find_buffer(stream, buffer_id)) == NULL) - continue; + if (impl->client_reuse) + io->buffer_id = SPA_ID_INVALID; - if (impl->client_reuse) - io->buffer_id = SPA_ID_INVALID; + if (io->status == SPA_STATUS_HAVE_BUFFER) { + SPA_FLAG_SET(b->flags, BUFFER_FLAG_OUT); - if (io->status == SPA_STATUS_HAVE_BUFFER) { - bid->used = true; - impl->in_new_buffer = true; - spa_hook_list_call(&stream->listener_list, struct pw_stream_events, - new_buffer, buffer_id); - impl->in_new_buffer = false; - } - - io->status = SPA_STATUS_NEED_BUFFER; + impl->in_new_buffer = true; + spa_hook_list_call(&stream->listener_list, struct pw_stream_events, + new_buffer, buffer_id); + impl->in_new_buffer = false; } + + io->status = SPA_STATUS_NEED_BUFFER; send_need_input(stream); break; } case PW_CLIENT_NODE_MESSAGE_PROCESS_OUTPUT: { - int i; + struct spa_io_buffers *io = impl->io; - for (i = 0; i < impl->trans->area->n_output_ports; i++) { - struct spa_io_buffers *io = impl->io; + reuse_buffer(stream, io->buffer_id); + io->buffer_id = SPA_ID_INVALID; - if (io->buffer_id == SPA_ID_INVALID) - continue; - - reuse_buffer(stream, io->buffer_id); - io->buffer_id = SPA_ID_INVALID; - } pw_log_trace("stream %p: process output", stream); impl->in_need_buffer = true; spa_hook_list_call(&stream->listener_list, struct pw_stream_events, need_buffer); @@ -737,8 +729,6 @@ static void client_node_command(void *data, uint32_t seq, const struct spa_comma add_async_complete(stream, seq, 0); if (stream->state == PW_STREAM_STATE_PAUSED) { - int i; - pw_log_debug("stream %p: start %d %d", stream, seq, impl->direction); pw_loop_update_io(stream->remote->core->data_loop, @@ -746,8 +736,7 @@ static void client_node_command(void *data, uint32_t seq, const struct spa_comma SPA_IO_IN | SPA_IO_ERR | SPA_IO_HUP); if (impl->direction == SPA_DIRECTION_INPUT) { - for (i = 0; i < impl->trans->area->max_input_ports; i++) - impl->io->status = SPA_STATUS_NEED_BUFFER; + impl->io->status = SPA_STATUS_NEED_BUFFER; send_need_input(stream); } else { @@ -839,21 +828,21 @@ client_node_add_mem(void *data, { struct stream *impl = data; struct pw_stream *stream = &impl->this; - struct mem_id *m; + struct mem *m; m = find_mem(stream, mem_id); if (m) { - pw_log_debug("update mem %u, fd %d, flags %d", - mem_id, memfd, flags); - clear_memid(impl, m); - } else { - m = pw_array_add(&impl->mem_ids, sizeof(struct mem_id)); - pw_log_debug("add mem %u, fd %d, flags %d", + pw_log_warn("duplicate mem %u, fd %d, flags %d", mem_id, memfd, flags); + return; } + m = pw_array_add(&impl->mems, sizeof(struct mem)); + pw_log_debug("add mem %u, fd %d, flags %d", mem_id, memfd, flags); + m->id = mem_id; m->fd = memfd; m->flags = flags; + m->ref = 0; m->map = PW_MAP_RANGE_INIT; m->ptr = NULL; } @@ -868,10 +857,10 @@ client_node_port_use_buffers(void *data, struct pw_stream *stream = &impl->this; struct pw_core *core = stream->remote->core; struct pw_type *t = &core->type; - struct buffer_id *bid; + struct buffer *bid; uint32_t i, j, len; struct spa_buffer *b; - int prot; + int res = 0, prot; prot = PROT_READ | (direction == SPA_DIRECTION_OUTPUT ? PROT_WRITE : 0); @@ -879,74 +868,69 @@ client_node_port_use_buffers(void *data, clear_buffers(stream); for (i = 0; i < n_buffers; i++) { + struct buffer_mem bmem; + size_t size; off_t offset; + struct mem *m; - struct mem_id *mid = find_mem(stream, buffers[i].mem_id); - if (mid == NULL) { - pw_log_warn("unknown memory id %u", buffers[i].mem_id); - continue; + if ((m = find_mem(stream, buffers[i].mem_id)) == NULL) { + pw_log_error("unknown memory id %u", buffers[i].mem_id); + res = -EINVAL; + goto error; } - len = pw_array_get_len(&impl->buffer_ids, struct buffer_id); - bid = pw_array_add(&impl->buffer_ids, sizeof(struct buffer_id)); - if (impl->direction == SPA_DIRECTION_OUTPUT) { - bid->used = false; - spa_list_append(&impl->free, &bid->link); - } else { - bid->used = true; + len = pw_array_get_len(&impl->buffers, struct buffer); + bid = pw_array_add(&impl->buffers, sizeof(struct buffer)); + bid->flags = 0; + + bmem.mem_id = m->id; + bmem.ptr = mem_map(stream, &bmem.map, m->fd, prot, + buffers[i].offset, buffers[i].size); + if (bmem.ptr == NULL) { + res = -errno; + goto error; } - b = buffers[i].buffer; - - pw_map_range_init(&bid->map, buffers[i].offset, buffers[i].size, core->sc_pagesize); - - bid->ptr = mmap(NULL, bid->map.size, prot, MAP_SHARED, mid->fd, bid->map.offset); - if (bid->ptr == MAP_FAILED) { - bid->ptr = NULL; - pw_log_warn("Failed to mmap memory %d %p: %s", bid->map.size, mid, - strerror(errno)); - continue; + size = sizeof(struct spa_buffer); + size += sizeof(struct buffer_mem); + for (j = 0; j < buffers[i].buffer->n_metas; j++) + size += sizeof(struct spa_meta); + for (j = 0; j < buffers[i].buffer->n_datas; j++) { + size += sizeof(struct spa_data); + size += sizeof(struct buffer_mem); } - { - size_t size; - - size = sizeof(struct spa_buffer); - size += sizeof(struct mem_id *); - for (j = 0; j < buffers[i].buffer->n_metas; j++) - size += sizeof(struct spa_meta); - for (j = 0; j < buffers[i].buffer->n_datas; j++) { - size += sizeof(struct spa_data); - size += sizeof(struct mem_id *); - } - - b = bid->buf = malloc(size); - memcpy(b, buffers[i].buffer, sizeof(struct spa_buffer)); - - b->metas = SPA_MEMBER(b, sizeof(struct spa_buffer), struct spa_meta); - b->datas = SPA_MEMBER(b->metas, sizeof(struct spa_meta) * b->n_metas, - struct spa_data); - bid->mem = SPA_MEMBER(b->datas, sizeof(struct spa_data) * b->n_datas, - struct mem_id*); - bid->n_mem = 0; - - mid->ref++; - bid->mem[bid->n_mem++] = mid; + b = bid->buf = malloc(size); + if (b == NULL) { + res = -ENOMEM; + goto error; } + memcpy(b, buffers[i].buffer, sizeof(struct spa_buffer)); + + b->metas = SPA_MEMBER(b, sizeof(struct spa_buffer), struct spa_meta); + b->datas = SPA_MEMBER(b->metas, sizeof(struct spa_meta) * b->n_metas, + struct spa_data); + bid->mem = SPA_MEMBER(b->datas, sizeof(struct spa_data) * b->n_datas, + struct buffer_mem); + bid->n_mem = 0; + bid->id = b->id; + bid->mem[bid->n_mem++] = bmem; + m->ref++; + if (bid->id != len) { pw_log_warn("unexpected id %u found, expected %u", bid->id, len); impl->in_order = false; } - pw_log_debug("add buffer %d %d %u %u", mid->id, - bid->id, bid->map.offset, bid->map.size); + pw_log_debug("add buffer %d %d %u %u", m->id, + bid->id, bmem.map.offset, bmem.map.size); - offset = bid->map.start; + offset = 0; for (j = 0; j < b->n_metas; j++) { struct spa_meta *m = &b->metas[j]; memcpy(m, &buffers[i].buffer->metas[j], sizeof(struct spa_meta)); - m->data = SPA_MEMBER(bid->ptr, offset, void); + m->data = SPA_MEMBER(bmem.ptr, offset, void); offset += m->size; } @@ -955,36 +939,68 @@ client_node_port_use_buffers(void *data, memcpy(d, &buffers[i].buffer->datas[j], sizeof(struct spa_data)); d->chunk = - SPA_MEMBER(bid->ptr, offset + sizeof(struct spa_chunk) * j, + SPA_MEMBER(bmem.ptr, offset + sizeof(struct spa_chunk) * j, struct spa_chunk); if (d->type == t->data.MemFd || d->type == t->data.DmaBuf) { - struct mem_id *bmid = find_mem(stream, SPA_PTR_TO_UINT32(d->data)); - d->data = NULL; - d->fd = bmid->fd; - bmid->ref++; - bid->mem[bid->n_mem++] = bmid; - pw_log_debug(" data %d %u -> fd %d", j, bmid->id, bmid->fd); + uint32_t mem_id = SPA_PTR_TO_UINT32(d->data); + struct mem *bm = find_mem(stream, mem_id); + struct buffer_mem bm2; + + if (bm == NULL) { + pw_log_error("unknown memory id %u", mem_id); + continue; + } + + d->fd = bm->fd; + bm->ref++; + bm2.mem_id = bm->id; + + if (SPA_FLAG_CHECK(impl->flags, PW_STREAM_FLAG_MAP_BUFFERS)) + bm2.ptr = mem_map(stream, &bm2.map, d->fd, prot, + d->mapoffset, d->maxsize); + else + bm2.ptr = NULL; + + d->data = bm2.ptr; + + bid->mem[bid->n_mem++] = bm2; + + pw_log_debug(" data %d %u -> fd %d", j, bm->id, bm->fd); } else if (d->type == t->data.MemPtr) { - d->data = SPA_MEMBER(bid->ptr, - bid->map.start + SPA_PTR_TO_INT(d->data), void); + int offs = SPA_PTR_TO_INT(d->data); + d->data = SPA_MEMBER(bmem.ptr, offs, void); d->fd = -1; pw_log_debug(" data %d %u -> mem %p", j, bid->id, d->data); } else { pw_log_warn("unknown buffer data type %d", d->type); } } - spa_hook_list_call(&stream->listener_list, struct pw_stream_events, add_buffer, bid->id); - } - add_async_complete(stream, seq, 0); + if (impl->direction == SPA_DIRECTION_OUTPUT) { + SPA_FLAG_UNSET(bid->flags, BUFFER_FLAG_OUT); + spa_list_append(&impl->free, &bid->link); + } else { + SPA_FLAG_SET(bid->flags, BUFFER_FLAG_OUT); + } + + spa_hook_list_call(&stream->listener_list, struct pw_stream_events, + add_buffer, bid->id); + } if (n_buffers) stream_set_state(stream, PW_STREAM_STATE_PAUSED, NULL); else { - clear_mems(stream); stream_set_state(stream, PW_STREAM_STATE_READY, NULL); } + + exit: + add_async_complete(stream, seq, res); + return; + + error: + clear_buffers(stream); + goto exit; } static void @@ -996,6 +1012,21 @@ client_node_port_command(void *data, pw_log_warn("port command not supported"); } +static void clean_transport(struct pw_stream *stream) +{ + struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this); + + if (impl->trans == NULL) + return; + + unhandle_socket(stream); + + clear_buffers(stream); + + pw_client_node_transport_destroy(impl->trans); + impl->trans = NULL; +} + static void client_node_transport(void *data, uint32_t node_id, int readfd, int writefd, struct pw_client_node_transport *transport) @@ -1005,8 +1036,8 @@ static void client_node_transport(void *data, uint32_t node_id, stream->node_id = node_id; - if (impl->trans) - pw_client_node_transport_destroy(impl->trans); + clean_transport(stream); + impl->trans = transport; pw_log_info("stream %p: create client transport %p with fds %d %d for node %u", @@ -1030,7 +1061,7 @@ static void client_node_port_set_io(void *data, struct pw_stream *stream = &impl->this; struct pw_core *core = stream->remote->core; struct pw_type *t = &core->type; - struct mem_id *m; + struct mem *m; void *ptr; int res; @@ -1045,10 +1076,15 @@ static void client_node_port_set_io(void *data, res = -EINVAL; goto exit; } - if ((ptr = mem_map(stream, m, offset, size)) == NULL) { - res = -errno; - goto exit; + if (m->ptr == NULL) { + m->ptr = mem_map(stream, &m->map, m->fd, + PROT_READ|PROT_WRITE, offset, size); + if (m->ptr == NULL) { + res = -errno; + goto exit; + } } + ptr = m->ptr; } if (id == t->io.Buffers) { @@ -1129,8 +1165,12 @@ pw_stream_connect(struct pw_stream *stream, if (impl->node_proxy == NULL) return -ENOMEM; - pw_client_node_proxy_add_listener(impl->node_proxy, &impl->node_listener, &client_node_events, impl); - pw_proxy_add_listener((struct pw_proxy*)impl->node_proxy, &impl->proxy_listener, &proxy_events, impl); + pw_client_node_proxy_add_listener(impl->node_proxy, + &impl->node_listener, + &client_node_events, impl); + pw_proxy_add_listener((struct pw_proxy*)impl->node_proxy, + &impl->proxy_listener, + &proxy_events, impl); do_node_init(stream); @@ -1160,7 +1200,6 @@ pw_stream_finish_format(struct pw_stream *stream, if (!impl->format) { clear_buffers(stream); - clear_mems(stream); } } add_async_complete(stream, impl->pending_seq, res); @@ -1174,7 +1213,7 @@ int pw_stream_disconnect(struct pw_stream *stream) impl->disconnecting = true; - unhandle_socket(stream); + clean_transport(stream); if (impl->node_proxy) { pw_client_node_proxy_destroy(impl->node_proxy); @@ -1213,47 +1252,43 @@ int pw_stream_get_time(struct pw_stream *stream, struct pw_time *time) uint32_t pw_stream_get_empty_buffer(struct pw_stream *stream) { struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this); - struct buffer_id *bid; + struct buffer *b; if (spa_list_is_empty(&impl->free)) return SPA_ID_INVALID; - bid = spa_list_first(&impl->free, struct buffer_id, link); + b = spa_list_first(&impl->free, struct buffer, link); - return bid->id; + return b->id; } int pw_stream_recycle_buffer(struct pw_stream *stream, uint32_t id) { struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this); - struct buffer_id *bid; + struct buffer *b; - if ((bid = find_buffer(stream, id)) == NULL || !bid->used) + if ((b = find_buffer(stream, id)) == NULL || + !SPA_FLAG_CHECK(b->flags, BUFFER_FLAG_OUT)) return -EINVAL; - bid->used = false; - spa_list_append(&impl->free, &bid->link); + SPA_FLAG_UNSET(b->flags, BUFFER_FLAG_OUT); + spa_list_append(&impl->free, &b->link); if (impl->in_new_buffer) { - int i; - - for (i = 0; i < impl->trans->area->n_input_ports; i++) { - struct spa_io_buffers *io = impl->io; - io->buffer_id = id; - } + struct spa_io_buffers *io = impl->io; + io->buffer_id = id; } else { send_reuse_buffer(stream, id); } - return 0; } struct spa_buffer *pw_stream_peek_buffer(struct pw_stream *stream, uint32_t id) { - struct buffer_id *bid; + struct buffer *b; - if ((bid = find_buffer(stream, id))) - return bid->buf; + if ((b = find_buffer(stream, id))) + return b->buf; return NULL; } @@ -1261,7 +1296,7 @@ struct spa_buffer *pw_stream_peek_buffer(struct pw_stream *stream, uint32_t id) int pw_stream_send_buffer(struct pw_stream *stream, uint32_t id) { struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this); - struct buffer_id *bid; + struct buffer *b; if (impl->io->buffer_id != SPA_ID_INVALID) { pw_log_debug("can't send %u, pending buffer %u", id, @@ -1269,14 +1304,13 @@ int pw_stream_send_buffer(struct pw_stream *stream, uint32_t id) return -EIO; } - if ((bid = find_buffer(stream, id)) && !bid->used) { - bid->used = true; - spa_list_remove(&bid->link); + if ((b = find_buffer(stream, id)) && !SPA_FLAG_CHECK(b->flags, BUFFER_FLAG_OUT)) { + SPA_FLAG_SET(b->flags, BUFFER_FLAG_OUT); + spa_list_remove(&b->link); impl->io->buffer_id = id; impl->io->status = SPA_STATUS_HAVE_BUFFER; pw_log_trace("stream %p: send buffer %d", stream, id); - if (!impl->in_need_buffer) - send_have_output(stream); + send_have_output(stream); } else { pw_log_debug("stream %p: output %u was used", stream, id); } diff --git a/src/pipewire/stream.h b/src/pipewire/stream.h index 5d99dab60..fda1a9616 100644 --- a/src/pipewire/stream.h +++ b/src/pipewire/stream.h @@ -215,6 +215,7 @@ enum pw_stream_flags { PW_STREAM_FLAG_CLOCK_UPDATE = (1 << 1), /**< request periodic clock updates for * this stream */ PW_STREAM_FLAG_INACTIVE = (1 << 2), /**< start the stream inactive */ + PW_STREAM_FLAG_MAP_BUFFERS = (1 << 3), /**< mmap the buffers */ }; /** A time structure \memberof pw_stream */