Merge branch 'gst-fixes-backport-1.4' into '1.4'

[1.4] Backport all the GStreamer fixes

See merge request pipewire/pipewire!2416
This commit is contained in:
Arun Raghavan 2025-10-17 14:13:55 +00:00
commit f1dbff3d7d
9 changed files with 600 additions and 78 deletions

View file

@ -410,6 +410,7 @@ gst_deps_def = {
gst_dep = []
gst_dma_drm_found = false
gst_shm_allocator_found = false
foreach depname, kwargs: gst_deps_def
dep = dependency(depname, required: gst_option, kwargs: kwargs)
summary({depname: dep.found()}, bool_yn: true, section: 'GStreamer modules')
@ -425,9 +426,13 @@ foreach depname, kwargs: gst_deps_def
if depname == 'gstreamer-allocators-1.0' and dep.version().version_compare('>= 1.23.1')
gst_dma_drm_found = true
gst_shm_allocator_found = true
endif
endforeach
summary({'gstreamer SHM allocator': gst_shm_allocator_found}, bool_yn: true, section: 'Backend')
cdata.set('HAVE_GSTREAMER_SHM_ALLOCATOR', gst_shm_allocator_found)
# This code relies on the array being empty if any dependency was not found
gst_dp_found = gst_dep.length() > 0
summary({'gstreamer-device-provider': gst_dp_found}, bool_yn: true, section: 'Backend')

View file

@ -1665,8 +1665,8 @@ impl_node_port_use_buffers(void *object,
b->buf = buffers[i];
if (n_datas != port->blocks) {
spa_log_error(this->log, "%p: invalid blocks %d on buffer %d",
this, n_datas, i);
spa_log_error(this->log, "%p: invalid blocks %d on buffer %d, expected %d",
this, n_datas, i, port->blocks);
return -EINVAL;
}
if (SPA_FLAG_IS_SET(flags, SPA_NODE_BUFFERS_FLAG_ALLOC)) {

View file

@ -10,8 +10,12 @@
#include <gst/allocators/gstfdmemory.h>
#include <gst/allocators/gstdmabuf.h>
#ifdef HAVE_GSTREAMER_SHM_ALLOCATOR
#include <gst/allocators/gstshmallocator.h>
#endif
#include <gst/video/gstvideometa.h>
#include <gst/video/gstvideopool.h>
#include "gstpipewirepool.h"
@ -61,13 +65,42 @@ void gst_pipewire_pool_wrap_buffer (GstPipeWirePool *pool, struct pw_buffer *b)
GstBuffer *buf;
uint32_t i;
GstPipeWirePoolData *data;
/* Default to a large enough value */
gsize plane_0_size = pool->has_rawvideo ?
pool->video_info.size :
(gsize) pool->video_info.width * pool->video_info.height;
gsize plane_sizes[GST_VIDEO_MAX_PLANES] = { plane_0_size, };
GST_DEBUG_OBJECT (pool, "wrap buffer");
GST_DEBUG_OBJECT (pool, "wrap buffer, datas:%d", b->buffer->n_datas);
data = g_slice_new (GstPipeWirePoolData);
buf = gst_buffer_new ();
if (pool->add_metavideo) {
GstVideoMeta *meta = gst_buffer_add_video_meta_full (buf,
GST_VIDEO_FRAME_FLAG_NONE,
GST_VIDEO_INFO_FORMAT (&pool->video_info),
GST_VIDEO_INFO_WIDTH (&pool->video_info),
GST_VIDEO_INFO_HEIGHT (&pool->video_info),
GST_VIDEO_INFO_N_PLANES (&pool->video_info),
pool->video_info.offset,
pool->video_info.stride);
gst_video_meta_set_alignment (meta, pool->video_align);
if (!gst_video_meta_get_plane_size (meta, plane_sizes)) {
GST_ERROR_OBJECT (pool, "could not compute plane sizes");
}
/*
* We need to set the video meta as pooled, else gst_buffer_pool_release_buffer
* will call reset_buffer and the default_reset_buffer implementation for
* GstBufferPool removes all metadata without the POOLED flag.
*/
GST_META_FLAG_SET (meta, GST_META_FLAG_POOLED);
}
for (i = 0; i < b->buffer->n_datas; i++) {
struct spa_data *d = &b->buffer->datas[i];
GstMemory *gmem = NULL;
@ -75,7 +108,51 @@ void gst_pipewire_pool_wrap_buffer (GstPipeWirePool *pool, struct pw_buffer *b)
GST_DEBUG_OBJECT (pool, "wrap data (%s %d) %d %d",
spa_debug_type_find_short_name(spa_type_data_type, d->type), d->type,
d->mapoffset, d->maxsize);
if (d->type == SPA_DATA_MemFd) {
if (pool->allocate_memory) {
#ifdef HAVE_GSTREAMER_SHM_ALLOCATOR
gsize block_size = d->maxsize;
if (pool->has_video) {
/* For video, we know block sizes from the video info already */
block_size = plane_sizes[i];
} else {
/* For audio, reserve space based on the quantum limit and channel count */
g_autoptr (GstPipeWireStream) s = g_weak_ref_get (&pool->stream);
struct pw_context *context = pw_core_get_context(pw_stream_get_core(s->pwstream));
const struct pw_properties *props = pw_context_get_properties(context);
uint32_t quantum_limit = 8192; /* "reasonable" default */
const char *quantum = spa_dict_lookup(&props->dict, "clock.quantum-limit");
if (!quantum) {
quantum = spa_dict_lookup(&props->dict, "default.clock.quantum-limit");
GST_DEBUG_OBJECT (pool, "using default quantum limit %s", quantum);
}
if (quantum)
spa_atou32(quantum, &quantum_limit, 0);
GST_DEBUG_OBJECT (pool, "quantum limit %s", quantum);
block_size = quantum_limit * pool->audio_info.bpf;
}
GST_DEBUG_OBJECT (pool, "setting block size %zu", block_size);
if (!pool->shm_allocator)
pool->shm_allocator = gst_shm_allocator_get();
/* use MemFd only. That is the only supported data type when memory is remote i.e. allocated by the client */
gmem = gst_allocator_alloc (pool->shm_allocator, block_size, NULL);
d->fd = gst_fd_memory_get_fd (gmem);
d->mapoffset = 0;
d->flags = SPA_DATA_FLAG_READWRITE | SPA_DATA_FLAG_MAPPABLE;
d->type = SPA_DATA_MemFd;
d->maxsize = block_size;
d->data = NULL;
#endif
} else if (d->type == SPA_DATA_MemFd) {
gmem = gst_fd_allocator_alloc (pool->fd_allocator, dup(d->fd),
d->mapoffset + d->maxsize, GST_FD_MEMORY_FLAG_NONE);
gst_memory_resize (gmem, d->mapoffset, d->maxsize);
@ -89,18 +166,21 @@ void gst_pipewire_pool_wrap_buffer (GstPipeWirePool *pool, struct pw_buffer *b)
gmem = gst_memory_new_wrapped (0, d->data, d->maxsize, 0,
d->maxsize, NULL, NULL);
}
else {
GST_WARNING_OBJECT (pool, "unknown data type (%s %d)",
spa_debug_type_find_short_name(spa_type_data_type, d->type), d->type);
}
if (gmem)
gst_buffer_insert_memory (buf, i, gmem);
}
if (pool->add_metavideo) {
gst_buffer_add_video_meta_full (buf, GST_VIDEO_FRAME_FLAG_NONE,
GST_VIDEO_INFO_FORMAT (&pool->video_info),
GST_VIDEO_INFO_WIDTH (&pool->video_info),
GST_VIDEO_INFO_HEIGHT (&pool->video_info),
GST_VIDEO_INFO_N_PLANES (&pool->video_info),
pool->video_info.offset,
pool->video_info.stride);
if (pool->add_metavideo && !pool->allocate_memory) {
/* Set memory sizes to expected plane sizes, so we know the valid size,
* and the offsets in the meta make sense */
for (i = 0; i < gst_buffer_n_memory (buf); i++) {
GstMemory *mem = gst_buffer_peek_memory (buf, i);
gst_memory_resize (mem, 0, plane_sizes[i]);
}
}
data->pool = gst_object_ref (pool);
@ -133,7 +213,8 @@ void gst_pipewire_pool_remove_buffer (GstPipeWirePool *pool, struct pw_buffer *b
data->crop = NULL;
data->videotransform = NULL;
gst_buffer_remove_all_memory (data->buf);
if (!pool->allocate_memory)
gst_buffer_remove_all_memory (data->buf);
/* this will also destroy the pool data, if this is the last reference */
gst_clear_buffer (&data->buf);
@ -212,7 +293,13 @@ no_more_buffers:
static const gchar **
get_options (GstBufferPool * pool)
{
static const gchar *options[] = { GST_BUFFER_POOL_OPTION_VIDEO_META, NULL };
static const gchar *options[] = {
GST_BUFFER_POOL_OPTION_VIDEO_META,
#ifdef HAVE_GSTREAMER_SHM_ALLOCATOR
GST_BUFFER_POOL_OPTION_VIDEO_ALIGNMENT,
#endif
NULL
};
return options;
}
@ -223,7 +310,7 @@ set_config (GstBufferPool * pool, GstStructure * config)
GstCaps *caps;
GstStructure *structure;
guint size, min_buffers, max_buffers;
gboolean has_video;
gboolean has_videoalign;
if (!gst_buffer_pool_config_get_params (config, &caps, &size, &min_buffers, &max_buffers)) {
GST_WARNING_OBJECT (pool, "invalid config");
@ -235,18 +322,63 @@ set_config (GstBufferPool * pool, GstStructure * config)
return FALSE;
}
/* We don't support unlimited buffers */
if (max_buffers == 0)
max_buffers = PIPEWIRE_POOL_MAX_BUFFERS;
/* Pick a sensible min to avoid starvation */
if (min_buffers == 0)
min_buffers = PIPEWIRE_POOL_MIN_BUFFERS;
if (min_buffers < PIPEWIRE_POOL_MIN_BUFFERS || max_buffers > PIPEWIRE_POOL_MAX_BUFFERS)
return FALSE;
structure = gst_caps_get_structure (caps, 0);
if (g_str_has_prefix (gst_structure_get_name (structure), "video/") ||
g_str_has_prefix (gst_structure_get_name (structure), "image/")) {
has_video = TRUE;
p->has_video = TRUE;
gst_video_info_from_caps (&p->video_info, caps);
if (GST_VIDEO_FORMAT_INFO_IS_VALID_RAW (p->video_info.finfo)
#ifdef HAVE_GSTREAMER_DMA_DRM
&& GST_VIDEO_FORMAT_INFO_FORMAT (p->video_info.finfo) != GST_VIDEO_FORMAT_DMA_DRM
#endif
)
p->has_rawvideo = TRUE;
else
p->has_rawvideo = FALSE;
#ifdef HAVE_GSTREAMER_SHM_ALLOCATOR
if (p->has_rawvideo) {
gst_video_alignment_reset (&p->video_align);
gst_video_info_align (&p->video_info, &p->video_align);
}
#endif
} else if (g_str_has_prefix(gst_structure_get_name(structure), "audio/")) {
p->has_video = FALSE;
gst_audio_info_from_caps(&p->audio_info, caps);
} else {
has_video = FALSE;
g_assert_not_reached ();
}
p->add_metavideo = has_video && gst_buffer_pool_config_has_option (config,
p->add_metavideo = p->has_rawvideo && gst_buffer_pool_config_has_option (config,
GST_BUFFER_POOL_OPTION_VIDEO_META);
#ifdef HAVE_GSTREAMER_SHM_ALLOCATOR
has_videoalign = p->has_rawvideo && gst_buffer_pool_config_has_option (config,
GST_BUFFER_POOL_OPTION_VIDEO_ALIGNMENT);
if (has_videoalign) {
gst_buffer_pool_config_get_video_alignment (config, &p->video_align);
gst_video_info_align (&p->video_info, &p->video_align);
gst_buffer_pool_config_set_video_alignment (config, &p->video_align);
GST_LOG_OBJECT (pool, "Set alignment: %u-%ux%u-%u",
p->video_align.padding_left, p->video_align.padding_right,
p->video_align.padding_top, p->video_align.padding_bottom);
}
#endif
if (p->video_info.size != 0)
size = p->video_info.size;
@ -321,6 +453,10 @@ gst_pipewire_pool_finalize (GObject * object)
g_weak_ref_set (&pool->stream, NULL);
g_object_unref (pool->fd_allocator);
g_object_unref (pool->dmabuf_allocator);
#ifdef HAVE_GSTREAMER_SHM_ALLOCATOR
if (pool->shm_allocator)
g_object_unref (pool->shm_allocator);
#endif
G_OBJECT_CLASS (gst_pipewire_pool_parent_class)->finalize (object);
}
@ -355,5 +491,8 @@ gst_pipewire_pool_init (GstPipeWirePool * pool)
{
pool->fd_allocator = gst_fd_allocator_new ();
pool->dmabuf_allocator = gst_dmabuf_allocator_new ();
#ifdef HAVE_GSTREAMER_SHM_ALLOCATOR
gst_shm_allocator_init_once();
#endif
g_cond_init (&pool->cond);
}

View file

@ -9,6 +9,7 @@
#include <gst/gst.h>
#include <gst/audio/audio.h>
#include <gst/video/video.h>
#include <pipewire/pipewire.h>
@ -18,6 +19,15 @@ G_BEGIN_DECLS
#define GST_TYPE_PIPEWIRE_POOL (gst_pipewire_pool_get_type())
G_DECLARE_FINAL_TYPE (GstPipeWirePool, gst_pipewire_pool, GST, PIPEWIRE_POOL, GstBufferPool)
#define PIPEWIRE_POOL_MIN_BUFFERS 2u
#define PIPEWIRE_POOL_MAX_BUFFERS 16u
/* Only available in GStreamer 1.22+ */
#ifndef GST_VIDEO_FORMAT_INFO_IS_VALID_RAW
#define GST_VIDEO_FORMAT_INFO_IS_VALID_RAW(info) \
(info != NULL && (info)->format > GST_VIDEO_FORMAT_ENCODED)
#endif
typedef struct _GstPipeWirePoolData GstPipeWirePoolData;
struct _GstPipeWirePoolData {
GstPipeWirePool *pool;
@ -37,14 +47,20 @@ struct _GstPipeWirePool {
GWeakRef stream;
guint n_buffers;
gboolean has_video;
gboolean has_rawvideo;
gboolean add_metavideo;
GstAudioInfo audio_info;
GstVideoInfo video_info;
GstVideoAlignment video_align;
GstAllocator *fd_allocator;
GstAllocator *dmabuf_allocator;
GstAllocator *shm_allocator;
GCond cond;
gboolean paused;
gboolean allocate_memory;
};
enum GstPipeWirePoolMode {

View file

@ -40,7 +40,8 @@ GST_DEBUG_CATEGORY_STATIC (pipewire_sink_debug);
#define DEFAULT_PROP_SLAVE_METHOD GST_PIPEWIRE_SINK_SLAVE_METHOD_NONE
#define DEFAULT_PROP_USE_BUFFERPOOL USE_BUFFERPOOL_AUTO
#define MIN_BUFFERS 8u
#define MAX_ERROR_MS 1
#define RESYNC_TIMEOUT_MS 10
enum
{
@ -167,7 +168,8 @@ gst_pipewire_sink_propose_allocation (GstBaseSink * bsink, GstQuery * query)
GstPipeWireSink *pwsink = GST_PIPEWIRE_SINK (bsink);
if (pwsink->use_bufferpool != USE_BUFFERPOOL_NO)
gst_query_add_allocation_pool (query, GST_BUFFER_POOL_CAST (pwsink->stream->pool), 0, 0, 0);
gst_query_add_allocation_pool (query, GST_BUFFER_POOL_CAST (pwsink->stream->pool), 0,
PIPEWIRE_POOL_MIN_BUFFERS, PIPEWIRE_POOL_MAX_BUFFERS);
gst_query_add_allocation_meta (query, GST_VIDEO_META_API_TYPE, NULL);
return TRUE;
@ -242,7 +244,8 @@ gst_pipewire_sink_class_init (GstPipeWireSinkClass * klass)
GST_TYPE_PIPEWIRE_SINK_MODE,
DEFAULT_PROP_MODE,
G_PARAM_READWRITE |
G_PARAM_STATIC_STRINGS));
G_PARAM_STATIC_STRINGS |
GST_PARAM_MUTABLE_READY));
g_object_class_install_property (gobject_class,
PROP_FD,
@ -310,21 +313,38 @@ gst_pipewire_sink_update_params (GstPipeWireSink *sink)
config = gst_buffer_pool_get_config (GST_BUFFER_POOL (pool));
gst_buffer_pool_config_get_params (config, &caps, &size, &min_buffers, &max_buffers);
/* We cannot dynamically grow the pool */
if (max_buffers == 0) {
GST_WARNING_OBJECT (sink, "cannot support unlimited buffers in pool");
max_buffers = PIPEWIRE_POOL_MAX_BUFFERS;
}
spa_pod_builder_init (&b, buffer, sizeof (buffer));
spa_pod_builder_push_object (&b, &f, SPA_TYPE_OBJECT_ParamBuffers, SPA_PARAM_Buffers);
spa_pod_builder_add (&b,
SPA_PARAM_BUFFERS_size, SPA_POD_CHOICE_RANGE_Int(size, size, INT32_MAX),
0);
if (sink->is_rawvideo) {
/* MUST have n_datas == n_planes */
spa_pod_builder_add (&b,
SPA_PARAM_BUFFERS_blocks,
SPA_POD_Int(GST_VIDEO_INFO_N_PLANES (&pool->video_info)),
0);
} else {
/* Non-planar data, get a single block */
spa_pod_builder_add (&b,
SPA_PARAM_BUFFERS_blocks,
SPA_POD_Int(1),
0);
}
spa_pod_builder_add (&b,
SPA_PARAM_BUFFERS_stride, SPA_POD_CHOICE_RANGE_Int(0, 0, INT32_MAX),
/* At this stage, we will request as many buffers as we _might_ need as
* the default, since we can't grow the pool once this is set */
SPA_PARAM_BUFFERS_buffers, SPA_POD_CHOICE_RANGE_Int(
SPA_MAX(MIN_BUFFERS, min_buffers),
SPA_MAX(MIN_BUFFERS, min_buffers),
max_buffers ? max_buffers : INT32_MAX),
SPA_PARAM_BUFFERS_dataType, SPA_POD_CHOICE_FLAGS_Int(
(1<<SPA_DATA_MemFd) |
(1<<SPA_DATA_MemPtr)),
max_buffers, min_buffers, max_buffers),
SPA_PARAM_BUFFERS_dataType, SPA_POD_CHOICE_FLAGS_Int(1<<SPA_DATA_MemFd),
0);
port_params[n_params++] = spa_pod_builder_pop (&b, &f);
@ -333,7 +353,7 @@ gst_pipewire_sink_update_params (GstPipeWireSink *sink)
SPA_PARAM_META_type, SPA_POD_Id(SPA_META_Header),
SPA_PARAM_META_size, SPA_POD_Int(sizeof (struct spa_meta_header)));
if (sink->is_video) {
if (sink->is_rawvideo) {
port_params[n_params++] = spa_pod_builder_add_object (&b,
SPA_TYPE_OBJECT_ParamMeta, SPA_PARAM_Meta,
SPA_PARAM_META_type, SPA_POD_Id(SPA_META_VideoCrop),
@ -359,7 +379,8 @@ gst_pipewire_sink_init (GstPipeWireSink * sink)
sink->mode = DEFAULT_PROP_MODE;
sink->use_bufferpool = DEFAULT_PROP_USE_BUFFERPOOL;
sink->is_video = false;
sink->is_rawvideo = false;
sink->first_buffer = true;
GST_OBJECT_FLAG_SET (sink, GST_ELEMENT_FLAG_PROVIDE_CLOCK);
@ -377,7 +398,7 @@ gst_pipewire_sink_sink_fixate (GstBaseSink * bsink, GstCaps * caps)
structure = gst_caps_get_structure (caps, 0);
if (gst_structure_has_name (structure, "video/x-raw")) {
pwsink->is_video = true;
pwsink->is_rawvideo = true;
gst_structure_fixate_field_nearest_int (structure, "width", 320);
gst_structure_fixate_field_nearest_int (structure, "height", 240);
gst_structure_fixate_field_nearest_fraction (structure, "framerate", 30, 1);
@ -591,14 +612,17 @@ static void
on_remove_buffer (void *_data, struct pw_buffer *b)
{
GstPipeWireSink *pwsink = _data;
GST_DEBUG_OBJECT (pwsink, "remove pw_buffer %p", b);
gst_pipewire_pool_remove_buffer (pwsink->stream->pool, b);
if (!gst_pipewire_pool_has_buffers (pwsink->stream->pool) &&
!GST_BUFFER_POOL_IS_FLUSHING (GST_BUFFER_POOL_CAST (pwsink->stream->pool))) {
GST_ELEMENT_ERROR (pwsink, RESOURCE, NOT_FOUND,
("all buffers have been removed"),
("PipeWire link to remote node was destroyed"));
if (pwsink->mode != GST_PIPEWIRE_SINK_MODE_PROVIDE) {
GST_ELEMENT_ERROR (pwsink, RESOURCE, NOT_FOUND,
("all buffers have been removed"),
("PipeWire link to remote node was destroyed"));
}
}
}
@ -633,6 +657,9 @@ do_send_buffer (GstPipeWireSink *pwsink, GstBuffer *buffer)
}
}
data->b->size = 0;
spa_assert(b->n_datas == gst_buffer_n_memory(buffer));
for (i = 0; i < b->n_datas; i++) {
struct spa_data *d = &b->datas[i];
GstMemory *mem = gst_buffer_peek_memory (buffer, i);
@ -646,16 +673,20 @@ do_send_buffer (GstPipeWireSink *pwsink, GstBuffer *buffer)
GstVideoMeta *meta = gst_buffer_get_video_meta (buffer);
if (meta) {
if (meta->n_planes == b->n_datas) {
uint32_t n_planes = GST_VIDEO_INFO_N_PLANES (&data->pool->video_info);
gsize video_size = 0;
for (i = 0; i < meta->n_planes; i++) {
for (i = 0; i < n_planes; i++) {
struct spa_data *d = &b->datas[i];
d->chunk->offset += meta->offset[i] - video_size;
d->chunk->stride = meta->stride[i];
d->chunk->offset = meta->offset[i] - video_size;
video_size += d->chunk->size;
}
} else {
GST_ERROR_OBJECT (pwsink, "plane num not matching, meta:%u buffer:%u", meta->n_planes, b->n_datas);
GST_ERROR_OBJECT (pwsink, "plane num not matching, meta:%u buffer:%u",
meta->n_planes, b->n_datas);
}
}
@ -663,7 +694,18 @@ do_send_buffer (GstPipeWireSink *pwsink, GstBuffer *buffer)
GST_WARNING_OBJECT (pwsink, "can't send buffer %s", spa_strerror(res));
} else {
data->queued = TRUE;
GST_LOG_OBJECT(pwsink, "queued pwbuffer: %p; gstbuffer %p ",data->b, buffer);
GST_LOG_OBJECT(pwsink, "queued pwbuffer: %p size: %"PRIu64"; gstbuffer %p",
data->b, data->b->size, buffer);
if (pwsink->first_buffer) {
pwsink->first_buffer = false;
pwsink->first_buffer_pts = GST_BUFFER_PTS(buffer);
}
stream->position = gst_util_uint64_scale_int(GST_BUFFER_PTS(buffer) - pwsink->first_buffer_pts,
pwsink->rate, 1 * GST_SECOND);
// have the buffer duration value minimum as 1, in case of video where rate is 0 (not applicable)
stream->buf_duration = SPA_MAX((uint64_t)1, gst_util_uint64_scale_int(GST_BUFFER_DURATION(buffer),
pwsink->rate, 1 * GST_SECOND));
}
switch (pwsink->slave_method) {
@ -675,6 +717,56 @@ do_send_buffer (GstPipeWireSink *pwsink, GstBuffer *buffer)
}
}
static void update_time (GstPipeWireSink *pwsink)
{
struct spa_io_position *p = pwsink->stream->io_position;
double err = 0.0, corr = 1.0;
guint64 now;
double max_err = pwsink->rate * MAX_ERROR_MS/1000.0;
double resync_timeout = pwsink->rate * RESYNC_TIMEOUT_MS/1000.0;
if (pwsink->first_buffer) {
// use the target duration before the first buffer
pwsink->stream->buf_duration = p->clock.target_duration;
spa_dll_set_bw(&pwsink->stream->dll, SPA_DLL_BW_MIN, pwsink->stream->buf_duration,
pwsink->rate);
}
now = pw_stream_get_nsec(pwsink->stream->pwstream);
err = (double)gst_util_uint64_scale(now, pwsink->rate, 1 * GST_SECOND) -
(double)gst_util_uint64_scale(p->clock.next_nsec, pwsink->rate, 1 * GST_SECOND);
GST_LOG_OBJECT(pwsink, "err is %f max err is %f now %"PRIu64" next is %"PRIu64"", err, max_err, now,
p->clock.next_nsec);
if (fabs(err) > max_err) {
if (fabs(err) > resync_timeout) {
GST_WARNING_OBJECT(pwsink, "err %f exceeds resync timeout, resetting", err);
spa_dll_set_bw(&pwsink->stream->dll, SPA_DLL_BW_MIN, pwsink->stream->buf_duration,
pwsink->rate);
err = 0.0;
} else {
err = SPA_CLAMPD(err, -max_err, max_err);
}
}
corr = spa_dll_update(&pwsink->stream->dll, err);
p->clock.nsec = now;
p->clock.position = pwsink->stream->position;
p->clock.duration = pwsink->stream->buf_duration;
/* we don't have a way to estimate the target (next cycle) buffer duration
* so use the current buffer duration
*/
p->clock.target_duration = pwsink->stream->buf_duration;
p->clock.rate = SPA_FRACTION(1, pwsink->rate);
// current time plus duration scaled with correlation
p->clock.next_nsec = now + (uint64_t)(p->clock.duration / corr * GST_SECOND / pwsink->rate);
p->clock.rate_diff = corr;
GST_DEBUG_OBJECT(pwsink, "now %"PRIu64", position %"PRIu64", duration %"PRIu64", rate :%d,"
"next : %"PRIu64", delay is %"PRIi64", rate_diff is %f", p->clock.nsec, p->clock.position,
p->clock.duration, pwsink->rate, p->clock.next_nsec, p->clock.delay,p->clock.rate_diff);
}
static void
on_process (void *data)
@ -684,6 +776,23 @@ on_process (void *data)
g_cond_signal (&pwsink->stream->pool->cond);
}
static int invoke_trigger_process(struct spa_loop *loop,
bool async, uint32_t seq, const void *data, size_t size, void *user_data)
{
GstPipeWireSink *pwsink = user_data;
/* Note: We cannot use the rate for computation of other clock params
* in case of video because the rate for video is set as 0 in the _setcaps.
* So skip update time for video (i.e. when rate is 0). The video buffers
* get timestamp from the SPA_META_Header anyway
*/
if (pwsink->rate)
update_time(pwsink);
return pw_stream_trigger_process(pwsink->stream->pwstream);
}
static void
on_state_changed (void *data, enum pw_stream_state old, enum pw_stream_state state, const char *error)
{
@ -699,7 +808,8 @@ on_state_changed (void *data, enum pw_stream_state old, enum pw_stream_state sta
break;
case PW_STREAM_STATE_STREAMING:
if (pw_stream_is_driving (pwsink->stream->pwstream))
pw_stream_trigger_process (pwsink->stream->pwstream);
pw_loop_invoke(pw_stream_get_data_loop(pwsink->stream->pwstream),
invoke_trigger_process, 1, NULL, 0 , false, pwsink);
break;
case PW_STREAM_STATE_ERROR:
/* make the error permanent, if it is not already;
@ -759,9 +869,21 @@ gst_pipewire_sink_setcaps (GstBaseSink * bsink, GstCaps * caps)
if (pwsink->use_bufferpool != USE_BUFFERPOOL_YES)
pwsink->use_bufferpool = USE_BUFFERPOOL_NO;
} else {
GstVideoInfo video_info;
pwsink->rate = rate = 0;
pwsink->rate_match = false;
pwsink->is_video = true;
gst_video_info_from_caps (&video_info, caps);
if (GST_VIDEO_FORMAT_INFO_IS_VALID_RAW (video_info.finfo)
#ifdef HAVE_GSTREAMER_DMA_DRM
&& GST_VIDEO_FORMAT_INFO_FORMAT (video_info.finfo) != GST_VIDEO_FORMAT_DMA_DRM
#endif
)
pwsink->is_rawvideo = TRUE;
else
pwsink->is_rawvideo = FALSE;
}
spa_dll_set_bw(&pwsink->stream->dll, SPA_DLL_BW_MIN, 4096, rate);
@ -788,6 +910,11 @@ gst_pipewire_sink_setcaps (GstBaseSink * bsink, GstCaps * caps)
else
flags |= PW_STREAM_FLAG_DRIVER;
#ifdef HAVE_GSTREAMER_SHM_ALLOCATOR
flags |= PW_STREAM_FLAG_ALLOC_BUFFERS;
pwsink->stream->pool->allocate_memory = true;
#endif
target_id = pwsink->stream->path ? (uint32_t)atoi(pwsink->stream->path) : PW_ID_ANY;
if (pwsink->stream->target_object) {
@ -841,8 +968,12 @@ gst_pipewire_sink_setcaps (GstBaseSink * bsink, GstCaps * caps)
config = gst_buffer_pool_get_config (GST_BUFFER_POOL_CAST (pwsink->stream->pool));
gst_buffer_pool_config_get_params (config, NULL, &size, &min_buffers, &max_buffers);
gst_buffer_pool_config_set_params (config, caps, size, min_buffers, max_buffers);
if(pwsink->is_video)
gst_buffer_pool_config_add_option(config, GST_BUFFER_POOL_OPTION_VIDEO_META);
if (pwsink->is_rawvideo) {
gst_buffer_pool_config_add_option (config, GST_BUFFER_POOL_OPTION_VIDEO_META);
#ifdef HAVE_GSTREAMER_SHM_ALLOCATOR
gst_buffer_pool_config_add_option (config, GST_BUFFER_POOL_OPTION_VIDEO_ALIGNMENT);
#endif
}
gst_buffer_pool_set_config (GST_BUFFER_POOL_CAST (pwsink->stream->pool), config);
pw_thread_loop_unlock (pwsink->stream->core->loop);
@ -924,20 +1055,17 @@ gst_pipewire_sink_render (GstBaseSink * bsink, GstBuffer * buffer)
if (res != GST_FLOW_OK)
goto done;
if (pwsink->is_video) {
if (pwsink->is_rawvideo) {
GstVideoFrame src, dst;
gboolean copied = FALSE;
buf_size = 0; // to break from the loop
/*
splitting of buffers in the case of video might break the frame layout
and that seems to be causing issues while retrieving the buffers on the receiver
side. Hence use the video_frame_map to copy the buffer of bigger size into the
pipewirepool's buffer
*/
/* splitting of buffers in the case of video might break the frame layout
* and that seems to be causing issues while retrieving the buffers on the receiver
* side. Hence use the video_frame_map to copy the buffer of bigger size into the
* pipewirepool's buffer */
if (!gst_video_frame_map (&dst, &pwsink->stream->pool->video_info, b,
GST_MAP_WRITE)) {
if (!gst_video_frame_map (&dst, &pwsink->stream->pool->video_info, b, GST_MAP_WRITE)) {
GST_ERROR_OBJECT(pwsink, "Failed to map dest buffer");
return GST_FLOW_ERROR;
}
@ -957,8 +1085,6 @@ gst_pipewire_sink_render (GstBaseSink * bsink, GstBuffer * buffer)
GST_ERROR_OBJECT(pwsink, "Failed to copy the frame");
return GST_FLOW_ERROR;
}
gst_buffer_copy_into(b, buffer, GST_BUFFER_COPY_METADATA, 0, -1);
} else {
gst_buffer_map (b, &info, GST_MAP_WRITE);
gsize extract_size = (buf_size <= info.maxsize) ? buf_size: info.maxsize;
@ -980,7 +1106,8 @@ gst_pipewire_sink_render (GstBaseSink * bsink, GstBuffer * buffer)
gst_buffer_unref (b);
if (pw_stream_is_driving (pwsink->stream->pwstream))
pw_stream_trigger_process (pwsink->stream->pwstream);
pw_loop_invoke(pw_stream_get_data_loop(pwsink->stream->pwstream),
invoke_trigger_process, 1, NULL, 0 , false, pwsink);
}
} else {
GST_TRACE_OBJECT(pwsink, "Buffer is from pipewirepool");
@ -988,7 +1115,8 @@ gst_pipewire_sink_render (GstBaseSink * bsink, GstBuffer * buffer)
do_send_buffer (pwsink, buffer);
if (pw_stream_is_driving (pwsink->stream->pwstream))
pw_stream_trigger_process (pwsink->stream->pwstream);
pw_loop_invoke(pw_stream_get_data_loop(pwsink->stream->pwstream),
invoke_trigger_process, 1, NULL, 0 , false, pwsink);
}
done_unlock:
@ -1002,6 +1130,18 @@ not_negotiated:
}
}
static void
on_io_changed(void *data, uint32_t id, void *area, uint32_t size)
{
GstPipeWireSink *pwsink = data;
switch (id) {
case SPA_IO_Position:
pwsink->stream->io_position = area;
break;
}
}
static const struct pw_stream_events stream_events = {
PW_VERSION_STREAM_EVENTS,
.state_changed = on_state_changed,
@ -1009,6 +1149,7 @@ static const struct pw_stream_events stream_events = {
.add_buffer = on_add_buffer,
.remove_buffer = on_remove_buffer,
.process = on_process,
.io_changed = on_io_changed,
};
static GstStateChangeReturn
@ -1023,6 +1164,14 @@ gst_pipewire_sink_change_state (GstElement * element, GstStateChange transition)
goto open_failed;
break;
case GST_STATE_CHANGE_READY_TO_PAUSED:
/* If we are a driver, we shouldn't try to also provide the clock, as we
* _are_ the clock for the graph. For that case, we rely on the pipeline
* clock to drive the pipeline (and thus the graph). */
if (this->mode == GST_PIPEWIRE_SINK_MODE_PROVIDE)
GST_OBJECT_FLAG_UNSET (this, GST_ELEMENT_FLAG_PROVIDE_CLOCK);
else
GST_OBJECT_FLAG_SET (this, GST_ELEMENT_FLAG_PROVIDE_CLOCK);
/* the initial stream state is active, which is needed for linking and
* negotiation to happen and the bufferpool to be set up. We don't know
* if we'll go to plaing, so we deactivate the stream until that

View file

@ -69,7 +69,9 @@ struct _GstPipeWireSink {
gboolean negotiated;
gboolean rate_match;
gint rate;
gboolean is_video;
gboolean is_rawvideo;
gboolean first_buffer;
GstClockTime first_buffer_pts;
GstPipeWireSinkMode mode;
GstPipeWireSinkSlaveMethod slave_method;

View file

@ -46,7 +46,9 @@ GST_DEBUG_CATEGORY_STATIC (pipewire_src_debug);
#define DEFAULT_RESEND_LAST false
#define DEFAULT_KEEPALIVE_TIME 0
#define DEFAULT_AUTOCONNECT true
#define DEFAULT_USE_BUFFERPOOL USE_BUFFERPOOL_AUTO
#define DEFAULT_USE_BUFFERPOOL USE_BUFFERPOOL_AUTO
#define DEFAULT_ON_DISCONNECT GST_PIPEWIRE_SRC_ON_DISCONNECT_NONE
#define DEFAULT_PROVIDE_CLOCK TRUE
enum
{
@ -64,8 +66,29 @@ enum
PROP_KEEPALIVE_TIME,
PROP_AUTOCONNECT,
PROP_USE_BUFFERPOOL,
PROP_ON_DISCONNECT,
PROP_PROVIDE_CLOCK,
};
GType
gst_pipewire_src_on_disconnect_get_type (void)
{
static gsize on_disconnect_type = 0;
static const GEnumValue on_disconnect[] = {
{GST_PIPEWIRE_SRC_ON_DISCONNECT_NONE, "GST_PIPEWIRE_SRC_ON_DISCONNECT_NONE", "none"},
{GST_PIPEWIRE_SRC_ON_DISCONNECT_EOS, "GST_PIPEWIRE_SRC_ON_DISCONNECT_EOS", "eos"},
{GST_PIPEWIRE_SRC_ON_DISCONNECT_ERROR, "GST_PIPEWIRE_SRC_ON_DISCONNECT_ERROR", "error"},
{0, NULL, NULL},
};
if (g_once_init_enter (&on_disconnect_type)) {
GType tmp =
g_enum_register_static ("GstPipeWireSrcOnDisconnect", on_disconnect);
g_once_init_leave (&on_disconnect_type, tmp);
}
return (GType) on_disconnect_type;
}
static GstStaticPadTemplate gst_pipewire_src_template =
GST_STATIC_PAD_TEMPLATE ("src",
@ -170,6 +193,20 @@ gst_pipewire_src_set_property (GObject * object, guint prop_id,
pwsrc->use_bufferpool = USE_BUFFERPOOL_NO;
break;
case PROP_ON_DISCONNECT:
pwsrc->on_disconnect = g_value_get_enum (value);
break;
case PROP_PROVIDE_CLOCK:
gboolean provide = g_value_get_boolean (value);
GST_OBJECT_LOCK (pwsrc);
if (provide)
GST_OBJECT_FLAG_SET (pwsrc, GST_ELEMENT_FLAG_PROVIDE_CLOCK);
else
GST_OBJECT_FLAG_UNSET (pwsrc, GST_ELEMENT_FLAG_PROVIDE_CLOCK);
GST_OBJECT_UNLOCK (pwsrc);
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
@ -235,6 +272,18 @@ gst_pipewire_src_get_property (GObject * object, guint prop_id,
g_value_set_boolean (value, !!pwsrc->use_bufferpool);
break;
case PROP_ON_DISCONNECT:
g_value_set_enum (value, pwsrc->on_disconnect);
break;
case PROP_PROVIDE_CLOCK:
gboolean result;
GST_OBJECT_LOCK (pwsrc);
result = GST_OBJECT_FLAG_IS_SET (pwsrc, GST_ELEMENT_FLAG_PROVIDE_CLOCK);
GST_OBJECT_UNLOCK (pwsrc);
g_value_set_boolean (value, result);
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
@ -414,6 +463,25 @@ gst_pipewire_src_class_init (GstPipeWireSrcClass * klass)
G_PARAM_READWRITE |
G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class,
PROP_ON_DISCONNECT,
g_param_spec_enum ("on-disconnect",
"On disconnect",
"Action to take on disconnect",
GST_TYPE_PIPEWIRE_SRC_ON_DISCONNECT,
DEFAULT_ON_DISCONNECT,
G_PARAM_READWRITE |
G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class,
PROP_PROVIDE_CLOCK,
g_param_spec_boolean ("provide-clock",
"Provide Clock",
"Provide a clock to be used as the global pipeline clock",
DEFAULT_PROVIDE_CLOCK,
G_PARAM_READWRITE |
G_PARAM_STATIC_STRINGS));
gstelement_class->provide_clock = gst_pipewire_src_provide_clock;
gstelement_class->change_state = gst_pipewire_src_change_state;
gstelement_class->send_event = gst_pipewire_src_send_event;
@ -462,6 +530,9 @@ gst_pipewire_src_init (GstPipeWireSrc * src)
src->autoconnect = DEFAULT_AUTOCONNECT;
src->min_latency = 0;
src->max_latency = GST_CLOCK_TIME_NONE;
src->n_buffers = 0;
src->flushing_on_remove_buffer = FALSE;
src->on_disconnect = DEFAULT_ON_DISCONNECT;
src->transform_value = UINT32_MAX;
}
@ -469,11 +540,26 @@ gst_pipewire_src_init (GstPipeWireSrc * src)
static gboolean
buffer_recycle (GstMiniObject *obj)
{
GstPipeWireSrc *src;
GstPipeWirePoolData *data;
GstPipeWirePoolData *data = gst_pipewire_pool_get_data (GST_BUFFER_CAST(obj));
GstPipeWireSrc *src = data->owner;
int res;
data = gst_pipewire_pool_get_data (GST_BUFFER_CAST(obj));
if (src->flushing_on_remove_buffer) {
/*
* If a flush-start was initiated, this might be called by elements like
* queues downstream purging buffers from their internal queues. This can
* deadlock if queues use min-threshold-buffers/bytes/time with src_create
* trying to take the loop lock and buffer_recycle trying to take the loop
* lock down below. We return from here, to prevent deadlock with streaming
* thread in a queue thread.
*
* We will take care of queueing the buffer in on_remove_buffer.
*/
GstBuffer *buffer = GST_BUFFER_CAST(obj);
GST_DEBUG_OBJECT (src,
"flush-start initiated, skipping buffer recycle %p", buffer);
return TRUE;
}
GST_OBJECT_LOCK (data->pool);
if (!obj->dispose) {
@ -482,7 +568,6 @@ buffer_recycle (GstMiniObject *obj)
}
GST_BUFFER_FLAGS (obj) = data->flags;
src = data->owner;
pw_thread_loop_lock (src->stream->core->loop);
if (!obj->dispose) {
@ -519,6 +604,8 @@ on_add_buffer (void *_data, struct pw_buffer *b)
data->owner = pwsrc;
data->queued = TRUE;
GST_MINI_OBJECT_CAST (data->buf)->dispose = buffer_recycle;
pwsrc->n_buffers++;
}
static void
@ -527,17 +614,76 @@ on_remove_buffer (void *_data, struct pw_buffer *b)
GstPipeWireSrc *pwsrc = _data;
GstPipeWirePoolData *data = b->user_data;
GstBuffer *buf = data->buf;
gboolean flush_on_remove;
int res;
GST_DEBUG_OBJECT (pwsrc, "remove buffer %p", buf);
GST_DEBUG_OBJECT (pwsrc, "remove buffer %p, queued: %d",
buf, data->queued);
GST_MINI_OBJECT_CAST (buf)->dispose = NULL;
flush_on_remove =
pwsrc->on_disconnect == GST_PIPEWIRE_SRC_ON_DISCONNECT_ERROR ||
pwsrc->on_disconnect == GST_PIPEWIRE_SRC_ON_DISCONNECT_EOS;
if (flush_on_remove && !pwsrc->flushing_on_remove_buffer) {
pwsrc->flushing_on_remove_buffer = TRUE;
GST_DEBUG_OBJECT (pwsrc, "flush-start on remove buffer");
/*
* It is possible that when buffers are being removed, a downstream
* element can be holding on to a buffer or in the middle of rendering
* the same. Former is possible with queues min-threshold-buffers or
* similar. Latter can result in a crash during gst_video_frame_copy.
*
* We send a flush-start event downstream to make elements discard
* any buffers they may be holding on to as well as return from their
* chain function ASAP.
*/
gst_pad_push_event (GST_BASE_SRC_PAD (pwsrc),
gst_event_new_flush_start ());
}
if (data->queued) {
gst_buffer_unref (buf);
} else {
if ((res = pw_stream_queue_buffer (pwsrc->stream->pwstream, b)) < 0)
GST_WARNING_OBJECT (pwsrc, "can't queue removed buffer %p, %s", buf, spa_strerror(res));
GST_WARNING_OBJECT (pwsrc, "can't queue removed buffer %p, %s",
buf, spa_strerror(res));
else
GST_DEBUG_OBJECT (pwsrc, "queued buffer %p", buf);
}
pwsrc->n_buffers--;
if (pwsrc->n_buffers == 0) {
GST_DEBUG_OBJECT (pwsrc, "removed all buffers");
pwsrc->flushing_on_remove_buffer = FALSE;
switch (pwsrc->on_disconnect) {
case GST_PIPEWIRE_SRC_ON_DISCONNECT_ERROR:
GST_DEBUG_OBJECT (pwsrc, "flush-stop on removing all buffers");
gst_pad_push_event (GST_BASE_SRC_PAD (pwsrc),
gst_event_new_flush_stop (FALSE));
GST_ELEMENT_ERROR (pwsrc, RESOURCE, NOT_FOUND,
("all buffers have been removed"),
("PipeWire link to remote node was destroyed"));
break;
case GST_PIPEWIRE_SRC_ON_DISCONNECT_EOS:
GST_DEBUG_OBJECT (pwsrc, "flush-stop on removing all buffers");
gst_pad_push_event (GST_BASE_SRC_PAD (pwsrc),
gst_event_new_flush_stop (FALSE));
GST_DEBUG_OBJECT (pwsrc, "sending eos downstream");
gst_pad_push_event (GST_BASE_SRC_PAD (pwsrc),
gst_event_new_eos());
break;
case GST_PIPEWIRE_SRC_ON_DISCONNECT_NONE:
GST_DEBUG_OBJECT (pwsrc, "stream closed or removed");
break;
}
}
}
@ -660,9 +806,12 @@ static GstBuffer *dequeue_buffer(GstPipeWireSrc *pwsrc)
pwsrc->transform_value = transform_value;
}
if (pwsrc->is_video) {
gsize video_size = 0;
if (pwsrc->is_rawvideo) {
GstVideoInfo *info = &pwsrc->video_info;
uint32_t n_datas = b->buffer->n_datas;
uint32_t n_planes = GST_VIDEO_INFO_N_PLANES (info);
gsize video_size = 0;
GstVideoMeta *meta = gst_buffer_add_video_meta_full (buf, GST_VIDEO_FRAME_FLAG_NONE,
GST_VIDEO_INFO_FORMAT (info),
GST_VIDEO_INFO_WIDTH (info),
@ -671,7 +820,7 @@ static GstBuffer *dequeue_buffer(GstPipeWireSrc *pwsrc)
info->offset,
info->stride);
for (i = 0; i < MIN (b->buffer->n_datas, GST_VIDEO_MAX_PLANES); i++) {
for (i = 0; i < MIN (n_datas, n_planes); i++) {
struct spa_data *d = &b->buffer->datas[i];
meta->offset[i] = video_size;
meta->stride[i] = d->chunk->stride;
@ -680,6 +829,10 @@ static GstBuffer *dequeue_buffer(GstPipeWireSrc *pwsrc)
}
}
if (b->buffer->n_datas != gst_buffer_n_memory(data->buf)) {
GST_ERROR_OBJECT(pwsrc, "n_datas != n_memory, (%d != %d)", b->buffer->n_datas, gst_buffer_n_memory(data->buf));
}
for (i = 0; i < b->buffer->n_datas; i++) {
struct spa_data *d = &b->buffer->datas[i];
@ -730,13 +883,29 @@ on_state_changed (void *data,
enum pw_stream_state state, const char *error)
{
GstPipeWireSrc *pwsrc = data;
GstState current_state = GST_ELEMENT_CAST (pwsrc)->current_state;
GST_DEBUG ("got stream state %s", pw_stream_state_as_string (state));
GST_DEBUG_OBJECT (pwsrc, "got stream state %s", pw_stream_state_as_string (state));
switch (state) {
case PW_STREAM_STATE_UNCONNECTED:
case PW_STREAM_STATE_CONNECTING:
break;
case PW_STREAM_STATE_PAUSED:
/*
* We may see a driver/quantum/clock rate change on switching audio
* sources. The same is not applicable for video.
*
* We post the clock lost message here to take care of a possible
* jump or shift in base_time/clock for the pipeline. Application
* must handle the clock lost message in it's bus handler by pausing
* the pipeline and then setting it back to playing.
*/
if (current_state == GST_STATE_PLAYING && !pwsrc->is_video)
gst_element_post_message (GST_ELEMENT_CAST (pwsrc),
gst_message_new_clock_lost (GST_OBJECT_CAST (pwsrc),
GST_CLOCK_CAST (pwsrc->stream->clock)));
break;
case PW_STREAM_STATE_STREAMING:
break;
case PW_STREAM_STATE_ERROR:
@ -982,7 +1151,7 @@ gst_pipewire_src_negotiate (GstBaseSrc * basesrc)
GST_DEBUG_OBJECT (basesrc, "connect capture with path %s, target-object %s",
pwsrc->stream->path, pwsrc->stream->target_object);
pwsrc->possible_caps = possible_caps;
pwsrc->possible_caps = gst_caps_ref (possible_caps);
pwsrc->negotiated = FALSE;
enum pw_stream_flags flags;
@ -1019,7 +1188,6 @@ gst_pipewire_src_negotiate (GstBaseSrc * basesrc)
}
negotiated_caps = g_steal_pointer (&pwsrc->caps);
pwsrc->possible_caps = NULL;
pw_thread_loop_unlock (pwsrc->stream->core->loop);
if (negotiated_caps == NULL)
@ -1140,10 +1308,21 @@ handle_format_change (GstPipeWireSrc *pwsrc,
pw_stream_set_error (pwsrc->stream->pwstream, -EINVAL, "internal error");
return;
}
pwsrc->is_rawvideo = TRUE;
} else {
gst_video_info_dma_drm_init (&pwsrc->drm_info);
#endif
gst_video_info_from_caps (&pwsrc->video_info, pwsrc->caps);
if (GST_VIDEO_FORMAT_INFO_IS_VALID_RAW (pwsrc->video_info.finfo)
#ifdef HAVE_GSTREAMER_DMA_DRM
&& GST_VIDEO_FORMAT_INFO_FORMAT (pwsrc->video_info.finfo) != GST_VIDEO_FORMAT_DMA_DRM
#endif
)
pwsrc->is_rawvideo = TRUE;
else
pwsrc->is_rawvideo = FALSE;
#ifdef HAVE_GSTREAMER_DMA_DRM
}
#endif
@ -1156,6 +1335,7 @@ handle_format_change (GstPipeWireSrc *pwsrc,
} else {
pwsrc->negotiated = FALSE;
pwsrc->is_video = FALSE;
pwsrc->is_rawvideo = FALSE;
}
if (pwsrc->caps) {
@ -1330,6 +1510,8 @@ gst_pipewire_src_create (GstPushSrc * psrc, GstBuffer ** buffer)
GstBuffer *buf;
gboolean update_time = FALSE, timeout = FALSE;
GstCaps *caps = NULL;
struct timespec abstime = { 0, };
bool have_abstime = false;
pwsrc = GST_PIPEWIRE_SRC (psrc);
@ -1373,13 +1555,11 @@ gst_pipewire_src_create (GstPushSrc * psrc, GstBuffer ** buffer)
update_time = TRUE;
GST_LOG_OBJECT (pwsrc, "EOS, send last buffer");
break;
} else if (timeout) {
if (pwsrc->last_buffer != NULL) {
update_time = TRUE;
buf = gst_buffer_ref(pwsrc->last_buffer);
GST_LOG_OBJECT (pwsrc, "timeout, send keepalive buffer");
break;
}
} else if (timeout && pwsrc->last_buffer != NULL) {
update_time = TRUE;
buf = gst_buffer_ref(pwsrc->last_buffer);
GST_LOG_OBJECT (pwsrc, "timeout, send keepalive buffer");
break;
} else {
buf = dequeue_buffer (pwsrc);
GST_LOG_OBJECT (pwsrc, "popped buffer %p", buf);
@ -1391,9 +1571,13 @@ gst_pipewire_src_create (GstPushSrc * psrc, GstBuffer ** buffer)
}
timeout = FALSE;
if (pwsrc->keepalive_time > 0) {
struct timespec abstime;
pw_thread_loop_get_time(pwsrc->stream->core->loop, &abstime,
pwsrc->keepalive_time * SPA_NSEC_PER_MSEC);
if (!have_abstime) {
/* Record the time we want to timeout at once, for this loop -- the loop might get unrelated signal()s,
* and we don't want the keepalive time to get reset by that */
pw_thread_loop_get_time(pwsrc->stream->core->loop, &abstime,
pwsrc->keepalive_time * SPA_NSEC_PER_MSEC);
have_abstime = TRUE;
}
if (pw_thread_loop_timed_wait_full (pwsrc->stream->core->loop, &abstime) == -ETIMEDOUT)
timeout = TRUE;
} else {
@ -1464,6 +1648,7 @@ gst_pipewire_src_stop (GstBaseSrc * basesrc)
pwsrc->eos = false;
gst_buffer_replace (&pwsrc->last_buffer, NULL);
gst_caps_replace(&pwsrc->caps, NULL);
gst_caps_replace(&pwsrc->possible_caps, NULL);
pwsrc->transform_value = UINT32_MAX;
pw_thread_loop_unlock (pwsrc->stream->core->loop);

View file

@ -24,6 +24,22 @@ G_BEGIN_DECLS
#define GST_PIPEWIRE_SRC_CAST(obj) ((GstPipeWireSrc *) (obj))
G_DECLARE_FINAL_TYPE (GstPipeWireSrc, gst_pipewire_src, GST, PIPEWIRE_SRC, GstPushSrc)
/**
* GstPipeWireSrcOnDisconnect:
* @GST_PIPEWIRE_SRC_ON_DISCONNECT_EOS: send EoS downstream
* @GST_PIPEWIRE_SRC_ON_DISCONNECT_ERROR: raise pipeline error
* @GST_PIPEWIRE_SRC_ON_DISCONNECT_NONE: no action
*
* Different actions on disconnect.
*/
typedef enum
{
GST_PIPEWIRE_SRC_ON_DISCONNECT_NONE,
GST_PIPEWIRE_SRC_ON_DISCONNECT_EOS,
GST_PIPEWIRE_SRC_ON_DISCONNECT_ERROR,
} GstPipeWireSrcOnDisconnect;
#define GST_TYPE_PIPEWIRE_SRC_ON_DISCONNECT (gst_pipewire_src_on_disconnect_get_type ())
/**
* GstPipeWireSrc:
@ -36,6 +52,7 @@ struct _GstPipeWireSrc {
GstPipeWireStream *stream;
/*< private >*/
gint n_buffers;
gint use_bufferpool;
gint min_buffers;
gint max_buffers;
@ -47,6 +64,7 @@ struct _GstPipeWireSrc {
GstCaps *possible_caps;
gboolean is_video;
gboolean is_rawvideo;
GstVideoInfo video_info;
#ifdef HAVE_GSTREAMER_DMA_DRM
GstVideoInfoDmaDrm drm_info;
@ -56,6 +74,7 @@ struct _GstPipeWireSrc {
gboolean flushing;
gboolean started;
gboolean eos;
gboolean flushing_on_remove_buffer;
gboolean is_live;
int64_t delay;
@ -65,8 +84,12 @@ struct _GstPipeWireSrc {
GstBuffer *last_buffer;
enum spa_meta_videotransform_value transform_value;
GstPipeWireSrcOnDisconnect on_disconnect;
};
GType gst_pipewire_src_on_stream_disconnect_get_type (void);
G_END_DECLS
#endif /* __GST_PIPEWIRE_SRC_H__ */

View file

@ -31,6 +31,7 @@ struct _GstPipeWireStream {
GstClock *clock;
guint64 position;
guint64 buf_duration;
struct spa_dll dll;
double err_avg, err_var, err_wdw;
guint64 last_ts;
@ -41,6 +42,8 @@ struct _GstPipeWireStream {
struct pw_stream *pwstream;
struct spa_hook pwstream_listener;
struct spa_io_position *io_position;
/* common properties */
int fd;
gchar *path;