stream: implement MAP_BUFFERS

video-play: let the stream map our buffers
Wim Taymans 2018-07-12 11:05:53 +02:00
parent a2d2d75a25
commit c362b1ccc5
2 changed files with 83 additions and 39 deletions
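
In short: passing PW_STREAM_FLAG_MAP_BUFFERS at connect time makes the stream library mmap MemFd/DmaBuf buffer memory itself and expose it through spa_data::data, so consumers no longer have to mmap()/munmap() every buffer. A minimal sketch of the resulting consumer-side process callback (it mirrors the simplified video-play code below; struct data and consume_frame are hypothetical application helpers, and error handling is elided):

static void on_stream_process(void *userdata)
{
	struct data *data = userdata;	/* hypothetical application state */
	struct pw_buffer *buf;
	struct spa_buffer *b;
	void *sdata;

	if ((buf = pw_stream_dequeue_buffer(data->stream)) == NULL)
		return;

	b = buf->buffer;
	/* with MAP_BUFFERS the library has already mapped the memory */
	if ((sdata = b->datas[0].data) != NULL)
		consume_frame(data, sdata, b->datas[0].chunk->stride);

	pw_stream_queue_buffer(data->stream, buf);
}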

video-play.c

@@ -94,7 +94,6 @@ on_stream_process(void *_data)
 	struct pw_stream *stream = data->stream;
 	struct pw_buffer *buf;
 	struct spa_buffer *b;
-	uint8_t *map;
 	void *sdata, *ddata;
 	int sstride, dstride, ostride;
 	uint32_t i;
@@ -108,20 +107,12 @@ on_stream_process(void *_data)
 	b = buf->buffer;
 
-	if (b->datas[0].type == data->t->data.MemFd ||
-	    b->datas[0].type == data->t->data.DmaBuf) {
-		map = mmap(NULL, b->datas[0].maxsize + b->datas[0].mapoffset, PROT_READ,
-			   MAP_PRIVATE, b->datas[0].fd, 0);
-		sdata = SPA_MEMBER(map, b->datas[0].mapoffset, uint8_t);
-	} else if (b->datas[0].type == data->t->data.MemPtr) {
-		map = NULL;
-		sdata = b->datas[0].data;
-	} else
-		return;
+	if ((sdata = b->datas[0].data) == NULL)
+		goto done;
 
 	if (SDL_LockTexture(data->texture, NULL, &ddata, &dstride) < 0) {
 		fprintf(stderr, "Couldn't lock texture: %s\n", SDL_GetError());
-		return;
+		goto done;
 	}
 
 	sstride = b->datas[0].chunk->stride;
 	ostride = SPA_MIN(sstride, dstride);
@@ -139,9 +130,7 @@ on_stream_process(void *_data)
 	SDL_RenderCopy(data->renderer, data->texture, NULL, NULL);
 	SDL_RenderPresent(data->renderer);
 
-	if (map)
-		munmap(map, b->datas[0].maxsize + b->datas[0].mapoffset);
-
+      done:
 	pw_stream_queue_buffer(stream, buf);
 }
@@ -365,7 +354,8 @@ static void on_state_changed(void *_data, enum pw_remote_state old, enum pw_remote_state state, const char *error)
 				  PW_DIRECTION_INPUT,
 				  data->path,
 				  PW_STREAM_FLAG_AUTOCONNECT |
-				  PW_STREAM_FLAG_INACTIVE,
+				  PW_STREAM_FLAG_INACTIVE |
+				  PW_STREAM_FLAG_MAP_BUFFERS,
 				  params, 1);
 		break;
 	}

stream.c

@@ -56,7 +56,9 @@ struct mem {
 struct buffer {
 	struct pw_buffer buffer;
 	uint32_t id;
-	bool queued;
+#define BUFFER_FLAG_MAPPED	(1 << 0)
+#define BUFFER_FLAG_QUEUED	(1 << 1)
+	uint32_t flags;
 	void *ptr;
 	struct pw_map_range map;
 	uint32_t n_mem;
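
The SPA_FLAG_* helpers used by the new code are plain bit operations; they are presumably defined along these lines (a sketch, not the actual spa/utils/defs.h source):

#define SPA_FLAG_CHECK(field, flag)	(((field) & (flag)) == (flag))	/* all bits set? */
#define SPA_FLAG_SET(field, flag)	((field) |= (flag))		/* set bits */
#define SPA_FLAG_UNSET(field, flag)	((field) &= ~(flag))		/* clear bits */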
@@ -190,11 +192,45 @@ static void clear_mems(struct pw_stream *stream)
 	impl->mem_ids.size = 0;
 }
 
+static int map_data(struct stream *impl, struct spa_data *data, int prot)
+{
+	void *ptr;
+	struct pw_map_range range;
+
+	pw_map_range_init(&range, data->mapoffset, data->maxsize,
+			  impl->this.remote->core->sc_pagesize);
+
+	ptr = mmap(NULL, range.size, prot, MAP_SHARED, data->fd, range.offset);
+	if (ptr == MAP_FAILED) {
+		pw_log_error("stream %p: failed to mmap buffer mem: %m", impl);
+		return -errno;
+	}
+	data->data = SPA_MEMBER(ptr, range.start, void);
+	pw_log_debug("stream %p: fd %d mapped %d %d %p", impl, data->fd,
+		     range.offset, range.size, data->data);
+	return 0;
+}
+
+static int unmap_data(struct stream *impl, struct spa_data *data)
+{
+	struct pw_map_range range;
+
+	pw_map_range_init(&range, data->mapoffset, data->maxsize,
+			  impl->this.remote->core->sc_pagesize);
+
+	if (munmap(SPA_MEMBER(data->data, -range.start, void), range.size) < 0)
+		pw_log_warn("failed to unmap: %m");
+	pw_log_debug("stream %p: fd %d unmapped", impl, data->fd);
+	data->data = NULL;
+	return 0;
+}
+
 static void clear_buffers(struct pw_stream *stream)
 {
 	struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this);
 	struct buffer *b;
-	int i;
+	int i, j;
 
 	pw_log_debug("stream %p: clear buffers", stream);
@@ -204,13 +240,21 @@ static void clear_buffers(struct pw_stream *stream)
 		spa_hook_list_call(&stream->listener_list, struct pw_stream_events,
 				   remove_buffer, &b->buffer);
 
+		if (SPA_FLAG_CHECK(b->flags, BUFFER_FLAG_MAPPED)) {
+			for (j = 0; j < b->buffer.buffer->n_datas; j++) {
+				struct spa_data *d = &b->buffer.buffer->datas[j];
+				pw_log_debug("stream %p: clear buffer %d mem",
+					     stream, b->id);
+				unmap_data(impl, d);
+			}
+		}
+
 		if (b->ptr != NULL)
 			if (munmap(b->ptr, b->map.size) < 0)
 				pw_log_warn("failed to unmap buffer: %m");
 		b->ptr = NULL;
 		free(b->buffer.buffer);
 		b->buffer.buffer = NULL;
-		b->queued = false;
 	}
 	impl->n_buffers = 0;
 	spa_ringbuffer_init(&impl->queue.ring);
@@ -223,11 +267,11 @@ static inline int push_queue(struct stream *stream, struct queue *queue, struct buffer *buffer)
 	uint32_t index;
 	int32_t filled;
 
-	if (buffer->queued)
+	if (SPA_FLAG_CHECK(buffer->flags, BUFFER_FLAG_QUEUED))
 		return -EINVAL;
 
 	filled = spa_ringbuffer_get_write_index(&queue->ring, &index);
-	buffer->queued = true;
+	SPA_FLAG_SET(buffer->flags, BUFFER_FLAG_QUEUED);
 	queue->ids[index & MASK_BUFFERS] = buffer->id;
 	spa_ringbuffer_write_update(&queue->ring, index + 1);
@@ -249,7 +293,7 @@ static inline struct buffer *pop_queue(struct stream *stream, struct queue *queue)
 	spa_ringbuffer_read_update(&queue->ring, index + 1);
 
 	buffer = &stream->buffers[id];
-	buffer->queued = false;
+	SPA_FLAG_UNSET(buffer->flags, BUFFER_FLAG_QUEUED);
 
 	pw_log_trace("stream %p: dequeued buffer %d %d", stream, id, avail);
@@ -593,18 +637,6 @@ static inline void send_reuse_buffer(struct pw_stream *stream, uint32_t id)
 	write(impl->rtwritefd, &cmd, 8);
 }
 
-static void add_request_clock_update(struct pw_stream *stream)
-{
-	struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this);
-	pw_client_node_proxy_event(impl->node_proxy, (struct spa_event *)
-		&SPA_EVENT_NODE_REQUEST_CLOCK_UPDATE_INIT(stream->remote->core->type.
-							   event_node.RequestClockUpdate,
-							   SPA_EVENT_NODE_REQUEST_CLOCK_UPDATE_TIME,
-							   0, 0));
-}
-
 static void add_async_complete(struct pw_stream *stream, uint32_t seq, int res)
 {
 	struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this);
@@ -629,18 +661,33 @@ static void do_node_init(struct pw_stream *stream)
 	pw_client_node_proxy_set_active(impl->node_proxy, true);
 }
 
+#if 0
+static void add_request_clock_update(struct pw_stream *stream)
+{
+	struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this);
+	pw_client_node_proxy_event(impl->node_proxy, (struct spa_event *)
+		&SPA_EVENT_NODE_REQUEST_CLOCK_UPDATE_INIT(stream->remote->core->type.
+							   event_node.RequestClockUpdate,
+							   SPA_EVENT_NODE_REQUEST_CLOCK_UPDATE_TIME,
+							   0, 0));
+}
+
 static void on_timeout(void *data, uint64_t expirations)
 {
 	struct pw_stream *stream = data;
 	add_request_clock_update(stream);
 }
+#endif
 
 static inline void reuse_buffer(struct pw_stream *stream, uint32_t id)
 {
 	struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this);
 	struct buffer *b;
 
-	if ((b = get_buffer(stream, id)) && !b->queued) {
+	if ((b = get_buffer(stream, id)) &&
+	    !SPA_FLAG_CHECK(b->flags, BUFFER_FLAG_QUEUED)) {
 		pw_log_trace("stream %p: reuse buffer %u", stream, id);
 		push_queue(impl, &impl->dequeue, b);
 	}
@@ -793,7 +840,6 @@ on_rtsocket_condition(void *data, int fd, enum spa_io mask)
 static void handle_socket(struct pw_stream *stream, int rtreadfd, int rtwritefd)
 {
 	struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this);
-	struct timespec interval;
 
 	impl->rtwritefd = rtwritefd;
 	impl->rtsocket_source = pw_loop_add_io(stream->remote->core->data_loop,
@@ -801,12 +847,13 @@ static void handle_socket(struct pw_stream *stream, int rtreadfd, int rtwritefd)
 					       SPA_IO_ERR | SPA_IO_HUP,
 					       true, on_rtsocket_condition, stream);
 
-/*
+#if 0
+	struct timespec interval;
 	impl->timeout_source = pw_loop_add_timer(stream->remote->core->main_loop, on_timeout, stream);
 	interval.tv_sec = 0;
 	interval.tv_nsec = 100000000;
 	pw_loop_update_timer(stream->remote->core->main_loop, impl->timeout_source, NULL, &interval, false);
-*/
+#endif
 	return;
 }
@@ -993,7 +1040,7 @@ client_node_port_use_buffers(void *data,
 		bid = &impl->buffers[i];
 		bid->id = i;
-		bid->queued = false;
+		bid->flags = 0;
 
 		b = buffers[i].buffer;
 		pw_map_range_init(&bid->map, buffers[i].offset, buffers[i].size,
@@ -1059,6 +1106,12 @@ client_node_port_use_buffers(void *data,
 				bm->ref++;
 				bid->mem[bid->n_mem++] = bm;
 				pw_log_debug("  data %d %u -> fd %d", j, bm->id, bm->fd);
+
+				if (SPA_FLAG_CHECK(impl->flags, PW_STREAM_FLAG_MAP_BUFFERS)) {
+					if (map_data(impl, d, prot) < 0)
+						return;
+					SPA_FLAG_SET(bid->flags, BUFFER_FLAG_MAPPED);
+				}
 			} else if (d->type == t->data.MemPtr) {
 				d->data = SPA_MEMBER(bid->ptr,
 						     bid->map.start + SPA_PTR_TO_INT(d->data), void);
@@ -1068,6 +1121,7 @@ client_node_port_use_buffers(void *data,
 			pw_log_warn("unknown buffer data type %d", d->type);
 		}
 	}
-	push_queue(impl, &impl->dequeue, bid);
+	if (impl->direction == SPA_DIRECTION_OUTPUT)
+		push_queue(impl, &impl->dequeue, bid);