stream: schedule process from main thread when asked

Use 2 lockfree queues so that we can queue and dequeue from different
threads.
Call the process function from the main thread when requested
Rework the data push and pull logic to go through the queues
Update the examples for this new feature, video-play does not need
to schedule the process in the main thread anymore and block for it
to complete, this avoid a deadlock between the data and mainloop when
shutting down.
This commit is contained in:
Wim Taymans 2018-07-11 17:51:57 +02:00
parent 67e567b9c7
commit bcddac6e2b
3 changed files with 253 additions and 173 deletions

View file

@ -87,12 +87,13 @@ static void handle_events(struct data *data)
}
}
static int
do_render(struct spa_loop *loop, bool async, uint32_t seq,
const void *_data, size_t size, void *user_data)
static void
on_stream_process(void *_data)
{
struct data *data = user_data;
struct spa_buffer *buf = ((struct spa_buffer **) _data)[0];
struct data *data = _data;
struct pw_stream *stream = data->stream;
struct pw_buffer *buf;
struct spa_buffer *b;
uint8_t *map;
void *sdata, *ddata;
int sstride, dstride, ostride;
@ -101,22 +102,28 @@ do_render(struct spa_loop *loop, bool async, uint32_t seq,
handle_events(data);
if (buf->datas[0].type == data->t->data.MemFd ||
buf->datas[0].type == data->t->data.DmaBuf) {
map = mmap(NULL, buf->datas[0].maxsize + buf->datas[0].mapoffset, PROT_READ,
MAP_PRIVATE, buf->datas[0].fd, 0);
sdata = SPA_MEMBER(map, buf->datas[0].mapoffset, uint8_t);
} else if (buf->datas[0].type == data->t->data.MemPtr) {
buf = pw_stream_dequeue_buffer(stream);
if (buf == NULL)
return;
b = buf->buffer;
if (b->datas[0].type == data->t->data.MemFd ||
b->datas[0].type == data->t->data.DmaBuf) {
map = mmap(NULL, b->datas[0].maxsize + b->datas[0].mapoffset, PROT_READ,
MAP_PRIVATE, b->datas[0].fd, 0);
sdata = SPA_MEMBER(map, b->datas[0].mapoffset, uint8_t);
} else if (b->datas[0].type == data->t->data.MemPtr) {
map = NULL;
sdata = buf->datas[0].data;
sdata = b->datas[0].data;
} else
return -EINVAL;
return;
if (SDL_LockTexture(data->texture, NULL, &ddata, &dstride) < 0) {
fprintf(stderr, "Couldn't lock texture: %s\n", SDL_GetError());
return -EIO;
return;
}
sstride = buf->datas[0].chunk->stride;
sstride = b->datas[0].chunk->stride;
ostride = SPA_MIN(sstride, dstride);
src = sdata;
@ -133,23 +140,7 @@ do_render(struct spa_loop *loop, bool async, uint32_t seq,
SDL_RenderPresent(data->renderer);
if (map)
munmap(map, buf->datas[0].maxsize + buf->datas[0].mapoffset);
return 0;
}
static void
on_stream_process(void *_data)
{
struct data *data = _data;
struct pw_stream *stream = data->stream;
struct pw_buffer *buf;
buf = pw_stream_dequeue_buffer(stream);
pw_loop_invoke(pw_main_loop_get_loop(data->loop), do_render,
SPA_ID_INVALID, &buf->buffer, sizeof(struct spa_buffer *),
true, data);
munmap(map, b->datas[0].maxsize + b->datas[0].mapoffset);
pw_stream_queue_buffer(stream, buf);
}
@ -366,7 +357,8 @@ static void on_state_changed(void *_data, enum pw_remote_state old, enum pw_remo
pw_stream_connect(data->stream,
PW_DIRECTION_INPUT,
data->path,
PW_STREAM_FLAG_AUTOCONNECT | PW_STREAM_FLAG_INACTIVE,
PW_STREAM_FLAG_AUTOCONNECT |
PW_STREAM_FLAG_INACTIVE,
params, 1);
break;
}

View file

@ -235,7 +235,8 @@ static void on_state_changed(void *_data, enum pw_remote_state old, enum pw_remo
pw_stream_connect(data->stream,
PW_DIRECTION_OUTPUT,
NULL, PW_STREAM_FLAG_NONE,
NULL,
PW_STREAM_FLAG_DRIVER,
params, 1);
break;
}

View file

@ -24,6 +24,7 @@
#include <errno.h>
#include <time.h>
#include "spa/utils/ringbuffer.h"
#include "spa/lib/debug.h"
#include "pipewire/pipewire.h"
@ -37,10 +38,11 @@
/** \cond */
#define MAX_BUFFER_SIZE 4096
#define MAX_FDS 32
#define MAX_INPUTS 64
#define MAX_OUTPUTS 64
#define MAX_BUFFERS 64
#define MASK_BUFFERS (MAX_BUFFERS-1)
#define MIN_QUEUED 1
#define MAX_PORTS 1
struct mem {
uint32_t id;
@ -53,7 +55,7 @@ struct mem {
struct buffer {
struct pw_buffer buffer;
struct spa_list link;
uint32_t id;
bool queued;
void *ptr;
struct pw_map_range map;
@ -61,6 +63,11 @@ struct buffer {
struct mem **mem;
};
struct queue {
uint32_t ids[MAX_BUFFERS];
struct spa_ringbuffer ring;
};
struct stream {
struct pw_stream this;
@ -94,15 +101,18 @@ struct stream {
struct spa_source *timeout_source;
struct pw_array mem_ids;
struct pw_array buffer_ids;
bool in_order;
struct spa_io_buffers *io;
bool client_reuse;
struct spa_list queue;
struct queue dequeue;
struct queue queue;
bool in_process;
struct buffer buffers[MAX_BUFFERS];
int n_buffers;
int64_t last_ticks;
int32_t last_rate;
int64_t last_monotonic;
@ -184,11 +194,16 @@ static void clear_buffers(struct pw_stream *stream)
{
struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this);
struct buffer *b;
int i;
pw_log_debug("stream %p: clear buffers", stream);
pw_array_for_each(b, &impl->buffer_ids) {
spa_hook_list_call(&stream->listener_list, struct pw_stream_events, remove_buffer, &b->buffer);
for (i = 0; i < impl->n_buffers; i++) {
b = &impl->buffers[i];
spa_hook_list_call(&stream->listener_list, struct pw_stream_events,
remove_buffer, &b->buffer);
if (b->ptr != NULL)
if (munmap(b->ptr, b->map.size) < 0)
pw_log_warn("failed to unmap buffer: %m");
@ -197,9 +212,48 @@ static void clear_buffers(struct pw_stream *stream)
b->buffer.buffer = NULL;
b->queued = false;
}
impl->buffer_ids.size = 0;
impl->in_order = true;
spa_list_init(&impl->queue);
impl->n_buffers = 0;
spa_ringbuffer_init(&impl->queue.ring);
spa_ringbuffer_init(&impl->dequeue.ring);
}
static inline int push_queue(struct stream *stream, struct queue *queue, struct buffer *buffer)
{
uint32_t index;
int32_t filled;
if (buffer->queued)
return -EINVAL;
filled = spa_ringbuffer_get_write_index(&queue->ring, &index);
buffer->queued = true;
queue->ids[index & MASK_BUFFERS] = buffer->id;
spa_ringbuffer_write_update(&queue->ring, index + 1);
pw_log_trace("stream %p: queued buffer %d %d", stream, buffer->id, filled);
return filled;
}
static inline struct buffer *pop_queue(struct stream *stream, struct queue *queue)
{
int32_t avail;
uint32_t index, id;
struct buffer *buffer;
if ((avail = spa_ringbuffer_get_read_index(&queue->ring, &index)) < MIN_QUEUED)
return NULL;
id = queue->ids[index & MASK_BUFFERS];
spa_ringbuffer_read_update(&queue->ring, index + 1);
buffer = &stream->buffers[id];
buffer->queued = false;
pw_log_trace("stream %p: dequeued buffer %d %d", stream, id, avail);
return buffer;
}
static bool stream_set_state(struct pw_stream *stream, enum pw_stream_state state, char *error)
@ -222,6 +276,37 @@ static bool stream_set_state(struct pw_stream *stream, enum pw_stream_state stat
return res;
}
static struct buffer *get_buffer(struct pw_stream *stream, uint32_t id)
{
struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this);
if (id < impl->n_buffers)
return &impl->buffers[id];
return NULL;
}
static int
do_call_process(struct spa_loop *loop,
bool async, uint32_t seq, const void *data, size_t size, void *user_data)
{
struct stream *impl = user_data;
struct pw_stream *stream = &impl->this;
impl->in_process = true;
spa_hook_list_call(&stream->listener_list, struct pw_stream_events, process);
impl->in_process = false;
return 0;
}
static void call_process(struct stream *impl)
{
if (SPA_FLAG_CHECK(impl->flags, PW_STREAM_FLAG_RT_PROCESS)) {
do_call_process(NULL, false, 1, NULL, 0, impl);
}
else {
pw_loop_invoke(impl->this.remote->core->main_loop,
do_call_process, 1, NULL, 0, false, impl);
}
}
const char *pw_stream_state_as_string(enum pw_stream_state state)
{
switch (state) {
@ -281,10 +366,11 @@ struct pw_stream *pw_stream_new(struct pw_remote *remote,
pw_array_init(&impl->mem_ids, 64);
pw_array_ensure_size(&impl->mem_ids, sizeof(struct mem) * 64);
pw_array_init(&impl->buffer_ids, 32);
pw_array_ensure_size(&impl->buffer_ids, sizeof(struct buffer) * 64);
impl->pending_seq = SPA_ID_INVALID;
spa_list_init(&impl->queue);
spa_ringbuffer_init(&impl->queue.ring);
spa_ringbuffer_init(&impl->dequeue.ring);
spa_list_append(&remote->stream_list, &this->link);
@ -416,7 +502,6 @@ void pw_stream_destroy(struct pw_stream *stream)
free(stream->error);
clear_buffers(stream);
pw_array_clear(&impl->buffer_ids);
clear_mems(stream);
pw_array_clear(&impl->mem_ids);
@ -480,6 +565,7 @@ static inline void send_need_input(struct pw_stream *stream)
struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this);
uint64_t cmd = 1;
pw_log_trace("send");
pw_client_node_transport_add_message(impl->trans,
&PW_CLIENT_NODE_MESSAGE_INIT(PW_CLIENT_NODE_MESSAGE_NEED_INPUT));
write(impl->rtwritefd, &cmd, 8);
@ -490,6 +576,7 @@ static inline void send_have_output(struct pw_stream *stream)
struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this);
uint64_t cmd = 1;
pw_log_trace("send");
pw_client_node_transport_add_message(impl->trans,
&PW_CLIENT_NODE_MESSAGE_INIT(PW_CLIENT_NODE_MESSAGE_HAVE_OUTPUT));
write(impl->rtwritefd, &cmd, 8);
@ -500,6 +587,7 @@ static inline void send_reuse_buffer(struct pw_stream *stream, uint32_t id)
struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this);
uint64_t cmd = 1;
pw_log_trace("send");
pw_client_node_transport_add_message(impl->trans, (struct pw_client_node_message*)
&PW_CLIENT_NODE_MESSAGE_PORT_REUSE_BUFFER_INIT(impl->port_id, id));
write(impl->rtwritefd, &cmd, 8);
@ -547,92 +635,114 @@ static void on_timeout(void *data, uint64_t expirations)
add_request_clock_update(stream);
}
static struct buffer *find_buffer(struct pw_stream *stream, uint32_t id)
{
struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this);
if (impl->in_order && pw_array_check_index(&impl->buffer_ids, id, struct buffer)) {
return pw_array_get_unchecked(&impl->buffer_ids, id, struct buffer);
} else {
struct buffer *b;
pw_array_for_each(b, &impl->buffer_ids) {
if (b->buffer.buffer->id == id)
return b;
}
}
return NULL;
}
static inline void reuse_buffer(struct pw_stream *stream, uint32_t id)
{
struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this);
struct buffer *b;
if ((b = find_buffer(stream, id)) && !b->queued) {
if ((b = get_buffer(stream, id)) && !b->queued) {
pw_log_trace("stream %p: reuse buffer %u", stream, id);
spa_list_append(&impl->queue, &b->link);
b->queued = true;
push_queue(impl, &impl->dequeue, b);
}
}
static void handle_rtnode_message(struct pw_stream *stream, struct pw_client_node_message *message)
static int process_input(struct pw_stream *stream)
{
struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this);
switch (PW_CLIENT_NODE_MESSAGE_TYPE(message)) {
case PW_CLIENT_NODE_MESSAGE_PROCESS_INPUT:
{
int i;
for (i = 0; i < impl->trans->area->n_input_ports; i++) {
struct spa_io_buffers *input = &impl->trans->inputs[i];
struct buffer *b;
uint32_t buffer_id;
int status;
buffer_id = input->buffer_id;
status = input->status;
pw_log_trace("stream %p: process input %d %d", stream, input->status,
pw_log_trace("stream %p: process input %d %d", stream, status,
buffer_id);
if ((b = find_buffer(stream, buffer_id)) == NULL)
continue;
if (status != SPA_STATUS_HAVE_BUFFER)
goto done;
if (impl->client_reuse)
input->buffer_id = SPA_ID_INVALID;
if ((b = get_buffer(stream, buffer_id)) == NULL)
goto done;
if (input->status == SPA_STATUS_HAVE_BUFFER) {
spa_list_append(&impl->queue, &b->link);
b->queued = true;
impl->in_process = true;
spa_hook_list_call(&stream->listener_list, struct pw_stream_events,
process);
impl->in_process = false;
}
if (push_queue(impl, &impl->dequeue, b) >= 0)
call_process(impl);
done:
/* pop buffer to recycle if we can */
b = pop_queue(impl, &impl->queue);
input->buffer_id = b ? b->id : SPA_ID_INVALID;
input->status = SPA_STATUS_NEED_BUFFER;
pw_log_trace("stream %p: reuse %d", stream, input->buffer_id);
}
send_need_input(stream);
break;
}
case PW_CLIENT_NODE_MESSAGE_PROCESS_OUTPUT:
{
int i;
return SPA_STATUS_NEED_BUFFER;
}
static int process_output(struct pw_stream *stream)
{
int i, res = 0;
struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this);
for (i = 0; i < impl->trans->area->n_output_ports; i++) {
struct spa_io_buffers *output = &impl->trans->outputs[i];
struct spa_io_buffers *io = &impl->trans->outputs[i];
struct buffer *b;
uint32_t index;
if (output->buffer_id == SPA_ID_INVALID)
continue;
again:
pw_log_trace("stream %p: process out %d %d", stream,
io->status, io->buffer_id);
reuse_buffer(stream, output->buffer_id);
output->buffer_id = SPA_ID_INVALID;
if (io->status != SPA_STATUS_HAVE_BUFFER) {
/* recycle old buffer */
if ((b = get_buffer(stream, io->buffer_id)) != NULL)
push_queue(impl, &impl->dequeue, b);
/* pop new buffer */
if ((b = pop_queue(impl, &impl->queue)) != NULL) {
io->buffer_id = b->id;
io->status = SPA_STATUS_HAVE_BUFFER;
pw_log_trace("stream %p: pop %d %p", stream, b->id, io);
} else {
io->buffer_id = SPA_ID_INVALID;
io->status = SPA_STATUS_NEED_BUFFER;
pw_log_trace("stream %p: no more buffers %p", stream, io);
}
pw_log_trace("stream %p: process output", stream);
impl->in_process = true;
spa_hook_list_call(&stream->listener_list, struct pw_stream_events, process);
impl->in_process = false;
}
if (!SPA_FLAG_CHECK(impl->flags, PW_STREAM_FLAG_DRIVER)) {
call_process(impl);
if (spa_ringbuffer_get_read_index(&impl->queue.ring, &index) >= MIN_QUEUED &&
io->status == SPA_STATUS_NEED_BUFFER)
goto again;
}
res = io->status;
}
return res;
}
static void handle_rtnode_message(struct pw_stream *stream, struct pw_client_node_message *message)
{
struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this);
pw_log_trace("stream %p: %d", stream, PW_CLIENT_NODE_MESSAGE_TYPE(message));
switch (PW_CLIENT_NODE_MESSAGE_TYPE(message)) {
case PW_CLIENT_NODE_MESSAGE_PROCESS_INPUT:
if (process_input(stream) == SPA_STATUS_NEED_BUFFER)
send_need_input(stream);
break;
}
case PW_CLIENT_NODE_MESSAGE_PROCESS_OUTPUT:
if (process_output(stream) == SPA_STATUS_HAVE_BUFFER)
send_have_output(stream);
break;
case PW_CLIENT_NODE_MESSAGE_PORT_REUSE_BUFFER:
{
struct pw_client_node_message_port_reuse_buffer *p =
@ -643,6 +753,7 @@ static void handle_rtnode_message(struct pw_stream *stream, struct pw_client_nod
if (impl->direction != SPA_DIRECTION_OUTPUT)
return;
reuse_buffer(stream, p->body.buffer_id.value);
break;
}
@ -690,10 +801,12 @@ static void handle_socket(struct pw_stream *stream, int rtreadfd, int rtwritefd)
SPA_IO_ERR | SPA_IO_HUP,
true, on_rtsocket_condition, stream);
/*
impl->timeout_source = pw_loop_add_timer(stream->remote->core->main_loop, on_timeout, stream);
interval.tv_sec = 0;
interval.tv_nsec = 100000000;
pw_loop_update_timer(stream->remote->core->main_loop, impl->timeout_source, NULL, &interval, false);
*/
return;
}
@ -745,10 +858,7 @@ static void client_node_command(void *data, uint32_t seq, const struct spa_comma
send_need_input(stream);
}
else {
impl->in_process = true;
spa_hook_list_call(&stream->listener_list, struct pw_stream_events,
process);
impl->in_process = false;
call_process(impl);
}
stream_set_state(stream, PW_STREAM_STATE_STREAMING, NULL);
}
@ -863,7 +973,7 @@ client_node_port_use_buffers(void *data,
struct pw_core *core = stream->remote->core;
struct pw_type *t = &core->type;
struct buffer *bid;
uint32_t i, j, len;
uint32_t i, j;
struct spa_buffer *b;
int prot;
@ -881,18 +991,13 @@ client_node_port_use_buffers(void *data,
continue;
}
len = pw_array_get_len(&impl->buffer_ids, struct buffer);
bid = pw_array_add(&impl->buffer_ids, sizeof(struct buffer));
if (impl->direction == SPA_DIRECTION_OUTPUT) {
bid->queued = true;
spa_list_append(&impl->queue, &bid->link);
} else {
bid = &impl->buffers[i];
bid->id = i;
bid->queued = false;
}
b = buffers[i].buffer;
pw_map_range_init(&bid->map, buffers[i].offset, buffers[i].size, core->sc_pagesize);
pw_map_range_init(&bid->map, buffers[i].offset, buffers[i].size,
core->sc_pagesize);
bid->ptr = mmap(NULL, bid->map.size, prot, MAP_SHARED, m->fd, bid->map.offset);
if (bid->ptr == MAP_FAILED) {
@ -928,10 +1033,6 @@ client_node_port_use_buffers(void *data,
bid->mem[bid->n_mem++] = m;
}
if (b->id != len) {
pw_log_warn("unexpected id %u found, expected %u", b->id, len);
impl->in_order = false;
}
pw_log_debug("add buffer %d %d %u %u", m->id,
b->id, bid->map.offset, bid->map.size);
@ -967,12 +1068,17 @@ client_node_port_use_buffers(void *data,
pw_log_warn("unknown buffer data type %d", d->type);
}
}
if (impl->direction == SPA_DIRECTION_OUTPUT)
push_queue(impl, &impl->dequeue, bid);
spa_hook_list_call(&stream->listener_list, struct pw_stream_events,
add_buffer, &bid->buffer);
}
add_async_complete(stream, seq, 0);
impl->n_buffers = n_buffers;
if (n_buffers)
stream_set_state(stream, PW_STREAM_STATE_PAUSED, NULL);
else {
@ -1224,13 +1330,11 @@ struct pw_buffer *pw_stream_dequeue_buffer(struct pw_stream *stream)
struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this);
struct buffer *b;
if (spa_list_is_empty(&impl->queue))
if ((b = pop_queue(impl, &impl->dequeue)) == NULL) {
pw_log_trace("stream %p: no more buffers", stream);
return NULL;
b = spa_list_first(&impl->queue, struct buffer, link);
b->queued = false;
spa_list_remove(&b->link);
}
pw_log_trace("stream %p: dequeue buffer %d", stream, b->id);
return &b->buffer;
}
@ -1239,42 +1343,25 @@ int pw_stream_queue_buffer(struct pw_stream *stream, struct pw_buffer *buffer)
{
struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this);
struct buffer *b;
uint32_t id;
int res;
if ((b = find_buffer(stream, buffer->buffer->id)) == NULL)
if ((b = get_buffer(stream, buffer->buffer->id)) == NULL)
return -EINVAL;
if (b->queued)
return -EINVAL;
id = buffer->buffer->id;
pw_log_trace("stream %p: queue buffer %d", stream, b->id);
if ((res = push_queue(impl, &impl->queue, b)) < 0)
return res;
if (impl->direction == SPA_DIRECTION_OUTPUT) {
if (impl->trans->outputs[0].buffer_id != SPA_ID_INVALID) {
pw_log_debug("can't send %u, pending buffer %u", id,
impl->trans->outputs[0].buffer_id);
return -EIO;
}
impl->trans->outputs[0].buffer_id = id;
impl->trans->outputs[0].status = SPA_STATUS_HAVE_BUFFER;
pw_log_trace("stream %p: send buffer %d", stream, id);
if (!impl->in_process)
if (res == 0 &&
SPA_FLAG_CHECK(impl->flags, PW_STREAM_FLAG_DRIVER) &&
process_output(stream) == SPA_STATUS_HAVE_BUFFER)
send_have_output(stream);
}
else {
b->queued = true;
spa_list_append(&impl->queue, &b->link);
if (impl->in_process) {
int i;
for (i = 0; i < impl->trans->area->n_input_ports; i++) {
struct spa_io_buffers *input = &impl->trans->inputs[i];
input->buffer_id = id;
}
}
else {
send_reuse_buffer(stream, id);
}
if (impl->client_reuse)
if ((b = pop_queue(impl, &impl->queue)))
send_reuse_buffer(stream, b->id);
}
return 0;
}