From c362b1ccc50b52dbe4359820293befc8cdc3f641 Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Thu, 12 Jul 2018 11:05:53 +0200 Subject: [PATCH] stream: implement MAP_BUFFERS video-play let the stream map our buffers --- src/examples/video-play.c | 22 +++------ src/pipewire/stream.c | 100 +++++++++++++++++++++++++++++--------- 2 files changed, 83 insertions(+), 39 deletions(-) diff --git a/src/examples/video-play.c b/src/examples/video-play.c index aef95caf4..49b9cd0cd 100644 --- a/src/examples/video-play.c +++ b/src/examples/video-play.c @@ -94,7 +94,6 @@ on_stream_process(void *_data) struct pw_stream *stream = data->stream; struct pw_buffer *buf; struct spa_buffer *b; - uint8_t *map; void *sdata, *ddata; int sstride, dstride, ostride; uint32_t i; @@ -108,20 +107,12 @@ on_stream_process(void *_data) b = buf->buffer; - if (b->datas[0].type == data->t->data.MemFd || - b->datas[0].type == data->t->data.DmaBuf) { - map = mmap(NULL, b->datas[0].maxsize + b->datas[0].mapoffset, PROT_READ, - MAP_PRIVATE, b->datas[0].fd, 0); - sdata = SPA_MEMBER(map, b->datas[0].mapoffset, uint8_t); - } else if (b->datas[0].type == data->t->data.MemPtr) { - map = NULL; - sdata = b->datas[0].data; - } else - return; + if ((sdata = b->datas[0].data) == NULL) + goto done; if (SDL_LockTexture(data->texture, NULL, &ddata, &dstride) < 0) { fprintf(stderr, "Couldn't lock texture: %s\n", SDL_GetError()); - return; + goto done; } sstride = b->datas[0].chunk->stride; ostride = SPA_MIN(sstride, dstride); @@ -139,9 +130,7 @@ on_stream_process(void *_data) SDL_RenderCopy(data->renderer, data->texture, NULL, NULL); SDL_RenderPresent(data->renderer); - if (map) - munmap(map, b->datas[0].maxsize + b->datas[0].mapoffset); - + done: pw_stream_queue_buffer(stream, buf); } @@ -365,7 +354,8 @@ static void on_state_changed(void *_data, enum pw_remote_state old, enum pw_remo PW_DIRECTION_INPUT, data->path, PW_STREAM_FLAG_AUTOCONNECT | - PW_STREAM_FLAG_INACTIVE, + PW_STREAM_FLAG_INACTIVE | + PW_STREAM_FLAG_MAP_BUFFERS, params, 1); break; } diff --git a/src/pipewire/stream.c b/src/pipewire/stream.c index ecb5db457..9b1064cb3 100644 --- a/src/pipewire/stream.c +++ b/src/pipewire/stream.c @@ -56,7 +56,9 @@ struct mem { struct buffer { struct pw_buffer buffer; uint32_t id; - bool queued; +#define BUFFER_FLAG_MAPPED (1 << 0) +#define BUFFER_FLAG_QUEUED (1 << 1) + uint32_t flags; void *ptr; struct pw_map_range map; uint32_t n_mem; @@ -190,11 +192,45 @@ static void clear_mems(struct pw_stream *stream) impl->mem_ids.size = 0; } +static int map_data(struct stream *impl, struct spa_data *data, int prot) +{ + void *ptr; + struct pw_map_range range; + + pw_map_range_init(&range, data->mapoffset, data->maxsize, + impl->this.remote->core->sc_pagesize); + + ptr = mmap(NULL, range.size, prot, MAP_SHARED, data->fd, range.offset); + if (ptr == MAP_FAILED) { + pw_log_error("stream %p: failed to mmap buffer mem: %m", impl); + return -errno; + } + data->data = SPA_MEMBER(ptr, range.start, void); + pw_log_debug("stream %p: fd %d mapped %d %d %p", impl, data->fd, + range.offset, range.size, data->data); + return 0; +} + +static int unmap_data(struct stream *impl, struct spa_data *data) +{ + struct pw_map_range range; + + pw_map_range_init(&range, data->mapoffset, data->maxsize, + impl->this.remote->core->sc_pagesize); + + if (munmap(SPA_MEMBER(data->data, -range.start, void), range.size) < 0) + pw_log_warn("failed to unmap: %m"); + + pw_log_debug("stream %p: fd %d unmapped", impl, data->fd); + data->data = NULL; + return 0; +} + static void clear_buffers(struct pw_stream *stream) { struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this); struct buffer *b; - int i; + int i, j; pw_log_debug("stream %p: clear buffers", stream); @@ -204,13 +240,21 @@ static void clear_buffers(struct pw_stream *stream) spa_hook_list_call(&stream->listener_list, struct pw_stream_events, remove_buffer, &b->buffer); + if (SPA_FLAG_CHECK(b->flags, BUFFER_FLAG_MAPPED)) { + for (j = 0; j < b->buffer.buffer->n_datas; j++) { + struct spa_data *d = &b->buffer.buffer->datas[j]; + pw_log_debug("stream %p: clear buffer %d mem", + stream, b->id); + unmap_data(impl, d); + } + } + if (b->ptr != NULL) if (munmap(b->ptr, b->map.size) < 0) pw_log_warn("failed to unmap buffer: %m"); b->ptr = NULL; free(b->buffer.buffer); b->buffer.buffer = NULL; - b->queued = false; } impl->n_buffers = 0; spa_ringbuffer_init(&impl->queue.ring); @@ -223,11 +267,11 @@ static inline int push_queue(struct stream *stream, struct queue *queue, struct uint32_t index; int32_t filled; - if (buffer->queued) + if (SPA_FLAG_CHECK(buffer->flags, BUFFER_FLAG_QUEUED)) return -EINVAL; filled = spa_ringbuffer_get_write_index(&queue->ring, &index); - buffer->queued = true; + SPA_FLAG_SET(buffer->flags, BUFFER_FLAG_QUEUED); queue->ids[index & MASK_BUFFERS] = buffer->id; spa_ringbuffer_write_update(&queue->ring, index + 1); @@ -249,7 +293,7 @@ static inline struct buffer *pop_queue(struct stream *stream, struct queue *queu spa_ringbuffer_read_update(&queue->ring, index + 1); buffer = &stream->buffers[id]; - buffer->queued = false; + SPA_FLAG_UNSET(buffer->flags, BUFFER_FLAG_QUEUED); pw_log_trace("stream %p: dequeued buffer %d %d", stream, id, avail); @@ -593,18 +637,6 @@ static inline void send_reuse_buffer(struct pw_stream *stream, uint32_t id) write(impl->rtwritefd, &cmd, 8); } -static void add_request_clock_update(struct pw_stream *stream) -{ - struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this); - - pw_client_node_proxy_event(impl->node_proxy, (struct spa_event *) - &SPA_EVENT_NODE_REQUEST_CLOCK_UPDATE_INIT(stream->remote->core->type. - event_node. - RequestClockUpdate, - SPA_EVENT_NODE_REQUEST_CLOCK_UPDATE_TIME, - 0, 0)); -} - static void add_async_complete(struct pw_stream *stream, uint32_t seq, int res) { struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this); @@ -629,18 +661,33 @@ static void do_node_init(struct pw_stream *stream) pw_client_node_proxy_set_active(impl->node_proxy, true); } +#if 0 +static void add_request_clock_update(struct pw_stream *stream) +{ + struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this); + + pw_client_node_proxy_event(impl->node_proxy, (struct spa_event *) + &SPA_EVENT_NODE_REQUEST_CLOCK_UPDATE_INIT(stream->remote->core->type. + event_node. + RequestClockUpdate, + SPA_EVENT_NODE_REQUEST_CLOCK_UPDATE_TIME, + 0, 0)); +} + static void on_timeout(void *data, uint64_t expirations) { struct pw_stream *stream = data; add_request_clock_update(stream); } +#endif static inline void reuse_buffer(struct pw_stream *stream, uint32_t id) { struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this); struct buffer *b; - if ((b = get_buffer(stream, id)) && !b->queued) { + if ((b = get_buffer(stream, id)) && + !SPA_FLAG_CHECK(b->flags, BUFFER_FLAG_QUEUED)) { pw_log_trace("stream %p: reuse buffer %u", stream, id); push_queue(impl, &impl->dequeue, b); } @@ -793,7 +840,6 @@ on_rtsocket_condition(void *data, int fd, enum spa_io mask) static void handle_socket(struct pw_stream *stream, int rtreadfd, int rtwritefd) { struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this); - struct timespec interval; impl->rtwritefd = rtwritefd; impl->rtsocket_source = pw_loop_add_io(stream->remote->core->data_loop, @@ -801,12 +847,13 @@ static void handle_socket(struct pw_stream *stream, int rtreadfd, int rtwritefd) SPA_IO_ERR | SPA_IO_HUP, true, on_rtsocket_condition, stream); - /* +#if 0 + struct timespec interval; impl->timeout_source = pw_loop_add_timer(stream->remote->core->main_loop, on_timeout, stream); interval.tv_sec = 0; interval.tv_nsec = 100000000; pw_loop_update_timer(stream->remote->core->main_loop, impl->timeout_source, NULL, &interval, false); - */ +#endif return; } @@ -993,7 +1040,7 @@ client_node_port_use_buffers(void *data, bid = &impl->buffers[i]; bid->id = i; - bid->queued = false; + bid->flags = 0; b = buffers[i].buffer; pw_map_range_init(&bid->map, buffers[i].offset, buffers[i].size, @@ -1059,6 +1106,12 @@ client_node_port_use_buffers(void *data, bm->ref++; bid->mem[bid->n_mem++] = bm; pw_log_debug(" data %d %u -> fd %d", j, bm->id, bm->fd); + + if (SPA_FLAG_CHECK(impl->flags, PW_STREAM_FLAG_MAP_BUFFERS)) { + if (map_data(impl, d, prot) < 0) + return; + SPA_FLAG_SET(bid->flags, BUFFER_FLAG_MAPPED); + } } else if (d->type == t->data.MemPtr) { d->data = SPA_MEMBER(bid->ptr, bid->map.start + SPA_PTR_TO_INT(d->data), void); @@ -1068,6 +1121,7 @@ client_node_port_use_buffers(void *data, pw_log_warn("unknown buffer data type %d", d->type); } } + if (impl->direction == SPA_DIRECTION_OUTPUT) push_queue(impl, &impl->dequeue, bid);