mirror of
https://gitlab.freedesktop.org/pipewire/pipewire.git
synced 2025-10-29 05:40:27 -04:00
gstpipewiresrc: Handle stream being disconnected
When PW source is used with something like Camera and the camera is disconnected, all buffers are removed and stream will be paused. When using PW sink with source, the sink side pipeline can go to EOS. This again results in all the buffers being removed and stream being paused on the source side. PW source side pipeline can also crash if the sink was in the middle of frame copying a buffer to render which got removed. Handle this scenario by sending a flush-start event at the start of buffer removal and flush-stop at the end followed by an end of stream or pipeline error depending on user selection.
This commit is contained in:
parent
e9a2406314
commit
bb1bb07f6c
2 changed files with 147 additions and 8 deletions
|
|
@ -46,7 +46,8 @@ GST_DEBUG_CATEGORY_STATIC (pipewire_src_debug);
|
|||
#define DEFAULT_RESEND_LAST false
|
||||
#define DEFAULT_KEEPALIVE_TIME 0
|
||||
#define DEFAULT_AUTOCONNECT true
|
||||
#define DEFAULT_USE_BUFFERPOOL USE_BUFFERPOOL_AUTO
|
||||
#define DEFAULT_USE_BUFFERPOOL USE_BUFFERPOOL_AUTO
|
||||
#define DEFAULT_ON_DISCONNECT GST_PIPEWIRE_SRC_ON_DISCONNECT_NONE
|
||||
|
||||
enum
|
||||
{
|
||||
|
|
@ -64,8 +65,28 @@ enum
|
|||
PROP_KEEPALIVE_TIME,
|
||||
PROP_AUTOCONNECT,
|
||||
PROP_USE_BUFFERPOOL,
|
||||
PROP_ON_DISCONNECT,
|
||||
};
|
||||
|
||||
GType
|
||||
gst_pipewire_src_on_disconnect_get_type (void)
|
||||
{
|
||||
static gsize on_disconnect_type = 0;
|
||||
static const GEnumValue on_disconnect[] = {
|
||||
{GST_PIPEWIRE_SRC_ON_DISCONNECT_NONE, "GST_PIPEWIRE_SRC_ON_DISCONNECT_NONE", "none"},
|
||||
{GST_PIPEWIRE_SRC_ON_DISCONNECT_EOS, "GST_PIPEWIRE_SRC_ON_DISCONNECT_EOS", "eos"},
|
||||
{GST_PIPEWIRE_SRC_ON_DISCONNECT_ERROR, "GST_PIPEWIRE_SRC_ON_DISCONNECT_ERROR", "error"},
|
||||
{0, NULL, NULL},
|
||||
};
|
||||
|
||||
if (g_once_init_enter (&on_disconnect_type)) {
|
||||
GType tmp =
|
||||
g_enum_register_static ("GstPipeWireSrcOnDisconnect", on_disconnect);
|
||||
g_once_init_leave (&on_disconnect_type, tmp);
|
||||
}
|
||||
|
||||
return (GType) on_disconnect_type;
|
||||
}
|
||||
|
||||
static GstStaticPadTemplate gst_pipewire_src_template =
|
||||
GST_STATIC_PAD_TEMPLATE ("src",
|
||||
|
|
@ -170,6 +191,10 @@ gst_pipewire_src_set_property (GObject * object, guint prop_id,
|
|||
pwsrc->use_bufferpool = USE_BUFFERPOOL_NO;
|
||||
break;
|
||||
|
||||
case PROP_ON_DISCONNECT:
|
||||
pwsrc->on_disconnect = g_value_get_enum (value);
|
||||
break;
|
||||
|
||||
default:
|
||||
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
|
||||
break;
|
||||
|
|
@ -235,6 +260,10 @@ gst_pipewire_src_get_property (GObject * object, guint prop_id,
|
|||
g_value_set_boolean (value, !!pwsrc->use_bufferpool);
|
||||
break;
|
||||
|
||||
case PROP_ON_DISCONNECT:
|
||||
g_value_set_enum (value, pwsrc->on_disconnect);
|
||||
break;
|
||||
|
||||
default:
|
||||
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
|
||||
break;
|
||||
|
|
@ -414,6 +443,16 @@ gst_pipewire_src_class_init (GstPipeWireSrcClass * klass)
|
|||
G_PARAM_READWRITE |
|
||||
G_PARAM_STATIC_STRINGS));
|
||||
|
||||
g_object_class_install_property (gobject_class,
|
||||
PROP_ON_DISCONNECT,
|
||||
g_param_spec_enum ("on-disconnect",
|
||||
"On disconnect",
|
||||
"Action to take on disconnect",
|
||||
GST_TYPE_PIPEWIRE_SRC_ON_DISCONNECT,
|
||||
DEFAULT_ON_DISCONNECT,
|
||||
G_PARAM_READWRITE |
|
||||
G_PARAM_STATIC_STRINGS));
|
||||
|
||||
gstelement_class->provide_clock = gst_pipewire_src_provide_clock;
|
||||
gstelement_class->change_state = gst_pipewire_src_change_state;
|
||||
gstelement_class->send_event = gst_pipewire_src_send_event;
|
||||
|
|
@ -462,6 +501,9 @@ gst_pipewire_src_init (GstPipeWireSrc * src)
|
|||
src->autoconnect = DEFAULT_AUTOCONNECT;
|
||||
src->min_latency = 0;
|
||||
src->max_latency = GST_CLOCK_TIME_NONE;
|
||||
src->n_buffers = 0;
|
||||
src->flushing_on_remove_buffer = FALSE;
|
||||
src->on_disconnect = DEFAULT_ON_DISCONNECT;
|
||||
|
||||
src->transform_value = UINT32_MAX;
|
||||
}
|
||||
|
|
@ -469,11 +511,26 @@ gst_pipewire_src_init (GstPipeWireSrc * src)
|
|||
static gboolean
|
||||
buffer_recycle (GstMiniObject *obj)
|
||||
{
|
||||
GstPipeWireSrc *src;
|
||||
GstPipeWirePoolData *data;
|
||||
GstPipeWirePoolData *data = gst_pipewire_pool_get_data (GST_BUFFER_CAST(obj));
|
||||
GstPipeWireSrc *src = data->owner;
|
||||
int res;
|
||||
|
||||
data = gst_pipewire_pool_get_data (GST_BUFFER_CAST(obj));
|
||||
if (src->flushing_on_remove_buffer) {
|
||||
/*
|
||||
* If a flush-start was initiated, this might be called by elements like
|
||||
* queues downstream purging buffers from their internal queues. This can
|
||||
* deadlock if queues use min-threshold-buffers/bytes/time with src_create
|
||||
* trying to take the loop lock and buffer_recycle trying to take the loop
|
||||
* lock down below. We return from here, to prevent deadlock with streaming
|
||||
* thread in a queue thread.
|
||||
*
|
||||
* We will take care of queueing the buffer in on_remove_buffer.
|
||||
*/
|
||||
GstBuffer *buffer = GST_BUFFER_CAST(obj);
|
||||
GST_DEBUG_OBJECT (src,
|
||||
"flush-start initiated, skipping buffer recycle %p", buffer);
|
||||
return TRUE;
|
||||
}
|
||||
|
||||
GST_OBJECT_LOCK (data->pool);
|
||||
if (!obj->dispose) {
|
||||
|
|
@ -482,7 +539,6 @@ buffer_recycle (GstMiniObject *obj)
|
|||
}
|
||||
|
||||
GST_BUFFER_FLAGS (obj) = data->flags;
|
||||
src = data->owner;
|
||||
|
||||
pw_thread_loop_lock (src->stream->core->loop);
|
||||
if (!obj->dispose) {
|
||||
|
|
@ -519,6 +575,8 @@ on_add_buffer (void *_data, struct pw_buffer *b)
|
|||
data->owner = pwsrc;
|
||||
data->queued = TRUE;
|
||||
GST_MINI_OBJECT_CAST (data->buf)->dispose = buffer_recycle;
|
||||
|
||||
pwsrc->n_buffers++;
|
||||
}
|
||||
|
||||
static void
|
||||
|
|
@ -527,17 +585,76 @@ on_remove_buffer (void *_data, struct pw_buffer *b)
|
|||
GstPipeWireSrc *pwsrc = _data;
|
||||
GstPipeWirePoolData *data = b->user_data;
|
||||
GstBuffer *buf = data->buf;
|
||||
gboolean flush_on_remove;
|
||||
int res;
|
||||
|
||||
GST_DEBUG_OBJECT (pwsrc, "remove buffer %p", buf);
|
||||
GST_DEBUG_OBJECT (pwsrc, "remove buffer %p, queued: %d",
|
||||
buf, data->queued);
|
||||
|
||||
GST_MINI_OBJECT_CAST (buf)->dispose = NULL;
|
||||
|
||||
flush_on_remove =
|
||||
pwsrc->on_disconnect == GST_PIPEWIRE_SRC_ON_DISCONNECT_ERROR ||
|
||||
pwsrc->on_disconnect == GST_PIPEWIRE_SRC_ON_DISCONNECT_EOS;
|
||||
|
||||
if (flush_on_remove && !pwsrc->flushing_on_remove_buffer) {
|
||||
pwsrc->flushing_on_remove_buffer = TRUE;
|
||||
|
||||
GST_DEBUG_OBJECT (pwsrc, "flush-start on remove buffer");
|
||||
/*
|
||||
* It is possible that when buffers are being removed, a downstream
|
||||
* element can be holding on to a buffer or in the middle of rendering
|
||||
* the same. Former is possible with queues min-threshold-buffers or
|
||||
* similar. Latter can result in a crash during gst_video_frame_copy.
|
||||
*
|
||||
* We send a flush-start event downstream to make elements discard
|
||||
* any buffers they may be holding on to as well as return from their
|
||||
* chain function ASAP.
|
||||
*/
|
||||
gst_pad_push_event (GST_BASE_SRC_PAD (pwsrc),
|
||||
gst_event_new_flush_start ());
|
||||
}
|
||||
|
||||
if (data->queued) {
|
||||
gst_buffer_unref (buf);
|
||||
} else {
|
||||
if ((res = pw_stream_queue_buffer (pwsrc->stream->pwstream, b)) < 0)
|
||||
GST_WARNING_OBJECT (pwsrc, "can't queue removed buffer %p, %s", buf, spa_strerror(res));
|
||||
GST_WARNING_OBJECT (pwsrc, "can't queue removed buffer %p, %s",
|
||||
buf, spa_strerror(res));
|
||||
else
|
||||
GST_DEBUG_OBJECT (pwsrc, "queued buffer %p", buf);
|
||||
}
|
||||
|
||||
pwsrc->n_buffers--;
|
||||
|
||||
if (pwsrc->n_buffers == 0) {
|
||||
GST_DEBUG_OBJECT (pwsrc, "removed all buffers");
|
||||
|
||||
pwsrc->flushing_on_remove_buffer = FALSE;
|
||||
|
||||
switch (pwsrc->on_disconnect) {
|
||||
case GST_PIPEWIRE_SRC_ON_DISCONNECT_ERROR:
|
||||
GST_DEBUG_OBJECT (pwsrc, "flush-stop on removing all buffers");
|
||||
gst_pad_push_event (GST_BASE_SRC_PAD (pwsrc),
|
||||
gst_event_new_flush_stop (FALSE));
|
||||
|
||||
GST_ELEMENT_ERROR (pwsrc, RESOURCE, NOT_FOUND,
|
||||
("all buffers have been removed"),
|
||||
("PipeWire link to remote node was destroyed"));
|
||||
break;
|
||||
case GST_PIPEWIRE_SRC_ON_DISCONNECT_EOS:
|
||||
GST_DEBUG_OBJECT (pwsrc, "flush-stop on removing all buffers");
|
||||
gst_pad_push_event (GST_BASE_SRC_PAD (pwsrc),
|
||||
gst_event_new_flush_stop (FALSE));
|
||||
|
||||
GST_DEBUG_OBJECT (pwsrc, "sending eos downstream");
|
||||
gst_pad_push_event (GST_BASE_SRC_PAD (pwsrc),
|
||||
gst_event_new_eos());
|
||||
break;
|
||||
case GST_PIPEWIRE_SRC_ON_DISCONNECT_NONE:
|
||||
GST_DEBUG_OBJECT (pwsrc, "stream closed or removed");
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -741,7 +858,7 @@ on_state_changed (void *data,
|
|||
GstPipeWireSrc *pwsrc = data;
|
||||
GstState current_state = GST_ELEMENT_CAST (pwsrc)->current_state;
|
||||
|
||||
GST_DEBUG ("got stream state %s", pw_stream_state_as_string (state));
|
||||
GST_DEBUG_OBJECT (pwsrc, "got stream state %s", pw_stream_state_as_string (state));
|
||||
|
||||
switch (state) {
|
||||
case PW_STREAM_STATE_UNCONNECTED:
|
||||
|
|
|
|||
|
|
@ -24,6 +24,22 @@ G_BEGIN_DECLS
|
|||
#define GST_PIPEWIRE_SRC_CAST(obj) ((GstPipeWireSrc *) (obj))
|
||||
G_DECLARE_FINAL_TYPE (GstPipeWireSrc, gst_pipewire_src, GST, PIPEWIRE_SRC, GstPushSrc)
|
||||
|
||||
/**
|
||||
* GstPipeWireSrcOnDisconnect:
|
||||
* @GST_PIPEWIRE_SRC_ON_DISCONNECT_EOS: send EoS downstream
|
||||
* @GST_PIPEWIRE_SRC_ON_DISCONNECT_ERROR: raise pipeline error
|
||||
* @GST_PIPEWIRE_SRC_ON_DISCONNECT_NONE: no action
|
||||
*
|
||||
* Different actions on disconnect.
|
||||
*/
|
||||
typedef enum
|
||||
{
|
||||
GST_PIPEWIRE_SRC_ON_DISCONNECT_NONE,
|
||||
GST_PIPEWIRE_SRC_ON_DISCONNECT_EOS,
|
||||
GST_PIPEWIRE_SRC_ON_DISCONNECT_ERROR,
|
||||
} GstPipeWireSrcOnDisconnect;
|
||||
|
||||
#define GST_TYPE_PIPEWIRE_SRC_ON_DISCONNECT (gst_pipewire_src_on_disconnect_get_type ())
|
||||
|
||||
/**
|
||||
* GstPipeWireSrc:
|
||||
|
|
@ -36,6 +52,7 @@ struct _GstPipeWireSrc {
|
|||
GstPipeWireStream *stream;
|
||||
|
||||
/*< private >*/
|
||||
gint n_buffers;
|
||||
gint use_bufferpool;
|
||||
gint min_buffers;
|
||||
gint max_buffers;
|
||||
|
|
@ -56,6 +73,7 @@ struct _GstPipeWireSrc {
|
|||
gboolean flushing;
|
||||
gboolean started;
|
||||
gboolean eos;
|
||||
gboolean flushing_on_remove_buffer;
|
||||
|
||||
gboolean is_live;
|
||||
int64_t delay;
|
||||
|
|
@ -65,8 +83,12 @@ struct _GstPipeWireSrc {
|
|||
GstBuffer *last_buffer;
|
||||
|
||||
enum spa_meta_videotransform_value transform_value;
|
||||
|
||||
GstPipeWireSrcOnDisconnect on_disconnect;
|
||||
};
|
||||
|
||||
GType gst_pipewire_src_on_stream_disconnect_get_type (void);
|
||||
|
||||
G_END_DECLS
|
||||
|
||||
#endif /* __GST_PIPEWIRE_SRC_H__ */
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue