Merge branch 'columbarius/vulkan-async' into 'master'

Draft: vulkan: Avoid blocking waits on the gpu on blit

See merge request pipewire/pipewire!1944
This commit is contained in:
columbarius 2024-04-08 21:56:40 +00:00
commit d53734e966
4 changed files with 128 additions and 16 deletions

View file

@ -75,7 +75,11 @@ struct impl {
struct spa_hook_list hooks;
struct spa_callbacks callbacks;
struct spa_loop *data_loop;
struct spa_source source;
bool started;
bool async;
struct vulkan_blit_state state;
struct port port[2];
@ -750,6 +754,50 @@ static int impl_node_port_reuse_buffer(void *object, uint32_t port_id, uint32_t
return 0;
}
int export_buffer(struct impl *this, struct buffer *b)
{
struct port *outport;
struct spa_io_buffers *outio;
outport = &this->port[SPA_DIRECTION_OUTPUT];
outio = outport->io;
/* update data on output buffer */
b->outbuf->datas[0].chunk->offset = 0;
b->outbuf->datas[0].chunk->size = b->outbuf->datas[0].maxsize;
if (outport->current_format.media_subtype == SPA_MEDIA_SUBTYPE_raw) {
b->outbuf->datas[0].chunk->stride =
this->state.streams[outport->stream_id].bpp * outport->current_format.info.raw.size.width;
} else {
b->outbuf->datas[0].chunk->stride = this->position->video.stride;
}
/* export output buffer */
outio->buffer_id = b->id;
outio->status = SPA_STATUS_HAVE_DATA;
return SPA_STATUS_NEED_DATA | SPA_STATUS_HAVE_DATA;
}
void export_buffer_async(struct spa_source *source)
{
struct impl *this = source->data;
struct port *outport;
struct buffer *b;
spa_log_info(this->log, "rendering finished %d", source->fd);
spa_loop_remove_source(this->data_loop, source);
if (source->fd >= 0)
close(source->fd);
spa_vulkan_blit_finish(&this->state);
outport = &this->port[SPA_DIRECTION_OUTPUT];
b = &outport->buffers[this->state.streams[outport->stream_id].current_buffer_id];
spa_node_call_ready(&this->callbacks, export_buffer(this, b));
}
static int impl_node_process(void *object)
{
struct impl *this = object;
@ -759,6 +807,7 @@ static int impl_node_process(void *object)
spa_return_val_if_fail(this != NULL, -EINVAL);
/* check input buffer */
inport = &this->port[SPA_DIRECTION_INPUT];
if ((inio = inport->io) == NULL)
return -EIO;
@ -771,6 +820,7 @@ static int impl_node_process(void *object)
return -EINVAL;
}
/* check output buffer */
outport = &this->port[SPA_DIRECTION_OUTPUT];
if ((outio = outport->io) == NULL)
return -EIO;
@ -787,32 +837,45 @@ static int impl_node_process(void *object)
spa_log_debug(this->log, "%p: out of buffers", this);
return -EPIPE;
}
/* attach input buffer */
b = &inport->buffers[inio->buffer_id];
this->state.streams[inport->stream_id].pending_buffer_id = b->id;
inio->status = SPA_STATUS_NEED_DATA;
/* attach output buffer */
b = spa_list_first(&outport->empty, struct buffer, link);
spa_list_remove(&b->link);
SPA_FLAG_SET(b->flags, BUFFER_FLAG_OUT);
this->state.streams[outport->stream_id].pending_buffer_id = b->id;
/* process */
spa_log_debug(this->log, "filter into %d", b->id);
spa_vulkan_blit_process(&this->state);
b->outbuf->datas[0].chunk->offset = 0;
b->outbuf->datas[0].chunk->size = b->outbuf->datas[0].maxsize;
if (outport->current_format.media_subtype == SPA_MEDIA_SUBTYPE_raw) {
b->outbuf->datas[0].chunk->stride =
this->state.streams[outport->stream_id].bpp * outport->current_format.info.raw.size.width;
if (!this->async) {
spa_vulkan_blit_process(&this->state);
return export_buffer(this, b);
} else {
b->outbuf->datas[0].chunk->stride = this->position->video.stride;
int sync_fd;
spa_vulkan_blit_queue(&this->state, &sync_fd);
if (sync_fd < 0) {
spa_log_error(this->log, "invalid sync_fd %d", sync_fd);
spa_node_call_ready(&this->callbacks, SPA_STATUS_NEED_DATA);
return 0;
}
this->source.fd = sync_fd;
this->source.data = this;
this->source.func = export_buffer_async;
this->source.loop = this->data_loop;
this->source.mask = SPA_IO_IN | SPA_IO_ERR;
this->source.rmask = 0;
spa_log_info(this->log, "wait on fd %d", sync_fd);
spa_loop_add_source(this->data_loop, &this->source);
return 0;
}
outio->buffer_id = b->id;
outio->status = SPA_STATUS_HAVE_DATA;
return SPA_STATUS_NEED_DATA | SPA_STATUS_HAVE_DATA;
}
static const struct spa_node_methods impl_node = {
@ -878,6 +941,7 @@ impl_init(const struct spa_handle_factory *factory,
{
struct impl *this;
struct port *port;
const char *str;
spa_return_val_if_fail(factory != NULL, -EINVAL);
spa_return_val_if_fail(handle != NULL, -EINVAL);
@ -890,6 +954,15 @@ impl_init(const struct spa_handle_factory *factory,
this->log = spa_support_find(support, n_support, SPA_TYPE_INTERFACE_Log);
this->state.log = this->log;
if ((str = spa_dict_lookup(info, "vulkan.render_mode"))) {
spa_log_info(this->log, "vulkan.render_mode: %s", str);
if (strcmp(str, "async") == 0)
this->async = true;
}
if (this->async)
this->data_loop = spa_support_find(support, n_support, SPA_TYPE_INTERFACE_DataLoop);
spa_hook_list_init(&this->hooks);
this->node.iface = SPA_INTERFACE_INIT(
@ -904,6 +977,8 @@ impl_init(const struct spa_handle_factory *factory,
this->info.max_output_ports = 1;
this->info.max_input_ports = 1;
this->info.flags = SPA_NODE_FLAG_RT;
if (this->async)
this->info.flags |= SPA_NODE_FLAG_ASYNC;
this->params[0] = SPA_PARAM_INFO(SPA_PARAM_PropInfo, SPA_PARAM_INFO_READ);
this->params[1] = SPA_PARAM_INFO(SPA_PARAM_Props, SPA_PARAM_INFO_READWRITE);
this->info.params = this->params;

View file

@ -103,7 +103,7 @@ static int runExportSHMBuffers(struct vulkan_blit_state *s) {
* ret = 0: queueSubmit was succsessful, but manual synchronization is required
* ret = 1: queueSubmit was succsessful and buffers can be released without synchronization
*/
static int runCommandBuffer(struct vulkan_blit_state *s)
static int runCommandBuffer(struct vulkan_blit_state *s, int *fd)
{
VULKAN_INSTANCE_FUNCTION(vkQueueSubmit2KHR);
VULKAN_INSTANCE_FUNCTION(vkGetSemaphoreFdKHR);
@ -284,7 +284,11 @@ static int runCommandBuffer(struct vulkan_blit_state *s)
ret = 0;
}
}
close(sync_file_fd);
if (fd) {
*fd = sync_file_fd;
} else {
close(sync_file_fd);
}
return ret;
}
@ -568,13 +572,43 @@ int spa_vulkan_blit_process(struct vulkan_blit_state *s)
return -1;
}
CHECK(updateBuffers(s));
CHECK(runCommandBuffer(s));
CHECK(runCommandBuffer(s, NULL));
CHECK(vulkan_wait_idle(&s->base));
CHECK(runExportSHMBuffers(s));
return 0;
}
int spa_vulkan_blit_queue(struct vulkan_blit_state *s, int *fd)
{
if (!s->initialized) {
spa_log_warn(s->log, "Renderer not initialized");
return -1;
}
if (!s->prepared) {
spa_log_warn(s->log, "Renderer not prepared");
return -1;
}
CHECK(updateBuffers(s));
CHECK(runCommandBuffer(s, fd));
return 0;
}
int spa_vulkan_blit_finish(struct vulkan_blit_state *s)
{
if (!s->initialized) {
spa_log_warn(s->log, "Renderer not initialized");
return -1;
}
if (!s->prepared) {
spa_log_warn(s->log, "Renderer not prepared");
return -1;
}
CHECK(runExportSHMBuffers(s));
return 0;
}
int spa_vulkan_blit_get_buffer_caps(struct vulkan_blit_state *s, enum spa_direction direction)
{
switch (direction) {

View file

@ -72,6 +72,8 @@ int spa_vulkan_blit_start(struct vulkan_blit_state *s);
int spa_vulkan_blit_stop(struct vulkan_blit_state *s);
int spa_vulkan_blit_ready(struct vulkan_blit_state *s);
int spa_vulkan_blit_process(struct vulkan_blit_state *s);
int spa_vulkan_blit_queue(struct vulkan_blit_state *s, int *fd);
int spa_vulkan_blit_finish(struct vulkan_blit_state *s);
int spa_vulkan_blit_cleanup(struct vulkan_blit_state *s);
int spa_vulkan_blit_get_buffer_caps(struct vulkan_blit_state *s, enum spa_direction direction);

View file

@ -96,4 +96,5 @@ context.objects = [
{ factory = spa-node-factory args = { factory.name = api.vulkan.compute.filter node.name = vulkan-compute-filter object.export = true } }
{ factory = spa-node-factory args = { factory.name = api.vulkan.blit.filter node.name = vulkan-blit-filter object.export = true } }
{ factory = spa-node-factory args = { factory.name = api.vulkan.blit.dsp-filter node.name = vulkan-blit-dsp-filter object.export = true } }
{ factory = spa-node-factory args = { factory.name = api.vulkan.blit.filter node.name = vulkan-blit-filter-async object.export = true vulkan.render_mode = async } }
]