gst: sink: Manage buffer pool memory manually

Let's make sure we own the memory in buffers, so that we can be
resilient to the PW link going away. This currently maintains the status
quo of copying data into the pipewirepool for sending to the remote end,
but moves the allocation of buffers so that ownership is maintained by
the sink in all cases.

There are some tricky corners, especially with bufferpool vs. buffers
param negotiation -- bufferpool parameters can be negotiated in
GStreamer before the link even comes up, so we try to adapt the buffers
param to use the negotiated value. For now, that is more brittle than
tying those two aspects together. We can revisit this if we can find a
way to tie pipeline state and link state more closely.

Co-authored-by: Arun Raghavan <arun@asymptotic.io>
This commit is contained in:
Taruntej Kanakamalla 2025-03-20 21:13:03 -04:00 committed by Wim Taymans
parent 1b258f4ecc
commit ee2c6eb41e
4 changed files with 151 additions and 37 deletions

View file

@ -410,6 +410,7 @@ gst_deps_def = {
gst_dep = []
gst_dma_drm_found = false
gst_shm_allocator_found = false
foreach depname, kwargs: gst_deps_def
dep = dependency(depname, required: gst_option, kwargs: kwargs)
summary({depname: dep.found()}, bool_yn: true, section: 'GStreamer modules')
@ -425,9 +426,13 @@ foreach depname, kwargs: gst_deps_def
if depname == 'gstreamer-allocators-1.0' and dep.version().version_compare('>= 1.23.1')
gst_dma_drm_found = true
gst_shm_allocator_found = true
endif
endforeach
summary({'gstreamer SHM allocator': gst_shm_allocator_found}, bool_yn: true, section: 'Backend')
cdata.set('HAVE_GSTREAMER_SHM_ALLOCATOR', gst_shm_allocator_found)
# This code relies on the array being empty if any dependency was not found
gst_dp_found = gst_dep.length() > 0
summary({'gstreamer-device-provider': gst_dp_found}, bool_yn: true, section: 'Backend')

View file

@ -10,8 +10,12 @@
#include <gst/allocators/gstfdmemory.h>
#include <gst/allocators/gstdmabuf.h>
#ifdef HAVE_GSTREAMER_SHM_ALLOCATOR
#include <gst/allocators/gstshmallocator.h>
#endif
#include <gst/video/gstvideometa.h>
#include <gst/video/gstvideopool.h>
#include "gstpipewirepool.h"
@ -61,6 +65,8 @@ void gst_pipewire_pool_wrap_buffer (GstPipeWirePool *pool, struct pw_buffer *b)
GstBuffer *buf;
uint32_t i;
GstPipeWirePoolData *data;
/* Default to a large enough value */
gsize plane_sizes[GST_VIDEO_MAX_PLANES] = { pool->video_info.size, };
GST_DEBUG_OBJECT (pool, "wrap buffer, datas:%d", b->buffer->n_datas);
@ -68,6 +74,30 @@ void gst_pipewire_pool_wrap_buffer (GstPipeWirePool *pool, struct pw_buffer *b)
buf = gst_buffer_new ();
if (pool->add_metavideo) {
GstVideoMeta *meta = gst_buffer_add_video_meta_full (buf,
GST_VIDEO_FRAME_FLAG_NONE,
GST_VIDEO_INFO_FORMAT (&pool->video_info),
GST_VIDEO_INFO_WIDTH (&pool->video_info),
GST_VIDEO_INFO_HEIGHT (&pool->video_info),
GST_VIDEO_INFO_N_PLANES (&pool->video_info),
pool->video_info.offset,
pool->video_info.stride);
gst_video_meta_set_alignment (meta, pool->video_align);
if (!gst_video_meta_get_plane_size (meta, plane_sizes)) {
GST_ERROR_OBJECT (pool, "could not compute plane sizes");
}
/*
* We need to set the video meta as pooled, else gst_buffer_pool_release_buffer
* will call reset_buffer and the default_reset_buffer implementation for
* GstBufferPool removes all metadata without the POOLED flag.
*/
GST_META_FLAG_SET (meta, GST_META_FLAG_POOLED);
}
for (i = 0; i < b->buffer->n_datas; i++) {
struct spa_data *d = &b->buffer->datas[i];
GstMemory *gmem = NULL;
@ -75,7 +105,51 @@ void gst_pipewire_pool_wrap_buffer (GstPipeWirePool *pool, struct pw_buffer *b)
GST_DEBUG_OBJECT (pool, "wrap data (%s %d) %d %d",
spa_debug_type_find_short_name(spa_type_data_type, d->type), d->type,
d->mapoffset, d->maxsize);
if (d->type == SPA_DATA_MemFd) {
if (pool->allocate_memory) {
#ifdef HAVE_GSTREAMER_SHM_ALLOCATOR
gsize block_size = d->maxsize;
if (pool->has_video) {
/* For video, we know block sizes from the video info already */
block_size = plane_sizes[i];
} else {
/* For audio, reserve space based on the quantum limit and channel count */
g_autoptr (GstPipeWireStream) s = g_weak_ref_get (&pool->stream);
struct pw_context *context = pw_core_get_context(pw_stream_get_core(s->pwstream));
const struct pw_properties *props = pw_context_get_properties(context);
uint32_t quantum_limit = 8192; /* "reasonable" default */
const char *quantum = spa_dict_lookup(&props->dict, "clock.quantum-limit");
if (!quantum) {
quantum = spa_dict_lookup(&props->dict, "default.clock.quantum-limit");
GST_DEBUG_OBJECT (pool, "using default quantum limit %s", quantum);
}
if (quantum)
spa_atou32(quantum, &quantum_limit, 0);
GST_DEBUG_OBJECT (pool, "quantum limit %s", quantum);
block_size = quantum_limit * pool->audio_info.bpf;
}
GST_DEBUG_OBJECT (pool, "setting block size %lu", block_size);
if (!pool->shm_allocator)
pool->shm_allocator = gst_shm_allocator_get();
/* use MemFd only. That is the only supported data type when memory is remote i.e. allocated by the client */
gmem = gst_allocator_alloc (pool->shm_allocator, block_size, NULL);
d->fd = gst_fd_memory_get_fd (gmem);
d->mapoffset = 0;
d->flags = SPA_DATA_FLAG_READWRITE | SPA_DATA_FLAG_MAPPABLE;
d->type = SPA_DATA_MemFd;
d->maxsize = block_size;
d->data = NULL;
#endif
} else if (d->type == SPA_DATA_MemFd) {
gmem = gst_fd_allocator_alloc (pool->fd_allocator, dup(d->fd),
d->mapoffset + d->maxsize, GST_FD_MEMORY_FLAG_NONE);
gst_memory_resize (gmem, d->mapoffset, d->maxsize);
@ -97,20 +171,7 @@ void gst_pipewire_pool_wrap_buffer (GstPipeWirePool *pool, struct pw_buffer *b)
gst_buffer_insert_memory (buf, i, gmem);
}
if (pool->add_metavideo) {
GstVideoMeta *meta = gst_buffer_add_video_meta_full (buf,
GST_VIDEO_FRAME_FLAG_NONE,
GST_VIDEO_INFO_FORMAT (&pool->video_info),
GST_VIDEO_INFO_WIDTH (&pool->video_info),
GST_VIDEO_INFO_HEIGHT (&pool->video_info),
GST_VIDEO_INFO_N_PLANES (&pool->video_info),
pool->video_info.offset,
pool->video_info.stride);
gsize plane_sizes[GST_VIDEO_MAX_PLANES];
if (!gst_video_meta_get_plane_size (meta, plane_sizes)) {
GST_ERROR_OBJECT (pool, "could not compute plane sizes");
} else {
if (pool->add_metavideo && !pool->allocate_memory) {
/* Set memory sizes to expected plane sizes, so we know the valid size,
* and the offsets in the meta make sense */
for (i = 0; i < gst_buffer_n_memory (buf); i++) {
@ -119,14 +180,6 @@ void gst_pipewire_pool_wrap_buffer (GstPipeWirePool *pool, struct pw_buffer *b)
}
}
/*
* We need to set the video meta as pooled, else gst_buffer_pool_release_buffer
* will call reset_buffer and the default_reset_buffer implementation for
* GstBufferPool removes all metadata without the POOLED flag.
*/
GST_META_FLAG_SET (meta, GST_META_FLAG_POOLED);
}
data->pool = gst_object_ref (pool);
data->owner = NULL;
data->header = spa_buffer_find_meta_data (b->buffer, SPA_META_Header, sizeof(*data->header));
@ -157,6 +210,7 @@ void gst_pipewire_pool_remove_buffer (GstPipeWirePool *pool, struct pw_buffer *b
data->crop = NULL;
data->videotransform = NULL;
if (!pool->allocate_memory)
gst_buffer_remove_all_memory (data->buf);
/* this will also destroy the pool data, if this is the last reference */
@ -236,7 +290,13 @@ no_more_buffers:
static const gchar **
get_options (GstBufferPool * pool)
{
static const gchar *options[] = { GST_BUFFER_POOL_OPTION_VIDEO_META, NULL };
static const gchar *options[] = {
GST_BUFFER_POOL_OPTION_VIDEO_META,
#ifdef HAVE_GSTREAMER_SHM_ALLOCATOR
GST_BUFFER_POOL_OPTION_VIDEO_ALIGNMENT,
#endif
NULL
};
return options;
}
@ -247,7 +307,7 @@ set_config (GstBufferPool * pool, GstStructure * config)
GstCaps *caps;
GstStructure *structure;
guint size, min_buffers, max_buffers;
gboolean has_video;
gboolean has_videoalign;
if (!gst_buffer_pool_config_get_params (config, &caps, &size, &min_buffers, &max_buffers)) {
GST_WARNING_OBJECT (pool, "invalid config");
@ -272,15 +332,41 @@ set_config (GstBufferPool * pool, GstStructure * config)
structure = gst_caps_get_structure (caps, 0);
if (g_str_has_prefix (gst_structure_get_name (structure), "video/") ||
g_str_has_prefix (gst_structure_get_name (structure), "image/")) {
has_video = TRUE;
p->has_video = TRUE;
gst_video_info_from_caps (&p->video_info, caps);
#ifdef HAVE_GSTREAMER_SHM_ALLOCATOR
if (GST_VIDEO_FORMAT_INFO_IS_VALID_RAW (p->video_info.finfo) &&
GST_VIDEO_FORMAT_INFO_FORMAT (p->video_info.finfo) != GST_VIDEO_FORMAT_DMA_DRM) {
gst_video_alignment_reset (&p->video_align);
gst_video_info_align (&p->video_info, &p->video_align);
}
#endif
} else if (g_str_has_prefix(gst_structure_get_name(structure), "audio/")) {
p->has_video = FALSE;
gst_audio_info_from_caps(&p->audio_info, caps);
} else {
has_video = FALSE;
g_assert_not_reached ();
}
p->add_metavideo = has_video && gst_buffer_pool_config_has_option (config,
p->add_metavideo = p->has_video && gst_buffer_pool_config_has_option (config,
GST_BUFFER_POOL_OPTION_VIDEO_META);
#ifdef HAVE_GSTREAMER_SHM_ALLOCATOR
has_videoalign = p->has_video && gst_buffer_pool_config_has_option (config,
GST_BUFFER_POOL_OPTION_VIDEO_ALIGNMENT);
if (has_videoalign) {
gst_buffer_pool_config_get_video_alignment (config, &p->video_align);
gst_video_info_align (&p->video_info, &p->video_align);
gst_buffer_pool_config_set_video_alignment (config, &p->video_align);
GST_LOG_OBJECT (pool, "Set alignment: %u-%ux%u-%u",
p->video_align.padding_left, p->video_align.padding_right,
p->video_align.padding_top, p->video_align.padding_bottom);
}
#endif
if (p->video_info.size != 0)
size = p->video_info.size;
@ -355,6 +441,10 @@ gst_pipewire_pool_finalize (GObject * object)
g_weak_ref_set (&pool->stream, NULL);
g_object_unref (pool->fd_allocator);
g_object_unref (pool->dmabuf_allocator);
#ifdef HAVE_GSTREAMER_SHM_ALLOCATOR
if (pool->shm_allocator)
g_object_unref (pool->shm_allocator);
#endif
G_OBJECT_CLASS (gst_pipewire_pool_parent_class)->finalize (object);
}
@ -389,5 +479,8 @@ gst_pipewire_pool_init (GstPipeWirePool * pool)
{
pool->fd_allocator = gst_fd_allocator_new ();
pool->dmabuf_allocator = gst_dmabuf_allocator_new ();
#ifdef HAVE_GSTREAMER_SHM_ALLOCATOR
gst_shm_allocator_init_once();
#endif
g_cond_init (&pool->cond);
}

View file

@ -9,6 +9,7 @@
#include <gst/gst.h>
#include <gst/audio/audio.h>
#include <gst/video/video.h>
#include <pipewire/pipewire.h>
@ -40,14 +41,19 @@ struct _GstPipeWirePool {
GWeakRef stream;
guint n_buffers;
gboolean has_video;
gboolean add_metavideo;
GstAudioInfo audio_info;
GstVideoInfo video_info;
GstVideoAlignment video_align;
GstAllocator *fd_allocator;
GstAllocator *dmabuf_allocator;
GstAllocator *shm_allocator;
GCond cond;
gboolean paused;
gboolean allocate_memory;
};
enum GstPipeWirePoolMode {

View file

@ -334,9 +334,7 @@ gst_pipewire_sink_update_params (GstPipeWireSink *sink)
* the default, since we can't grow the pool once this is set */
SPA_PARAM_BUFFERS_buffers, SPA_POD_CHOICE_RANGE_Int(
max_buffers, min_buffers, max_buffers),
SPA_PARAM_BUFFERS_dataType, SPA_POD_CHOICE_FLAGS_Int(
(1<<SPA_DATA_MemFd) |
(1<<SPA_DATA_MemPtr)),
SPA_PARAM_BUFFERS_dataType, SPA_POD_CHOICE_FLAGS_Int(1<<SPA_DATA_MemFd),
0);
port_params[n_params++] = spa_pod_builder_pop (&b, &f);
@ -603,15 +601,18 @@ static void
on_remove_buffer (void *_data, struct pw_buffer *b)
{
GstPipeWireSink *pwsink = _data;
GST_DEBUG_OBJECT (pwsink, "remove pw_buffer %p", b);
gst_pipewire_pool_remove_buffer (pwsink->stream->pool, b);
if (!gst_pipewire_pool_has_buffers (pwsink->stream->pool) &&
!GST_BUFFER_POOL_IS_FLUSHING (GST_BUFFER_POOL_CAST (pwsink->stream->pool))) {
if (pwsink->mode != GST_PIPEWIRE_SINK_MODE_PROVIDE) {
GST_ELEMENT_ERROR (pwsink, RESOURCE, NOT_FOUND,
("all buffers have been removed"),
("PipeWire link to remote node was destroyed"));
}
}
}
static void
@ -807,6 +808,11 @@ gst_pipewire_sink_setcaps (GstBaseSink * bsink, GstCaps * caps)
else
flags |= PW_STREAM_FLAG_DRIVER;
#ifdef HAVE_GSTREAMER_SHM_ALLOCATOR
flags |= PW_STREAM_FLAG_ALLOC_BUFFERS;
pwsink->stream->pool->allocate_memory = true;
#endif
target_id = pwsink->stream->path ? (uint32_t)atoi(pwsink->stream->path) : PW_ID_ANY;
if (pwsink->stream->target_object) {
@ -860,8 +866,12 @@ gst_pipewire_sink_setcaps (GstBaseSink * bsink, GstCaps * caps)
config = gst_buffer_pool_get_config (GST_BUFFER_POOL_CAST (pwsink->stream->pool));
gst_buffer_pool_config_get_params (config, NULL, &size, &min_buffers, &max_buffers);
gst_buffer_pool_config_set_params (config, caps, size, min_buffers, max_buffers);
if(pwsink->is_video)
gst_buffer_pool_config_add_option(config, GST_BUFFER_POOL_OPTION_VIDEO_META);
if (pwsink->is_video) {
gst_buffer_pool_config_add_option (config, GST_BUFFER_POOL_OPTION_VIDEO_META);
#ifdef HAVE_GSTREAMER_SHM_ALLOCATOR
gst_buffer_pool_config_add_option (config, GST_BUFFER_POOL_OPTION_VIDEO_ALIGNMENT);
#endif
}
gst_buffer_pool_set_config (GST_BUFFER_POOL_CAST (pwsink->stream->pool), config);
pw_thread_loop_unlock (pwsink->stream->core->loop);