diff --git a/meson.build b/meson.build index c3852e362..199ba27b0 100644 --- a/meson.build +++ b/meson.build @@ -410,6 +410,7 @@ gst_deps_def = { gst_dep = [] gst_dma_drm_found = false +gst_shm_allocator_found = false foreach depname, kwargs: gst_deps_def dep = dependency(depname, required: gst_option, kwargs: kwargs) summary({depname: dep.found()}, bool_yn: true, section: 'GStreamer modules') @@ -425,9 +426,13 @@ foreach depname, kwargs: gst_deps_def if depname == 'gstreamer-allocators-1.0' and dep.version().version_compare('>= 1.23.1') gst_dma_drm_found = true + gst_shm_allocator_found = true endif endforeach +summary({'gstreamer SHM allocator': gst_shm_allocator_found}, bool_yn: true, section: 'Backend') +cdata.set('HAVE_GSTREAMER_SHM_ALLOCATOR', gst_shm_allocator_found) + # This code relies on the array being empty if any dependency was not found gst_dp_found = gst_dep.length() > 0 summary({'gstreamer-device-provider': gst_dp_found}, bool_yn: true, section: 'Backend') diff --git a/spa/plugins/videoconvert/videoconvert-ffmpeg.c b/spa/plugins/videoconvert/videoconvert-ffmpeg.c index 3da0e5677..5d16326e9 100644 --- a/spa/plugins/videoconvert/videoconvert-ffmpeg.c +++ b/spa/plugins/videoconvert/videoconvert-ffmpeg.c @@ -1665,8 +1665,8 @@ impl_node_port_use_buffers(void *object, b->buf = buffers[i]; if (n_datas != port->blocks) { - spa_log_error(this->log, "%p: invalid blocks %d on buffer %d", - this, n_datas, i); + spa_log_error(this->log, "%p: invalid blocks %d on buffer %d, expected %d", + this, n_datas, i, port->blocks); return -EINVAL; } if (SPA_FLAG_IS_SET(flags, SPA_NODE_BUFFERS_FLAG_ALLOC)) { diff --git a/src/gst/gstpipewirepool.c b/src/gst/gstpipewirepool.c index 78869e163..f82dfbf46 100644 --- a/src/gst/gstpipewirepool.c +++ b/src/gst/gstpipewirepool.c @@ -10,8 +10,12 @@ #include #include +#ifdef HAVE_GSTREAMER_SHM_ALLOCATOR +#include +#endif #include +#include #include "gstpipewirepool.h" @@ -61,13 +65,42 @@ void gst_pipewire_pool_wrap_buffer (GstPipeWirePool *pool, struct pw_buffer *b) GstBuffer *buf; uint32_t i; GstPipeWirePoolData *data; + /* Default to a large enough value */ + gsize plane_0_size = pool->has_rawvideo ? + pool->video_info.size : + (gsize) pool->video_info.width * pool->video_info.height; + gsize plane_sizes[GST_VIDEO_MAX_PLANES] = { plane_0_size, }; - GST_DEBUG_OBJECT (pool, "wrap buffer"); + GST_DEBUG_OBJECT (pool, "wrap buffer, datas:%d", b->buffer->n_datas); data = g_slice_new (GstPipeWirePoolData); buf = gst_buffer_new (); + if (pool->add_metavideo) { + GstVideoMeta *meta = gst_buffer_add_video_meta_full (buf, + GST_VIDEO_FRAME_FLAG_NONE, + GST_VIDEO_INFO_FORMAT (&pool->video_info), + GST_VIDEO_INFO_WIDTH (&pool->video_info), + GST_VIDEO_INFO_HEIGHT (&pool->video_info), + GST_VIDEO_INFO_N_PLANES (&pool->video_info), + pool->video_info.offset, + pool->video_info.stride); + + gst_video_meta_set_alignment (meta, pool->video_align); + + if (!gst_video_meta_get_plane_size (meta, plane_sizes)) { + GST_ERROR_OBJECT (pool, "could not compute plane sizes"); + } + + /* + * We need to set the video meta as pooled, else gst_buffer_pool_release_buffer + * will call reset_buffer and the default_reset_buffer implementation for + * GstBufferPool removes all metadata without the POOLED flag. + */ + GST_META_FLAG_SET (meta, GST_META_FLAG_POOLED); + } + for (i = 0; i < b->buffer->n_datas; i++) { struct spa_data *d = &b->buffer->datas[i]; GstMemory *gmem = NULL; @@ -75,7 +108,51 @@ void gst_pipewire_pool_wrap_buffer (GstPipeWirePool *pool, struct pw_buffer *b) GST_DEBUG_OBJECT (pool, "wrap data (%s %d) %d %d", spa_debug_type_find_short_name(spa_type_data_type, d->type), d->type, d->mapoffset, d->maxsize); - if (d->type == SPA_DATA_MemFd) { + + if (pool->allocate_memory) { +#ifdef HAVE_GSTREAMER_SHM_ALLOCATOR + gsize block_size = d->maxsize; + + if (pool->has_video) { + /* For video, we know block sizes from the video info already */ + block_size = plane_sizes[i]; + } else { + /* For audio, reserve space based on the quantum limit and channel count */ + g_autoptr (GstPipeWireStream) s = g_weak_ref_get (&pool->stream); + + struct pw_context *context = pw_core_get_context(pw_stream_get_core(s->pwstream)); + const struct pw_properties *props = pw_context_get_properties(context); + uint32_t quantum_limit = 8192; /* "reasonable" default */ + + const char *quantum = spa_dict_lookup(&props->dict, "clock.quantum-limit"); + if (!quantum) { + quantum = spa_dict_lookup(&props->dict, "default.clock.quantum-limit"); + GST_DEBUG_OBJECT (pool, "using default quantum limit %s", quantum); + } + + if (quantum) + spa_atou32(quantum, &quantum_limit, 0); + GST_DEBUG_OBJECT (pool, "quantum limit %s", quantum); + + block_size = quantum_limit * pool->audio_info.bpf; + } + + GST_DEBUG_OBJECT (pool, "setting block size %zu", block_size); + + if (!pool->shm_allocator) + pool->shm_allocator = gst_shm_allocator_get(); + + /* use MemFd only. That is the only supported data type when memory is remote i.e. allocated by the client */ + gmem = gst_allocator_alloc (pool->shm_allocator, block_size, NULL); + d->fd = gst_fd_memory_get_fd (gmem); + d->mapoffset = 0; + d->flags = SPA_DATA_FLAG_READWRITE | SPA_DATA_FLAG_MAPPABLE; + + d->type = SPA_DATA_MemFd; + d->maxsize = block_size; + d->data = NULL; +#endif + } else if (d->type == SPA_DATA_MemFd) { gmem = gst_fd_allocator_alloc (pool->fd_allocator, dup(d->fd), d->mapoffset + d->maxsize, GST_FD_MEMORY_FLAG_NONE); gst_memory_resize (gmem, d->mapoffset, d->maxsize); @@ -89,18 +166,21 @@ void gst_pipewire_pool_wrap_buffer (GstPipeWirePool *pool, struct pw_buffer *b) gmem = gst_memory_new_wrapped (0, d->data, d->maxsize, 0, d->maxsize, NULL, NULL); } + else { + GST_WARNING_OBJECT (pool, "unknown data type (%s %d)", + spa_debug_type_find_short_name(spa_type_data_type, d->type), d->type); + } if (gmem) gst_buffer_insert_memory (buf, i, gmem); } - if (pool->add_metavideo) { - gst_buffer_add_video_meta_full (buf, GST_VIDEO_FRAME_FLAG_NONE, - GST_VIDEO_INFO_FORMAT (&pool->video_info), - GST_VIDEO_INFO_WIDTH (&pool->video_info), - GST_VIDEO_INFO_HEIGHT (&pool->video_info), - GST_VIDEO_INFO_N_PLANES (&pool->video_info), - pool->video_info.offset, - pool->video_info.stride); + if (pool->add_metavideo && !pool->allocate_memory) { + /* Set memory sizes to expected plane sizes, so we know the valid size, + * and the offsets in the meta make sense */ + for (i = 0; i < gst_buffer_n_memory (buf); i++) { + GstMemory *mem = gst_buffer_peek_memory (buf, i); + gst_memory_resize (mem, 0, plane_sizes[i]); + } } data->pool = gst_object_ref (pool); @@ -133,7 +213,8 @@ void gst_pipewire_pool_remove_buffer (GstPipeWirePool *pool, struct pw_buffer *b data->crop = NULL; data->videotransform = NULL; - gst_buffer_remove_all_memory (data->buf); + if (!pool->allocate_memory) + gst_buffer_remove_all_memory (data->buf); /* this will also destroy the pool data, if this is the last reference */ gst_clear_buffer (&data->buf); @@ -212,7 +293,13 @@ no_more_buffers: static const gchar ** get_options (GstBufferPool * pool) { - static const gchar *options[] = { GST_BUFFER_POOL_OPTION_VIDEO_META, NULL }; + static const gchar *options[] = { + GST_BUFFER_POOL_OPTION_VIDEO_META, +#ifdef HAVE_GSTREAMER_SHM_ALLOCATOR + GST_BUFFER_POOL_OPTION_VIDEO_ALIGNMENT, +#endif + NULL + }; return options; } @@ -223,7 +310,7 @@ set_config (GstBufferPool * pool, GstStructure * config) GstCaps *caps; GstStructure *structure; guint size, min_buffers, max_buffers; - gboolean has_video; + gboolean has_videoalign; if (!gst_buffer_pool_config_get_params (config, &caps, &size, &min_buffers, &max_buffers)) { GST_WARNING_OBJECT (pool, "invalid config"); @@ -235,18 +322,63 @@ set_config (GstBufferPool * pool, GstStructure * config) return FALSE; } + /* We don't support unlimited buffers */ + if (max_buffers == 0) + max_buffers = PIPEWIRE_POOL_MAX_BUFFERS; + /* Pick a sensible min to avoid starvation */ + if (min_buffers == 0) + min_buffers = PIPEWIRE_POOL_MIN_BUFFERS; + + if (min_buffers < PIPEWIRE_POOL_MIN_BUFFERS || max_buffers > PIPEWIRE_POOL_MAX_BUFFERS) + return FALSE; + structure = gst_caps_get_structure (caps, 0); if (g_str_has_prefix (gst_structure_get_name (structure), "video/") || g_str_has_prefix (gst_structure_get_name (structure), "image/")) { - has_video = TRUE; + p->has_video = TRUE; + gst_video_info_from_caps (&p->video_info, caps); + + if (GST_VIDEO_FORMAT_INFO_IS_VALID_RAW (p->video_info.finfo) +#ifdef HAVE_GSTREAMER_DMA_DRM + && GST_VIDEO_FORMAT_INFO_FORMAT (p->video_info.finfo) != GST_VIDEO_FORMAT_DMA_DRM +#endif + ) + p->has_rawvideo = TRUE; + else + p->has_rawvideo = FALSE; + +#ifdef HAVE_GSTREAMER_SHM_ALLOCATOR + if (p->has_rawvideo) { + gst_video_alignment_reset (&p->video_align); + gst_video_info_align (&p->video_info, &p->video_align); + } +#endif + } else if (g_str_has_prefix(gst_structure_get_name(structure), "audio/")) { + p->has_video = FALSE; + gst_audio_info_from_caps(&p->audio_info, caps); } else { - has_video = FALSE; + g_assert_not_reached (); } - p->add_metavideo = has_video && gst_buffer_pool_config_has_option (config, + p->add_metavideo = p->has_rawvideo && gst_buffer_pool_config_has_option (config, GST_BUFFER_POOL_OPTION_VIDEO_META); +#ifdef HAVE_GSTREAMER_SHM_ALLOCATOR + has_videoalign = p->has_rawvideo && gst_buffer_pool_config_has_option (config, + GST_BUFFER_POOL_OPTION_VIDEO_ALIGNMENT); + + if (has_videoalign) { + gst_buffer_pool_config_get_video_alignment (config, &p->video_align); + gst_video_info_align (&p->video_info, &p->video_align); + gst_buffer_pool_config_set_video_alignment (config, &p->video_align); + + GST_LOG_OBJECT (pool, "Set alignment: %u-%ux%u-%u", + p->video_align.padding_left, p->video_align.padding_right, + p->video_align.padding_top, p->video_align.padding_bottom); + } +#endif + if (p->video_info.size != 0) size = p->video_info.size; @@ -321,6 +453,10 @@ gst_pipewire_pool_finalize (GObject * object) g_weak_ref_set (&pool->stream, NULL); g_object_unref (pool->fd_allocator); g_object_unref (pool->dmabuf_allocator); +#ifdef HAVE_GSTREAMER_SHM_ALLOCATOR + if (pool->shm_allocator) + g_object_unref (pool->shm_allocator); +#endif G_OBJECT_CLASS (gst_pipewire_pool_parent_class)->finalize (object); } @@ -355,5 +491,8 @@ gst_pipewire_pool_init (GstPipeWirePool * pool) { pool->fd_allocator = gst_fd_allocator_new (); pool->dmabuf_allocator = gst_dmabuf_allocator_new (); +#ifdef HAVE_GSTREAMER_SHM_ALLOCATOR + gst_shm_allocator_init_once(); +#endif g_cond_init (&pool->cond); } diff --git a/src/gst/gstpipewirepool.h b/src/gst/gstpipewirepool.h index fb00a100c..f71344354 100644 --- a/src/gst/gstpipewirepool.h +++ b/src/gst/gstpipewirepool.h @@ -9,6 +9,7 @@ #include +#include #include #include @@ -18,6 +19,15 @@ G_BEGIN_DECLS #define GST_TYPE_PIPEWIRE_POOL (gst_pipewire_pool_get_type()) G_DECLARE_FINAL_TYPE (GstPipeWirePool, gst_pipewire_pool, GST, PIPEWIRE_POOL, GstBufferPool) +#define PIPEWIRE_POOL_MIN_BUFFERS 2u +#define PIPEWIRE_POOL_MAX_BUFFERS 16u + +/* Only available in GStreamer 1.22+ */ +#ifndef GST_VIDEO_FORMAT_INFO_IS_VALID_RAW +#define GST_VIDEO_FORMAT_INFO_IS_VALID_RAW(info) \ + (info != NULL && (info)->format > GST_VIDEO_FORMAT_ENCODED) +#endif + typedef struct _GstPipeWirePoolData GstPipeWirePoolData; struct _GstPipeWirePoolData { GstPipeWirePool *pool; @@ -37,14 +47,20 @@ struct _GstPipeWirePool { GWeakRef stream; guint n_buffers; + gboolean has_video; + gboolean has_rawvideo; gboolean add_metavideo; + GstAudioInfo audio_info; GstVideoInfo video_info; + GstVideoAlignment video_align; GstAllocator *fd_allocator; GstAllocator *dmabuf_allocator; + GstAllocator *shm_allocator; GCond cond; gboolean paused; + gboolean allocate_memory; }; enum GstPipeWirePoolMode { diff --git a/src/gst/gstpipewiresink.c b/src/gst/gstpipewiresink.c index b79ae6a14..c7ac851ae 100644 --- a/src/gst/gstpipewiresink.c +++ b/src/gst/gstpipewiresink.c @@ -40,7 +40,8 @@ GST_DEBUG_CATEGORY_STATIC (pipewire_sink_debug); #define DEFAULT_PROP_SLAVE_METHOD GST_PIPEWIRE_SINK_SLAVE_METHOD_NONE #define DEFAULT_PROP_USE_BUFFERPOOL USE_BUFFERPOOL_AUTO -#define MIN_BUFFERS 8u +#define MAX_ERROR_MS 1 +#define RESYNC_TIMEOUT_MS 10 enum { @@ -167,7 +168,8 @@ gst_pipewire_sink_propose_allocation (GstBaseSink * bsink, GstQuery * query) GstPipeWireSink *pwsink = GST_PIPEWIRE_SINK (bsink); if (pwsink->use_bufferpool != USE_BUFFERPOOL_NO) - gst_query_add_allocation_pool (query, GST_BUFFER_POOL_CAST (pwsink->stream->pool), 0, 0, 0); + gst_query_add_allocation_pool (query, GST_BUFFER_POOL_CAST (pwsink->stream->pool), 0, + PIPEWIRE_POOL_MIN_BUFFERS, PIPEWIRE_POOL_MAX_BUFFERS); gst_query_add_allocation_meta (query, GST_VIDEO_META_API_TYPE, NULL); return TRUE; @@ -242,7 +244,8 @@ gst_pipewire_sink_class_init (GstPipeWireSinkClass * klass) GST_TYPE_PIPEWIRE_SINK_MODE, DEFAULT_PROP_MODE, G_PARAM_READWRITE | - G_PARAM_STATIC_STRINGS)); + G_PARAM_STATIC_STRINGS | + GST_PARAM_MUTABLE_READY)); g_object_class_install_property (gobject_class, PROP_FD, @@ -310,21 +313,38 @@ gst_pipewire_sink_update_params (GstPipeWireSink *sink) config = gst_buffer_pool_get_config (GST_BUFFER_POOL (pool)); gst_buffer_pool_config_get_params (config, &caps, &size, &min_buffers, &max_buffers); + /* We cannot dynamically grow the pool */ + if (max_buffers == 0) { + GST_WARNING_OBJECT (sink, "cannot support unlimited buffers in pool"); + max_buffers = PIPEWIRE_POOL_MAX_BUFFERS; + } + spa_pod_builder_init (&b, buffer, sizeof (buffer)); spa_pod_builder_push_object (&b, &f, SPA_TYPE_OBJECT_ParamBuffers, SPA_PARAM_Buffers); spa_pod_builder_add (&b, SPA_PARAM_BUFFERS_size, SPA_POD_CHOICE_RANGE_Int(size, size, INT32_MAX), 0); + if (sink->is_rawvideo) { + /* MUST have n_datas == n_planes */ + spa_pod_builder_add (&b, + SPA_PARAM_BUFFERS_blocks, + SPA_POD_Int(GST_VIDEO_INFO_N_PLANES (&pool->video_info)), + 0); + } else { + /* Non-planar data, get a single block */ + spa_pod_builder_add (&b, + SPA_PARAM_BUFFERS_blocks, + SPA_POD_Int(1), + 0); + } spa_pod_builder_add (&b, SPA_PARAM_BUFFERS_stride, SPA_POD_CHOICE_RANGE_Int(0, 0, INT32_MAX), + /* At this stage, we will request as many buffers as we _might_ need as + * the default, since we can't grow the pool once this is set */ SPA_PARAM_BUFFERS_buffers, SPA_POD_CHOICE_RANGE_Int( - SPA_MAX(MIN_BUFFERS, min_buffers), - SPA_MAX(MIN_BUFFERS, min_buffers), - max_buffers ? max_buffers : INT32_MAX), - SPA_PARAM_BUFFERS_dataType, SPA_POD_CHOICE_FLAGS_Int( - (1<is_video) { + if (sink->is_rawvideo) { port_params[n_params++] = spa_pod_builder_add_object (&b, SPA_TYPE_OBJECT_ParamMeta, SPA_PARAM_Meta, SPA_PARAM_META_type, SPA_POD_Id(SPA_META_VideoCrop), @@ -359,7 +379,8 @@ gst_pipewire_sink_init (GstPipeWireSink * sink) sink->mode = DEFAULT_PROP_MODE; sink->use_bufferpool = DEFAULT_PROP_USE_BUFFERPOOL; - sink->is_video = false; + sink->is_rawvideo = false; + sink->first_buffer = true; GST_OBJECT_FLAG_SET (sink, GST_ELEMENT_FLAG_PROVIDE_CLOCK); @@ -377,7 +398,7 @@ gst_pipewire_sink_sink_fixate (GstBaseSink * bsink, GstCaps * caps) structure = gst_caps_get_structure (caps, 0); if (gst_structure_has_name (structure, "video/x-raw")) { - pwsink->is_video = true; + pwsink->is_rawvideo = true; gst_structure_fixate_field_nearest_int (structure, "width", 320); gst_structure_fixate_field_nearest_int (structure, "height", 240); gst_structure_fixate_field_nearest_fraction (structure, "framerate", 30, 1); @@ -591,14 +612,17 @@ static void on_remove_buffer (void *_data, struct pw_buffer *b) { GstPipeWireSink *pwsink = _data; + GST_DEBUG_OBJECT (pwsink, "remove pw_buffer %p", b); gst_pipewire_pool_remove_buffer (pwsink->stream->pool, b); if (!gst_pipewire_pool_has_buffers (pwsink->stream->pool) && !GST_BUFFER_POOL_IS_FLUSHING (GST_BUFFER_POOL_CAST (pwsink->stream->pool))) { - GST_ELEMENT_ERROR (pwsink, RESOURCE, NOT_FOUND, - ("all buffers have been removed"), - ("PipeWire link to remote node was destroyed")); + if (pwsink->mode != GST_PIPEWIRE_SINK_MODE_PROVIDE) { + GST_ELEMENT_ERROR (pwsink, RESOURCE, NOT_FOUND, + ("all buffers have been removed"), + ("PipeWire link to remote node was destroyed")); + } } } @@ -633,6 +657,9 @@ do_send_buffer (GstPipeWireSink *pwsink, GstBuffer *buffer) } } data->b->size = 0; + + spa_assert(b->n_datas == gst_buffer_n_memory(buffer)); + for (i = 0; i < b->n_datas; i++) { struct spa_data *d = &b->datas[i]; GstMemory *mem = gst_buffer_peek_memory (buffer, i); @@ -646,16 +673,20 @@ do_send_buffer (GstPipeWireSink *pwsink, GstBuffer *buffer) GstVideoMeta *meta = gst_buffer_get_video_meta (buffer); if (meta) { if (meta->n_planes == b->n_datas) { + uint32_t n_planes = GST_VIDEO_INFO_N_PLANES (&data->pool->video_info); gsize video_size = 0; - for (i = 0; i < meta->n_planes; i++) { + + for (i = 0; i < n_planes; i++) { struct spa_data *d = &b->datas[i]; - d->chunk->offset += meta->offset[i] - video_size; + d->chunk->stride = meta->stride[i]; + d->chunk->offset = meta->offset[i] - video_size; video_size += d->chunk->size; } } else { - GST_ERROR_OBJECT (pwsink, "plane num not matching, meta:%u buffer:%u", meta->n_planes, b->n_datas); + GST_ERROR_OBJECT (pwsink, "plane num not matching, meta:%u buffer:%u", + meta->n_planes, b->n_datas); } } @@ -663,7 +694,18 @@ do_send_buffer (GstPipeWireSink *pwsink, GstBuffer *buffer) GST_WARNING_OBJECT (pwsink, "can't send buffer %s", spa_strerror(res)); } else { data->queued = TRUE; - GST_LOG_OBJECT(pwsink, "queued pwbuffer: %p; gstbuffer %p ",data->b, buffer); + GST_LOG_OBJECT(pwsink, "queued pwbuffer: %p size: %"PRIu64"; gstbuffer %p", + data->b, data->b->size, buffer); + if (pwsink->first_buffer) { + pwsink->first_buffer = false; + pwsink->first_buffer_pts = GST_BUFFER_PTS(buffer); + } + stream->position = gst_util_uint64_scale_int(GST_BUFFER_PTS(buffer) - pwsink->first_buffer_pts, + pwsink->rate, 1 * GST_SECOND); + + // have the buffer duration value minimum as 1, in case of video where rate is 0 (not applicable) + stream->buf_duration = SPA_MAX((uint64_t)1, gst_util_uint64_scale_int(GST_BUFFER_DURATION(buffer), + pwsink->rate, 1 * GST_SECOND)); } switch (pwsink->slave_method) { @@ -675,6 +717,56 @@ do_send_buffer (GstPipeWireSink *pwsink, GstBuffer *buffer) } } +static void update_time (GstPipeWireSink *pwsink) +{ + struct spa_io_position *p = pwsink->stream->io_position; + double err = 0.0, corr = 1.0; + guint64 now; + double max_err = pwsink->rate * MAX_ERROR_MS/1000.0; + double resync_timeout = pwsink->rate * RESYNC_TIMEOUT_MS/1000.0; + + if (pwsink->first_buffer) { + // use the target duration before the first buffer + pwsink->stream->buf_duration = p->clock.target_duration; + spa_dll_set_bw(&pwsink->stream->dll, SPA_DLL_BW_MIN, pwsink->stream->buf_duration, + pwsink->rate); + } + + now = pw_stream_get_nsec(pwsink->stream->pwstream); + err = (double)gst_util_uint64_scale(now, pwsink->rate, 1 * GST_SECOND) - + (double)gst_util_uint64_scale(p->clock.next_nsec, pwsink->rate, 1 * GST_SECOND); + + GST_LOG_OBJECT(pwsink, "err is %f max err is %f now %"PRIu64" next is %"PRIu64"", err, max_err, now, + p->clock.next_nsec); + + if (fabs(err) > max_err) { + if (fabs(err) > resync_timeout) { + GST_WARNING_OBJECT(pwsink, "err %f exceeds resync timeout, resetting", err); + spa_dll_set_bw(&pwsink->stream->dll, SPA_DLL_BW_MIN, pwsink->stream->buf_duration, + pwsink->rate); + err = 0.0; + } else { + err = SPA_CLAMPD(err, -max_err, max_err); + } + } + corr = spa_dll_update(&pwsink->stream->dll, err); + + p->clock.nsec = now; + p->clock.position = pwsink->stream->position; + p->clock.duration = pwsink->stream->buf_duration; + /* we don't have a way to estimate the target (next cycle) buffer duration + * so use the current buffer duration + */ + p->clock.target_duration = pwsink->stream->buf_duration; + p->clock.rate = SPA_FRACTION(1, pwsink->rate); + // current time plus duration scaled with correlation + p->clock.next_nsec = now + (uint64_t)(p->clock.duration / corr * GST_SECOND / pwsink->rate); + p->clock.rate_diff = corr; + + GST_DEBUG_OBJECT(pwsink, "now %"PRIu64", position %"PRIu64", duration %"PRIu64", rate :%d," + "next : %"PRIu64", delay is %"PRIi64", rate_diff is %f", p->clock.nsec, p->clock.position, + p->clock.duration, pwsink->rate, p->clock.next_nsec, p->clock.delay,p->clock.rate_diff); +} static void on_process (void *data) @@ -684,6 +776,23 @@ on_process (void *data) g_cond_signal (&pwsink->stream->pool->cond); } +static int invoke_trigger_process(struct spa_loop *loop, + bool async, uint32_t seq, const void *data, size_t size, void *user_data) +{ + + GstPipeWireSink *pwsink = user_data; + + /* Note: We cannot use the rate for computation of other clock params + * in case of video because the rate for video is set as 0 in the _setcaps. + * So skip update time for video (i.e. when rate is 0). The video buffers + * get timestamp from the SPA_META_Header anyway + */ + + if (pwsink->rate) + update_time(pwsink); + return pw_stream_trigger_process(pwsink->stream->pwstream); +} + static void on_state_changed (void *data, enum pw_stream_state old, enum pw_stream_state state, const char *error) { @@ -699,7 +808,8 @@ on_state_changed (void *data, enum pw_stream_state old, enum pw_stream_state sta break; case PW_STREAM_STATE_STREAMING: if (pw_stream_is_driving (pwsink->stream->pwstream)) - pw_stream_trigger_process (pwsink->stream->pwstream); + pw_loop_invoke(pw_stream_get_data_loop(pwsink->stream->pwstream), + invoke_trigger_process, 1, NULL, 0 , false, pwsink); break; case PW_STREAM_STATE_ERROR: /* make the error permanent, if it is not already; @@ -759,9 +869,21 @@ gst_pipewire_sink_setcaps (GstBaseSink * bsink, GstCaps * caps) if (pwsink->use_bufferpool != USE_BUFFERPOOL_YES) pwsink->use_bufferpool = USE_BUFFERPOOL_NO; } else { + GstVideoInfo video_info; + pwsink->rate = rate = 0; pwsink->rate_match = false; - pwsink->is_video = true; + + gst_video_info_from_caps (&video_info, caps); + + if (GST_VIDEO_FORMAT_INFO_IS_VALID_RAW (video_info.finfo) +#ifdef HAVE_GSTREAMER_DMA_DRM + && GST_VIDEO_FORMAT_INFO_FORMAT (video_info.finfo) != GST_VIDEO_FORMAT_DMA_DRM +#endif + ) + pwsink->is_rawvideo = TRUE; + else + pwsink->is_rawvideo = FALSE; } spa_dll_set_bw(&pwsink->stream->dll, SPA_DLL_BW_MIN, 4096, rate); @@ -788,6 +910,11 @@ gst_pipewire_sink_setcaps (GstBaseSink * bsink, GstCaps * caps) else flags |= PW_STREAM_FLAG_DRIVER; +#ifdef HAVE_GSTREAMER_SHM_ALLOCATOR + flags |= PW_STREAM_FLAG_ALLOC_BUFFERS; + pwsink->stream->pool->allocate_memory = true; +#endif + target_id = pwsink->stream->path ? (uint32_t)atoi(pwsink->stream->path) : PW_ID_ANY; if (pwsink->stream->target_object) { @@ -841,8 +968,12 @@ gst_pipewire_sink_setcaps (GstBaseSink * bsink, GstCaps * caps) config = gst_buffer_pool_get_config (GST_BUFFER_POOL_CAST (pwsink->stream->pool)); gst_buffer_pool_config_get_params (config, NULL, &size, &min_buffers, &max_buffers); gst_buffer_pool_config_set_params (config, caps, size, min_buffers, max_buffers); - if(pwsink->is_video) - gst_buffer_pool_config_add_option(config, GST_BUFFER_POOL_OPTION_VIDEO_META); + if (pwsink->is_rawvideo) { + gst_buffer_pool_config_add_option (config, GST_BUFFER_POOL_OPTION_VIDEO_META); +#ifdef HAVE_GSTREAMER_SHM_ALLOCATOR + gst_buffer_pool_config_add_option (config, GST_BUFFER_POOL_OPTION_VIDEO_ALIGNMENT); +#endif + } gst_buffer_pool_set_config (GST_BUFFER_POOL_CAST (pwsink->stream->pool), config); pw_thread_loop_unlock (pwsink->stream->core->loop); @@ -924,20 +1055,17 @@ gst_pipewire_sink_render (GstBaseSink * bsink, GstBuffer * buffer) if (res != GST_FLOW_OK) goto done; - if (pwsink->is_video) { + if (pwsink->is_rawvideo) { GstVideoFrame src, dst; gboolean copied = FALSE; buf_size = 0; // to break from the loop - /* - splitting of buffers in the case of video might break the frame layout - and that seems to be causing issues while retrieving the buffers on the receiver - side. Hence use the video_frame_map to copy the buffer of bigger size into the - pipewirepool's buffer - */ + /* splitting of buffers in the case of video might break the frame layout + * and that seems to be causing issues while retrieving the buffers on the receiver + * side. Hence use the video_frame_map to copy the buffer of bigger size into the + * pipewirepool's buffer */ - if (!gst_video_frame_map (&dst, &pwsink->stream->pool->video_info, b, - GST_MAP_WRITE)) { + if (!gst_video_frame_map (&dst, &pwsink->stream->pool->video_info, b, GST_MAP_WRITE)) { GST_ERROR_OBJECT(pwsink, "Failed to map dest buffer"); return GST_FLOW_ERROR; } @@ -957,8 +1085,6 @@ gst_pipewire_sink_render (GstBaseSink * bsink, GstBuffer * buffer) GST_ERROR_OBJECT(pwsink, "Failed to copy the frame"); return GST_FLOW_ERROR; } - - gst_buffer_copy_into(b, buffer, GST_BUFFER_COPY_METADATA, 0, -1); } else { gst_buffer_map (b, &info, GST_MAP_WRITE); gsize extract_size = (buf_size <= info.maxsize) ? buf_size: info.maxsize; @@ -980,7 +1106,8 @@ gst_pipewire_sink_render (GstBaseSink * bsink, GstBuffer * buffer) gst_buffer_unref (b); if (pw_stream_is_driving (pwsink->stream->pwstream)) - pw_stream_trigger_process (pwsink->stream->pwstream); + pw_loop_invoke(pw_stream_get_data_loop(pwsink->stream->pwstream), + invoke_trigger_process, 1, NULL, 0 , false, pwsink); } } else { GST_TRACE_OBJECT(pwsink, "Buffer is from pipewirepool"); @@ -988,7 +1115,8 @@ gst_pipewire_sink_render (GstBaseSink * bsink, GstBuffer * buffer) do_send_buffer (pwsink, buffer); if (pw_stream_is_driving (pwsink->stream->pwstream)) - pw_stream_trigger_process (pwsink->stream->pwstream); + pw_loop_invoke(pw_stream_get_data_loop(pwsink->stream->pwstream), + invoke_trigger_process, 1, NULL, 0 , false, pwsink); } done_unlock: @@ -1002,6 +1130,18 @@ not_negotiated: } } +static void +on_io_changed(void *data, uint32_t id, void *area, uint32_t size) +{ + GstPipeWireSink *pwsink = data; + + switch (id) { + case SPA_IO_Position: + pwsink->stream->io_position = area; + break; + } +} + static const struct pw_stream_events stream_events = { PW_VERSION_STREAM_EVENTS, .state_changed = on_state_changed, @@ -1009,6 +1149,7 @@ static const struct pw_stream_events stream_events = { .add_buffer = on_add_buffer, .remove_buffer = on_remove_buffer, .process = on_process, + .io_changed = on_io_changed, }; static GstStateChangeReturn @@ -1023,6 +1164,14 @@ gst_pipewire_sink_change_state (GstElement * element, GstStateChange transition) goto open_failed; break; case GST_STATE_CHANGE_READY_TO_PAUSED: + /* If we are a driver, we shouldn't try to also provide the clock, as we + * _are_ the clock for the graph. For that case, we rely on the pipeline + * clock to drive the pipeline (and thus the graph). */ + if (this->mode == GST_PIPEWIRE_SINK_MODE_PROVIDE) + GST_OBJECT_FLAG_UNSET (this, GST_ELEMENT_FLAG_PROVIDE_CLOCK); + else + GST_OBJECT_FLAG_SET (this, GST_ELEMENT_FLAG_PROVIDE_CLOCK); + /* the initial stream state is active, which is needed for linking and * negotiation to happen and the bufferpool to be set up. We don't know * if we'll go to plaing, so we deactivate the stream until that diff --git a/src/gst/gstpipewiresink.h b/src/gst/gstpipewiresink.h index 60eb3b79f..5816f7a15 100644 --- a/src/gst/gstpipewiresink.h +++ b/src/gst/gstpipewiresink.h @@ -69,7 +69,9 @@ struct _GstPipeWireSink { gboolean negotiated; gboolean rate_match; gint rate; - gboolean is_video; + gboolean is_rawvideo; + gboolean first_buffer; + GstClockTime first_buffer_pts; GstPipeWireSinkMode mode; GstPipeWireSinkSlaveMethod slave_method; diff --git a/src/gst/gstpipewiresrc.c b/src/gst/gstpipewiresrc.c index 994534c60..ca55baa46 100644 --- a/src/gst/gstpipewiresrc.c +++ b/src/gst/gstpipewiresrc.c @@ -46,7 +46,9 @@ GST_DEBUG_CATEGORY_STATIC (pipewire_src_debug); #define DEFAULT_RESEND_LAST false #define DEFAULT_KEEPALIVE_TIME 0 #define DEFAULT_AUTOCONNECT true -#define DEFAULT_USE_BUFFERPOOL USE_BUFFERPOOL_AUTO +#define DEFAULT_USE_BUFFERPOOL USE_BUFFERPOOL_AUTO +#define DEFAULT_ON_DISCONNECT GST_PIPEWIRE_SRC_ON_DISCONNECT_NONE +#define DEFAULT_PROVIDE_CLOCK TRUE enum { @@ -64,8 +66,29 @@ enum PROP_KEEPALIVE_TIME, PROP_AUTOCONNECT, PROP_USE_BUFFERPOOL, + PROP_ON_DISCONNECT, + PROP_PROVIDE_CLOCK, }; +GType +gst_pipewire_src_on_disconnect_get_type (void) +{ + static gsize on_disconnect_type = 0; + static const GEnumValue on_disconnect[] = { + {GST_PIPEWIRE_SRC_ON_DISCONNECT_NONE, "GST_PIPEWIRE_SRC_ON_DISCONNECT_NONE", "none"}, + {GST_PIPEWIRE_SRC_ON_DISCONNECT_EOS, "GST_PIPEWIRE_SRC_ON_DISCONNECT_EOS", "eos"}, + {GST_PIPEWIRE_SRC_ON_DISCONNECT_ERROR, "GST_PIPEWIRE_SRC_ON_DISCONNECT_ERROR", "error"}, + {0, NULL, NULL}, + }; + + if (g_once_init_enter (&on_disconnect_type)) { + GType tmp = + g_enum_register_static ("GstPipeWireSrcOnDisconnect", on_disconnect); + g_once_init_leave (&on_disconnect_type, tmp); + } + + return (GType) on_disconnect_type; +} static GstStaticPadTemplate gst_pipewire_src_template = GST_STATIC_PAD_TEMPLATE ("src", @@ -170,6 +193,20 @@ gst_pipewire_src_set_property (GObject * object, guint prop_id, pwsrc->use_bufferpool = USE_BUFFERPOOL_NO; break; + case PROP_ON_DISCONNECT: + pwsrc->on_disconnect = g_value_get_enum (value); + break; + + case PROP_PROVIDE_CLOCK: + gboolean provide = g_value_get_boolean (value); + GST_OBJECT_LOCK (pwsrc); + if (provide) + GST_OBJECT_FLAG_SET (pwsrc, GST_ELEMENT_FLAG_PROVIDE_CLOCK); + else + GST_OBJECT_FLAG_UNSET (pwsrc, GST_ELEMENT_FLAG_PROVIDE_CLOCK); + GST_OBJECT_UNLOCK (pwsrc); + break; + default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); break; @@ -235,6 +272,18 @@ gst_pipewire_src_get_property (GObject * object, guint prop_id, g_value_set_boolean (value, !!pwsrc->use_bufferpool); break; + case PROP_ON_DISCONNECT: + g_value_set_enum (value, pwsrc->on_disconnect); + break; + + case PROP_PROVIDE_CLOCK: + gboolean result; + GST_OBJECT_LOCK (pwsrc); + result = GST_OBJECT_FLAG_IS_SET (pwsrc, GST_ELEMENT_FLAG_PROVIDE_CLOCK); + GST_OBJECT_UNLOCK (pwsrc); + g_value_set_boolean (value, result); + break; + default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); break; @@ -414,6 +463,25 @@ gst_pipewire_src_class_init (GstPipeWireSrcClass * klass) G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + g_object_class_install_property (gobject_class, + PROP_ON_DISCONNECT, + g_param_spec_enum ("on-disconnect", + "On disconnect", + "Action to take on disconnect", + GST_TYPE_PIPEWIRE_SRC_ON_DISCONNECT, + DEFAULT_ON_DISCONNECT, + G_PARAM_READWRITE | + G_PARAM_STATIC_STRINGS)); + + g_object_class_install_property (gobject_class, + PROP_PROVIDE_CLOCK, + g_param_spec_boolean ("provide-clock", + "Provide Clock", + "Provide a clock to be used as the global pipeline clock", + DEFAULT_PROVIDE_CLOCK, + G_PARAM_READWRITE | + G_PARAM_STATIC_STRINGS)); + gstelement_class->provide_clock = gst_pipewire_src_provide_clock; gstelement_class->change_state = gst_pipewire_src_change_state; gstelement_class->send_event = gst_pipewire_src_send_event; @@ -462,6 +530,9 @@ gst_pipewire_src_init (GstPipeWireSrc * src) src->autoconnect = DEFAULT_AUTOCONNECT; src->min_latency = 0; src->max_latency = GST_CLOCK_TIME_NONE; + src->n_buffers = 0; + src->flushing_on_remove_buffer = FALSE; + src->on_disconnect = DEFAULT_ON_DISCONNECT; src->transform_value = UINT32_MAX; } @@ -469,11 +540,26 @@ gst_pipewire_src_init (GstPipeWireSrc * src) static gboolean buffer_recycle (GstMiniObject *obj) { - GstPipeWireSrc *src; - GstPipeWirePoolData *data; + GstPipeWirePoolData *data = gst_pipewire_pool_get_data (GST_BUFFER_CAST(obj)); + GstPipeWireSrc *src = data->owner; int res; - data = gst_pipewire_pool_get_data (GST_BUFFER_CAST(obj)); + if (src->flushing_on_remove_buffer) { + /* + * If a flush-start was initiated, this might be called by elements like + * queues downstream purging buffers from their internal queues. This can + * deadlock if queues use min-threshold-buffers/bytes/time with src_create + * trying to take the loop lock and buffer_recycle trying to take the loop + * lock down below. We return from here, to prevent deadlock with streaming + * thread in a queue thread. + * + * We will take care of queueing the buffer in on_remove_buffer. + */ + GstBuffer *buffer = GST_BUFFER_CAST(obj); + GST_DEBUG_OBJECT (src, + "flush-start initiated, skipping buffer recycle %p", buffer); + return TRUE; + } GST_OBJECT_LOCK (data->pool); if (!obj->dispose) { @@ -482,7 +568,6 @@ buffer_recycle (GstMiniObject *obj) } GST_BUFFER_FLAGS (obj) = data->flags; - src = data->owner; pw_thread_loop_lock (src->stream->core->loop); if (!obj->dispose) { @@ -519,6 +604,8 @@ on_add_buffer (void *_data, struct pw_buffer *b) data->owner = pwsrc; data->queued = TRUE; GST_MINI_OBJECT_CAST (data->buf)->dispose = buffer_recycle; + + pwsrc->n_buffers++; } static void @@ -527,17 +614,76 @@ on_remove_buffer (void *_data, struct pw_buffer *b) GstPipeWireSrc *pwsrc = _data; GstPipeWirePoolData *data = b->user_data; GstBuffer *buf = data->buf; + gboolean flush_on_remove; int res; - GST_DEBUG_OBJECT (pwsrc, "remove buffer %p", buf); + GST_DEBUG_OBJECT (pwsrc, "remove buffer %p, queued: %d", + buf, data->queued); GST_MINI_OBJECT_CAST (buf)->dispose = NULL; + flush_on_remove = + pwsrc->on_disconnect == GST_PIPEWIRE_SRC_ON_DISCONNECT_ERROR || + pwsrc->on_disconnect == GST_PIPEWIRE_SRC_ON_DISCONNECT_EOS; + + if (flush_on_remove && !pwsrc->flushing_on_remove_buffer) { + pwsrc->flushing_on_remove_buffer = TRUE; + + GST_DEBUG_OBJECT (pwsrc, "flush-start on remove buffer"); + /* + * It is possible that when buffers are being removed, a downstream + * element can be holding on to a buffer or in the middle of rendering + * the same. Former is possible with queues min-threshold-buffers or + * similar. Latter can result in a crash during gst_video_frame_copy. + * + * We send a flush-start event downstream to make elements discard + * any buffers they may be holding on to as well as return from their + * chain function ASAP. + */ + gst_pad_push_event (GST_BASE_SRC_PAD (pwsrc), + gst_event_new_flush_start ()); + } + if (data->queued) { gst_buffer_unref (buf); } else { if ((res = pw_stream_queue_buffer (pwsrc->stream->pwstream, b)) < 0) - GST_WARNING_OBJECT (pwsrc, "can't queue removed buffer %p, %s", buf, spa_strerror(res)); + GST_WARNING_OBJECT (pwsrc, "can't queue removed buffer %p, %s", + buf, spa_strerror(res)); + else + GST_DEBUG_OBJECT (pwsrc, "queued buffer %p", buf); + } + + pwsrc->n_buffers--; + + if (pwsrc->n_buffers == 0) { + GST_DEBUG_OBJECT (pwsrc, "removed all buffers"); + + pwsrc->flushing_on_remove_buffer = FALSE; + + switch (pwsrc->on_disconnect) { + case GST_PIPEWIRE_SRC_ON_DISCONNECT_ERROR: + GST_DEBUG_OBJECT (pwsrc, "flush-stop on removing all buffers"); + gst_pad_push_event (GST_BASE_SRC_PAD (pwsrc), + gst_event_new_flush_stop (FALSE)); + + GST_ELEMENT_ERROR (pwsrc, RESOURCE, NOT_FOUND, + ("all buffers have been removed"), + ("PipeWire link to remote node was destroyed")); + break; + case GST_PIPEWIRE_SRC_ON_DISCONNECT_EOS: + GST_DEBUG_OBJECT (pwsrc, "flush-stop on removing all buffers"); + gst_pad_push_event (GST_BASE_SRC_PAD (pwsrc), + gst_event_new_flush_stop (FALSE)); + + GST_DEBUG_OBJECT (pwsrc, "sending eos downstream"); + gst_pad_push_event (GST_BASE_SRC_PAD (pwsrc), + gst_event_new_eos()); + break; + case GST_PIPEWIRE_SRC_ON_DISCONNECT_NONE: + GST_DEBUG_OBJECT (pwsrc, "stream closed or removed"); + break; + } } } @@ -660,9 +806,12 @@ static GstBuffer *dequeue_buffer(GstPipeWireSrc *pwsrc) pwsrc->transform_value = transform_value; } - if (pwsrc->is_video) { - gsize video_size = 0; + if (pwsrc->is_rawvideo) { GstVideoInfo *info = &pwsrc->video_info; + uint32_t n_datas = b->buffer->n_datas; + uint32_t n_planes = GST_VIDEO_INFO_N_PLANES (info); + gsize video_size = 0; + GstVideoMeta *meta = gst_buffer_add_video_meta_full (buf, GST_VIDEO_FRAME_FLAG_NONE, GST_VIDEO_INFO_FORMAT (info), GST_VIDEO_INFO_WIDTH (info), @@ -671,7 +820,7 @@ static GstBuffer *dequeue_buffer(GstPipeWireSrc *pwsrc) info->offset, info->stride); - for (i = 0; i < MIN (b->buffer->n_datas, GST_VIDEO_MAX_PLANES); i++) { + for (i = 0; i < MIN (n_datas, n_planes); i++) { struct spa_data *d = &b->buffer->datas[i]; meta->offset[i] = video_size; meta->stride[i] = d->chunk->stride; @@ -680,6 +829,10 @@ static GstBuffer *dequeue_buffer(GstPipeWireSrc *pwsrc) } } + if (b->buffer->n_datas != gst_buffer_n_memory(data->buf)) { + GST_ERROR_OBJECT(pwsrc, "n_datas != n_memory, (%d != %d)", b->buffer->n_datas, gst_buffer_n_memory(data->buf)); + } + for (i = 0; i < b->buffer->n_datas; i++) { struct spa_data *d = &b->buffer->datas[i]; @@ -730,13 +883,29 @@ on_state_changed (void *data, enum pw_stream_state state, const char *error) { GstPipeWireSrc *pwsrc = data; + GstState current_state = GST_ELEMENT_CAST (pwsrc)->current_state; - GST_DEBUG ("got stream state %s", pw_stream_state_as_string (state)); + GST_DEBUG_OBJECT (pwsrc, "got stream state %s", pw_stream_state_as_string (state)); switch (state) { case PW_STREAM_STATE_UNCONNECTED: case PW_STREAM_STATE_CONNECTING: + break; case PW_STREAM_STATE_PAUSED: + /* + * We may see a driver/quantum/clock rate change on switching audio + * sources. The same is not applicable for video. + * + * We post the clock lost message here to take care of a possible + * jump or shift in base_time/clock for the pipeline. Application + * must handle the clock lost message in it's bus handler by pausing + * the pipeline and then setting it back to playing. + */ + if (current_state == GST_STATE_PLAYING && !pwsrc->is_video) + gst_element_post_message (GST_ELEMENT_CAST (pwsrc), + gst_message_new_clock_lost (GST_OBJECT_CAST (pwsrc), + GST_CLOCK_CAST (pwsrc->stream->clock))); + break; case PW_STREAM_STATE_STREAMING: break; case PW_STREAM_STATE_ERROR: @@ -982,7 +1151,7 @@ gst_pipewire_src_negotiate (GstBaseSrc * basesrc) GST_DEBUG_OBJECT (basesrc, "connect capture with path %s, target-object %s", pwsrc->stream->path, pwsrc->stream->target_object); - pwsrc->possible_caps = possible_caps; + pwsrc->possible_caps = gst_caps_ref (possible_caps); pwsrc->negotiated = FALSE; enum pw_stream_flags flags; @@ -1019,7 +1188,6 @@ gst_pipewire_src_negotiate (GstBaseSrc * basesrc) } negotiated_caps = g_steal_pointer (&pwsrc->caps); - pwsrc->possible_caps = NULL; pw_thread_loop_unlock (pwsrc->stream->core->loop); if (negotiated_caps == NULL) @@ -1140,10 +1308,21 @@ handle_format_change (GstPipeWireSrc *pwsrc, pw_stream_set_error (pwsrc->stream->pwstream, -EINVAL, "internal error"); return; } + pwsrc->is_rawvideo = TRUE; } else { gst_video_info_dma_drm_init (&pwsrc->drm_info); #endif gst_video_info_from_caps (&pwsrc->video_info, pwsrc->caps); + + if (GST_VIDEO_FORMAT_INFO_IS_VALID_RAW (pwsrc->video_info.finfo) +#ifdef HAVE_GSTREAMER_DMA_DRM + && GST_VIDEO_FORMAT_INFO_FORMAT (pwsrc->video_info.finfo) != GST_VIDEO_FORMAT_DMA_DRM +#endif + ) + pwsrc->is_rawvideo = TRUE; + else + pwsrc->is_rawvideo = FALSE; + #ifdef HAVE_GSTREAMER_DMA_DRM } #endif @@ -1156,6 +1335,7 @@ handle_format_change (GstPipeWireSrc *pwsrc, } else { pwsrc->negotiated = FALSE; pwsrc->is_video = FALSE; + pwsrc->is_rawvideo = FALSE; } if (pwsrc->caps) { @@ -1330,6 +1510,8 @@ gst_pipewire_src_create (GstPushSrc * psrc, GstBuffer ** buffer) GstBuffer *buf; gboolean update_time = FALSE, timeout = FALSE; GstCaps *caps = NULL; + struct timespec abstime = { 0, }; + bool have_abstime = false; pwsrc = GST_PIPEWIRE_SRC (psrc); @@ -1373,13 +1555,11 @@ gst_pipewire_src_create (GstPushSrc * psrc, GstBuffer ** buffer) update_time = TRUE; GST_LOG_OBJECT (pwsrc, "EOS, send last buffer"); break; - } else if (timeout) { - if (pwsrc->last_buffer != NULL) { - update_time = TRUE; - buf = gst_buffer_ref(pwsrc->last_buffer); - GST_LOG_OBJECT (pwsrc, "timeout, send keepalive buffer"); - break; - } + } else if (timeout && pwsrc->last_buffer != NULL) { + update_time = TRUE; + buf = gst_buffer_ref(pwsrc->last_buffer); + GST_LOG_OBJECT (pwsrc, "timeout, send keepalive buffer"); + break; } else { buf = dequeue_buffer (pwsrc); GST_LOG_OBJECT (pwsrc, "popped buffer %p", buf); @@ -1391,9 +1571,13 @@ gst_pipewire_src_create (GstPushSrc * psrc, GstBuffer ** buffer) } timeout = FALSE; if (pwsrc->keepalive_time > 0) { - struct timespec abstime; - pw_thread_loop_get_time(pwsrc->stream->core->loop, &abstime, - pwsrc->keepalive_time * SPA_NSEC_PER_MSEC); + if (!have_abstime) { + /* Record the time we want to timeout at once, for this loop -- the loop might get unrelated signal()s, + * and we don't want the keepalive time to get reset by that */ + pw_thread_loop_get_time(pwsrc->stream->core->loop, &abstime, + pwsrc->keepalive_time * SPA_NSEC_PER_MSEC); + have_abstime = TRUE; + } if (pw_thread_loop_timed_wait_full (pwsrc->stream->core->loop, &abstime) == -ETIMEDOUT) timeout = TRUE; } else { @@ -1464,6 +1648,7 @@ gst_pipewire_src_stop (GstBaseSrc * basesrc) pwsrc->eos = false; gst_buffer_replace (&pwsrc->last_buffer, NULL); gst_caps_replace(&pwsrc->caps, NULL); + gst_caps_replace(&pwsrc->possible_caps, NULL); pwsrc->transform_value = UINT32_MAX; pw_thread_loop_unlock (pwsrc->stream->core->loop); diff --git a/src/gst/gstpipewiresrc.h b/src/gst/gstpipewiresrc.h index d5728cdc9..3ac88b10b 100644 --- a/src/gst/gstpipewiresrc.h +++ b/src/gst/gstpipewiresrc.h @@ -24,6 +24,22 @@ G_BEGIN_DECLS #define GST_PIPEWIRE_SRC_CAST(obj) ((GstPipeWireSrc *) (obj)) G_DECLARE_FINAL_TYPE (GstPipeWireSrc, gst_pipewire_src, GST, PIPEWIRE_SRC, GstPushSrc) +/** + * GstPipeWireSrcOnDisconnect: + * @GST_PIPEWIRE_SRC_ON_DISCONNECT_EOS: send EoS downstream + * @GST_PIPEWIRE_SRC_ON_DISCONNECT_ERROR: raise pipeline error + * @GST_PIPEWIRE_SRC_ON_DISCONNECT_NONE: no action + * + * Different actions on disconnect. + */ +typedef enum +{ + GST_PIPEWIRE_SRC_ON_DISCONNECT_NONE, + GST_PIPEWIRE_SRC_ON_DISCONNECT_EOS, + GST_PIPEWIRE_SRC_ON_DISCONNECT_ERROR, +} GstPipeWireSrcOnDisconnect; + +#define GST_TYPE_PIPEWIRE_SRC_ON_DISCONNECT (gst_pipewire_src_on_disconnect_get_type ()) /** * GstPipeWireSrc: @@ -36,6 +52,7 @@ struct _GstPipeWireSrc { GstPipeWireStream *stream; /*< private >*/ + gint n_buffers; gint use_bufferpool; gint min_buffers; gint max_buffers; @@ -47,6 +64,7 @@ struct _GstPipeWireSrc { GstCaps *possible_caps; gboolean is_video; + gboolean is_rawvideo; GstVideoInfo video_info; #ifdef HAVE_GSTREAMER_DMA_DRM GstVideoInfoDmaDrm drm_info; @@ -56,6 +74,7 @@ struct _GstPipeWireSrc { gboolean flushing; gboolean started; gboolean eos; + gboolean flushing_on_remove_buffer; gboolean is_live; int64_t delay; @@ -65,8 +84,12 @@ struct _GstPipeWireSrc { GstBuffer *last_buffer; enum spa_meta_videotransform_value transform_value; + + GstPipeWireSrcOnDisconnect on_disconnect; }; +GType gst_pipewire_src_on_stream_disconnect_get_type (void); + G_END_DECLS #endif /* __GST_PIPEWIRE_SRC_H__ */ diff --git a/src/gst/gstpipewirestream.h b/src/gst/gstpipewirestream.h index a301375c7..23dc996a9 100644 --- a/src/gst/gstpipewirestream.h +++ b/src/gst/gstpipewirestream.h @@ -31,6 +31,7 @@ struct _GstPipeWireStream { GstClock *clock; guint64 position; + guint64 buf_duration; struct spa_dll dll; double err_avg, err_var, err_wdw; guint64 last_ts; @@ -41,6 +42,8 @@ struct _GstPipeWireStream { struct pw_stream *pwstream; struct spa_hook pwstream_listener; + struct spa_io_position *io_position; + /* common properties */ int fd; gchar *path;