From ee2c6eb41ebab02c91faeb567cba86ece44dd996 Mon Sep 17 00:00:00 2001 From: Taruntej Kanakamalla Date: Thu, 20 Mar 2025 21:13:03 -0400 Subject: [PATCH] gst: sink: Manage buffer pool memory manually Let's make sure we own the memory in buffers, so that we can be resilient to the PW link going away. This currently maintains the status quo of copying data into the pipewirepool for sending to the remote end, but moves the allocation of buffers so that ownership is maintained by the sink in all cases. There are some tricky corners, especially with bufferpool vs. buffers param negotiation -- bufferpool parameters can be negotiated in GStreamer before the link even comes up, so we try to adapt the buffers param to use the negotiated value. For now, that is more brittle than tying those two aspects together. We can revisit this if we can find a way to tie pipeline state and link state more closely. Co-authored-by: Arun Raghavan --- meson.build | 5 ++ src/gst/gstpipewirepool.c | 151 ++++++++++++++++++++++++++++++-------- src/gst/gstpipewirepool.h | 6 ++ src/gst/gstpipewiresink.c | 26 +++++-- 4 files changed, 151 insertions(+), 37 deletions(-) diff --git a/meson.build b/meson.build index 42775e2ea..c7a15bda0 100644 --- a/meson.build +++ b/meson.build @@ -410,6 +410,7 @@ gst_deps_def = { gst_dep = [] gst_dma_drm_found = false +gst_shm_allocator_found = false foreach depname, kwargs: gst_deps_def dep = dependency(depname, required: gst_option, kwargs: kwargs) summary({depname: dep.found()}, bool_yn: true, section: 'GStreamer modules') @@ -425,9 +426,13 @@ foreach depname, kwargs: gst_deps_def if depname == 'gstreamer-allocators-1.0' and dep.version().version_compare('>= 1.23.1') gst_dma_drm_found = true + gst_shm_allocator_found = true endif endforeach +summary({'gstreamer SHM allocator': gst_shm_allocator_found}, bool_yn: true, section: 'Backend') +cdata.set('HAVE_GSTREAMER_SHM_ALLOCATOR', gst_shm_allocator_found) + # This code relies on the array being empty if any dependency was not found gst_dp_found = gst_dep.length() > 0 summary({'gstreamer-device-provider': gst_dp_found}, bool_yn: true, section: 'Backend') diff --git a/src/gst/gstpipewirepool.c b/src/gst/gstpipewirepool.c index f2a925c94..3e061a0fd 100644 --- a/src/gst/gstpipewirepool.c +++ b/src/gst/gstpipewirepool.c @@ -10,8 +10,12 @@ #include #include +#ifdef HAVE_GSTREAMER_SHM_ALLOCATOR +#include +#endif #include +#include #include "gstpipewirepool.h" @@ -61,6 +65,8 @@ void gst_pipewire_pool_wrap_buffer (GstPipeWirePool *pool, struct pw_buffer *b) GstBuffer *buf; uint32_t i; GstPipeWirePoolData *data; + /* Default to a large enough value */ + gsize plane_sizes[GST_VIDEO_MAX_PLANES] = { pool->video_info.size, }; GST_DEBUG_OBJECT (pool, "wrap buffer, datas:%d", b->buffer->n_datas); @@ -68,6 +74,30 @@ void gst_pipewire_pool_wrap_buffer (GstPipeWirePool *pool, struct pw_buffer *b) buf = gst_buffer_new (); + if (pool->add_metavideo) { + GstVideoMeta *meta = gst_buffer_add_video_meta_full (buf, + GST_VIDEO_FRAME_FLAG_NONE, + GST_VIDEO_INFO_FORMAT (&pool->video_info), + GST_VIDEO_INFO_WIDTH (&pool->video_info), + GST_VIDEO_INFO_HEIGHT (&pool->video_info), + GST_VIDEO_INFO_N_PLANES (&pool->video_info), + pool->video_info.offset, + pool->video_info.stride); + + gst_video_meta_set_alignment (meta, pool->video_align); + + if (!gst_video_meta_get_plane_size (meta, plane_sizes)) { + GST_ERROR_OBJECT (pool, "could not compute plane sizes"); + } + + /* + * We need to set the video meta as pooled, else gst_buffer_pool_release_buffer + * will call reset_buffer and the default_reset_buffer implementation for + * GstBufferPool removes all metadata without the POOLED flag. + */ + GST_META_FLAG_SET (meta, GST_META_FLAG_POOLED); + } + for (i = 0; i < b->buffer->n_datas; i++) { struct spa_data *d = &b->buffer->datas[i]; GstMemory *gmem = NULL; @@ -75,7 +105,51 @@ void gst_pipewire_pool_wrap_buffer (GstPipeWirePool *pool, struct pw_buffer *b) GST_DEBUG_OBJECT (pool, "wrap data (%s %d) %d %d", spa_debug_type_find_short_name(spa_type_data_type, d->type), d->type, d->mapoffset, d->maxsize); - if (d->type == SPA_DATA_MemFd) { + + if (pool->allocate_memory) { +#ifdef HAVE_GSTREAMER_SHM_ALLOCATOR + gsize block_size = d->maxsize; + + if (pool->has_video) { + /* For video, we know block sizes from the video info already */ + block_size = plane_sizes[i]; + } else { + /* For audio, reserve space based on the quantum limit and channel count */ + g_autoptr (GstPipeWireStream) s = g_weak_ref_get (&pool->stream); + + struct pw_context *context = pw_core_get_context(pw_stream_get_core(s->pwstream)); + const struct pw_properties *props = pw_context_get_properties(context); + uint32_t quantum_limit = 8192; /* "reasonable" default */ + + const char *quantum = spa_dict_lookup(&props->dict, "clock.quantum-limit"); + if (!quantum) { + quantum = spa_dict_lookup(&props->dict, "default.clock.quantum-limit"); + GST_DEBUG_OBJECT (pool, "using default quantum limit %s", quantum); + } + + if (quantum) + spa_atou32(quantum, &quantum_limit, 0); + GST_DEBUG_OBJECT (pool, "quantum limit %s", quantum); + + block_size = quantum_limit * pool->audio_info.bpf; + } + + GST_DEBUG_OBJECT (pool, "setting block size %lu", block_size); + + if (!pool->shm_allocator) + pool->shm_allocator = gst_shm_allocator_get(); + + /* use MemFd only. That is the only supported data type when memory is remote i.e. allocated by the client */ + gmem = gst_allocator_alloc (pool->shm_allocator, block_size, NULL); + d->fd = gst_fd_memory_get_fd (gmem); + d->mapoffset = 0; + d->flags = SPA_DATA_FLAG_READWRITE | SPA_DATA_FLAG_MAPPABLE; + + d->type = SPA_DATA_MemFd; + d->maxsize = block_size; + d->data = NULL; +#endif + } else if (d->type == SPA_DATA_MemFd) { gmem = gst_fd_allocator_alloc (pool->fd_allocator, dup(d->fd), d->mapoffset + d->maxsize, GST_FD_MEMORY_FLAG_NONE); gst_memory_resize (gmem, d->mapoffset, d->maxsize); @@ -97,34 +171,13 @@ void gst_pipewire_pool_wrap_buffer (GstPipeWirePool *pool, struct pw_buffer *b) gst_buffer_insert_memory (buf, i, gmem); } - if (pool->add_metavideo) { - GstVideoMeta *meta = gst_buffer_add_video_meta_full (buf, - GST_VIDEO_FRAME_FLAG_NONE, - GST_VIDEO_INFO_FORMAT (&pool->video_info), - GST_VIDEO_INFO_WIDTH (&pool->video_info), - GST_VIDEO_INFO_HEIGHT (&pool->video_info), - GST_VIDEO_INFO_N_PLANES (&pool->video_info), - pool->video_info.offset, - pool->video_info.stride); - gsize plane_sizes[GST_VIDEO_MAX_PLANES]; - - if (!gst_video_meta_get_plane_size (meta, plane_sizes)) { - GST_ERROR_OBJECT (pool, "could not compute plane sizes"); - } else { + if (pool->add_metavideo && !pool->allocate_memory) { /* Set memory sizes to expected plane sizes, so we know the valid size, * and the offsets in the meta make sense */ for (i = 0; i < gst_buffer_n_memory (buf); i++) { GstMemory *mem = gst_buffer_peek_memory (buf, i); gst_memory_resize (mem, 0, plane_sizes[i]); - } } - - /* - * We need to set the video meta as pooled, else gst_buffer_pool_release_buffer - * will call reset_buffer and the default_reset_buffer implementation for - * GstBufferPool removes all metadata without the POOLED flag. - */ - GST_META_FLAG_SET (meta, GST_META_FLAG_POOLED); } data->pool = gst_object_ref (pool); @@ -157,7 +210,8 @@ void gst_pipewire_pool_remove_buffer (GstPipeWirePool *pool, struct pw_buffer *b data->crop = NULL; data->videotransform = NULL; - gst_buffer_remove_all_memory (data->buf); + if (!pool->allocate_memory) + gst_buffer_remove_all_memory (data->buf); /* this will also destroy the pool data, if this is the last reference */ gst_clear_buffer (&data->buf); @@ -236,7 +290,13 @@ no_more_buffers: static const gchar ** get_options (GstBufferPool * pool) { - static const gchar *options[] = { GST_BUFFER_POOL_OPTION_VIDEO_META, NULL }; + static const gchar *options[] = { + GST_BUFFER_POOL_OPTION_VIDEO_META, +#ifdef HAVE_GSTREAMER_SHM_ALLOCATOR + GST_BUFFER_POOL_OPTION_VIDEO_ALIGNMENT, +#endif + NULL + }; return options; } @@ -247,7 +307,7 @@ set_config (GstBufferPool * pool, GstStructure * config) GstCaps *caps; GstStructure *structure; guint size, min_buffers, max_buffers; - gboolean has_video; + gboolean has_videoalign; if (!gst_buffer_pool_config_get_params (config, &caps, &size, &min_buffers, &max_buffers)) { GST_WARNING_OBJECT (pool, "invalid config"); @@ -272,15 +332,41 @@ set_config (GstBufferPool * pool, GstStructure * config) structure = gst_caps_get_structure (caps, 0); if (g_str_has_prefix (gst_structure_get_name (structure), "video/") || g_str_has_prefix (gst_structure_get_name (structure), "image/")) { - has_video = TRUE; + p->has_video = TRUE; gst_video_info_from_caps (&p->video_info, caps); + +#ifdef HAVE_GSTREAMER_SHM_ALLOCATOR + if (GST_VIDEO_FORMAT_INFO_IS_VALID_RAW (p->video_info.finfo) && + GST_VIDEO_FORMAT_INFO_FORMAT (p->video_info.finfo) != GST_VIDEO_FORMAT_DMA_DRM) { + gst_video_alignment_reset (&p->video_align); + gst_video_info_align (&p->video_info, &p->video_align); + } +#endif + } else if (g_str_has_prefix(gst_structure_get_name(structure), "audio/")) { + p->has_video = FALSE; + gst_audio_info_from_caps(&p->audio_info, caps); } else { - has_video = FALSE; + g_assert_not_reached (); } - p->add_metavideo = has_video && gst_buffer_pool_config_has_option (config, + p->add_metavideo = p->has_video && gst_buffer_pool_config_has_option (config, GST_BUFFER_POOL_OPTION_VIDEO_META); +#ifdef HAVE_GSTREAMER_SHM_ALLOCATOR + has_videoalign = p->has_video && gst_buffer_pool_config_has_option (config, + GST_BUFFER_POOL_OPTION_VIDEO_ALIGNMENT); + + if (has_videoalign) { + gst_buffer_pool_config_get_video_alignment (config, &p->video_align); + gst_video_info_align (&p->video_info, &p->video_align); + gst_buffer_pool_config_set_video_alignment (config, &p->video_align); + + GST_LOG_OBJECT (pool, "Set alignment: %u-%ux%u-%u", + p->video_align.padding_left, p->video_align.padding_right, + p->video_align.padding_top, p->video_align.padding_bottom); + } +#endif + if (p->video_info.size != 0) size = p->video_info.size; @@ -355,6 +441,10 @@ gst_pipewire_pool_finalize (GObject * object) g_weak_ref_set (&pool->stream, NULL); g_object_unref (pool->fd_allocator); g_object_unref (pool->dmabuf_allocator); +#ifdef HAVE_GSTREAMER_SHM_ALLOCATOR + if (pool->shm_allocator) + g_object_unref (pool->shm_allocator); +#endif G_OBJECT_CLASS (gst_pipewire_pool_parent_class)->finalize (object); } @@ -389,5 +479,8 @@ gst_pipewire_pool_init (GstPipeWirePool * pool) { pool->fd_allocator = gst_fd_allocator_new (); pool->dmabuf_allocator = gst_dmabuf_allocator_new (); +#ifdef HAVE_GSTREAMER_SHM_ALLOCATOR + gst_shm_allocator_init_once(); +#endif g_cond_init (&pool->cond); } diff --git a/src/gst/gstpipewirepool.h b/src/gst/gstpipewirepool.h index 15b7f0d59..c62d79e7d 100644 --- a/src/gst/gstpipewirepool.h +++ b/src/gst/gstpipewirepool.h @@ -9,6 +9,7 @@ #include +#include #include #include @@ -40,14 +41,19 @@ struct _GstPipeWirePool { GWeakRef stream; guint n_buffers; + gboolean has_video; gboolean add_metavideo; + GstAudioInfo audio_info; GstVideoInfo video_info; + GstVideoAlignment video_align; GstAllocator *fd_allocator; GstAllocator *dmabuf_allocator; + GstAllocator *shm_allocator; GCond cond; gboolean paused; + gboolean allocate_memory; }; enum GstPipeWirePoolMode { diff --git a/src/gst/gstpipewiresink.c b/src/gst/gstpipewiresink.c index c40fd32ec..981058bb1 100644 --- a/src/gst/gstpipewiresink.c +++ b/src/gst/gstpipewiresink.c @@ -334,9 +334,7 @@ gst_pipewire_sink_update_params (GstPipeWireSink *sink) * the default, since we can't grow the pool once this is set */ SPA_PARAM_BUFFERS_buffers, SPA_POD_CHOICE_RANGE_Int( max_buffers, min_buffers, max_buffers), - SPA_PARAM_BUFFERS_dataType, SPA_POD_CHOICE_FLAGS_Int( - (1<stream->pool, b); if (!gst_pipewire_pool_has_buffers (pwsink->stream->pool) && !GST_BUFFER_POOL_IS_FLUSHING (GST_BUFFER_POOL_CAST (pwsink->stream->pool))) { - GST_ELEMENT_ERROR (pwsink, RESOURCE, NOT_FOUND, - ("all buffers have been removed"), - ("PipeWire link to remote node was destroyed")); + if (pwsink->mode != GST_PIPEWIRE_SINK_MODE_PROVIDE) { + GST_ELEMENT_ERROR (pwsink, RESOURCE, NOT_FOUND, + ("all buffers have been removed"), + ("PipeWire link to remote node was destroyed")); + } } } @@ -807,6 +808,11 @@ gst_pipewire_sink_setcaps (GstBaseSink * bsink, GstCaps * caps) else flags |= PW_STREAM_FLAG_DRIVER; +#ifdef HAVE_GSTREAMER_SHM_ALLOCATOR + flags |= PW_STREAM_FLAG_ALLOC_BUFFERS; + pwsink->stream->pool->allocate_memory = true; +#endif + target_id = pwsink->stream->path ? (uint32_t)atoi(pwsink->stream->path) : PW_ID_ANY; if (pwsink->stream->target_object) { @@ -860,8 +866,12 @@ gst_pipewire_sink_setcaps (GstBaseSink * bsink, GstCaps * caps) config = gst_buffer_pool_get_config (GST_BUFFER_POOL_CAST (pwsink->stream->pool)); gst_buffer_pool_config_get_params (config, NULL, &size, &min_buffers, &max_buffers); gst_buffer_pool_config_set_params (config, caps, size, min_buffers, max_buffers); - if(pwsink->is_video) - gst_buffer_pool_config_add_option(config, GST_BUFFER_POOL_OPTION_VIDEO_META); + if (pwsink->is_video) { + gst_buffer_pool_config_add_option (config, GST_BUFFER_POOL_OPTION_VIDEO_META); +#ifdef HAVE_GSTREAMER_SHM_ALLOCATOR + gst_buffer_pool_config_add_option (config, GST_BUFFER_POOL_OPTION_VIDEO_ALIGNMENT); +#endif + } gst_buffer_pool_set_config (GST_BUFFER_POOL_CAST (pwsink->stream->pool), config); pw_thread_loop_unlock (pwsink->stream->core->loop);