diff --git a/src/gst/gstpipewiresrc.c b/src/gst/gstpipewiresrc.c index 3f6ef340a..1cfb04db5 100644 --- a/src/gst/gstpipewiresrc.c +++ b/src/gst/gstpipewiresrc.c @@ -46,7 +46,8 @@ GST_DEBUG_CATEGORY_STATIC (pipewire_src_debug); #define DEFAULT_RESEND_LAST false #define DEFAULT_KEEPALIVE_TIME 0 #define DEFAULT_AUTOCONNECT true -#define DEFAULT_USE_BUFFERPOOL USE_BUFFERPOOL_AUTO +#define DEFAULT_USE_BUFFERPOOL USE_BUFFERPOOL_AUTO +#define DEFAULT_ON_DISCONNECT GST_PIPEWIRE_SRC_ON_DISCONNECT_NONE enum { @@ -64,8 +65,28 @@ enum PROP_KEEPALIVE_TIME, PROP_AUTOCONNECT, PROP_USE_BUFFERPOOL, + PROP_ON_DISCONNECT, }; +GType +gst_pipewire_src_on_disconnect_get_type (void) +{ + static gsize on_disconnect_type = 0; + static const GEnumValue on_disconnect[] = { + {GST_PIPEWIRE_SRC_ON_DISCONNECT_NONE, "GST_PIPEWIRE_SRC_ON_DISCONNECT_NONE", "none"}, + {GST_PIPEWIRE_SRC_ON_DISCONNECT_EOS, "GST_PIPEWIRE_SRC_ON_DISCONNECT_EOS", "eos"}, + {GST_PIPEWIRE_SRC_ON_DISCONNECT_ERROR, "GST_PIPEWIRE_SRC_ON_DISCONNECT_ERROR", "error"}, + {0, NULL, NULL}, + }; + + if (g_once_init_enter (&on_disconnect_type)) { + GType tmp = + g_enum_register_static ("GstPipeWireSrcOnDisconnect", on_disconnect); + g_once_init_leave (&on_disconnect_type, tmp); + } + + return (GType) on_disconnect_type; +} static GstStaticPadTemplate gst_pipewire_src_template = GST_STATIC_PAD_TEMPLATE ("src", @@ -170,6 +191,10 @@ gst_pipewire_src_set_property (GObject * object, guint prop_id, pwsrc->use_bufferpool = USE_BUFFERPOOL_NO; break; + case PROP_ON_DISCONNECT: + pwsrc->on_disconnect = g_value_get_enum (value); + break; + default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); break; @@ -235,6 +260,10 @@ gst_pipewire_src_get_property (GObject * object, guint prop_id, g_value_set_boolean (value, !!pwsrc->use_bufferpool); break; + case PROP_ON_DISCONNECT: + g_value_set_enum (value, pwsrc->on_disconnect); + break; + default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); break; @@ -414,6 +443,16 @@ gst_pipewire_src_class_init (GstPipeWireSrcClass * klass) G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + g_object_class_install_property (gobject_class, + PROP_ON_DISCONNECT, + g_param_spec_enum ("on-disconnect", + "On disconnect", + "Action to take on disconnect", + GST_TYPE_PIPEWIRE_SRC_ON_DISCONNECT, + DEFAULT_ON_DISCONNECT, + G_PARAM_READWRITE | + G_PARAM_STATIC_STRINGS)); + gstelement_class->provide_clock = gst_pipewire_src_provide_clock; gstelement_class->change_state = gst_pipewire_src_change_state; gstelement_class->send_event = gst_pipewire_src_send_event; @@ -462,6 +501,9 @@ gst_pipewire_src_init (GstPipeWireSrc * src) src->autoconnect = DEFAULT_AUTOCONNECT; src->min_latency = 0; src->max_latency = GST_CLOCK_TIME_NONE; + src->n_buffers = 0; + src->flushing_on_remove_buffer = FALSE; + src->on_disconnect = DEFAULT_ON_DISCONNECT; src->transform_value = UINT32_MAX; } @@ -469,11 +511,26 @@ gst_pipewire_src_init (GstPipeWireSrc * src) static gboolean buffer_recycle (GstMiniObject *obj) { - GstPipeWireSrc *src; - GstPipeWirePoolData *data; + GstPipeWirePoolData *data = gst_pipewire_pool_get_data (GST_BUFFER_CAST(obj)); + GstPipeWireSrc *src = data->owner; int res; - data = gst_pipewire_pool_get_data (GST_BUFFER_CAST(obj)); + if (src->flushing_on_remove_buffer) { + /* + * If a flush-start was initiated, this might be called by elements like + * queues downstream purging buffers from their internal queues. This can + * deadlock if queues use min-threshold-buffers/bytes/time with src_create + * trying to take the loop lock and buffer_recycle trying to take the loop + * lock down below. We return from here, to prevent deadlock with streaming + * thread in a queue thread. + * + * We will take care of queueing the buffer in on_remove_buffer. + */ + GstBuffer *buffer = GST_BUFFER_CAST(obj); + GST_DEBUG_OBJECT (src, + "flush-start initiated, skipping buffer recycle %p", buffer); + return TRUE; + } GST_OBJECT_LOCK (data->pool); if (!obj->dispose) { @@ -482,7 +539,6 @@ buffer_recycle (GstMiniObject *obj) } GST_BUFFER_FLAGS (obj) = data->flags; - src = data->owner; pw_thread_loop_lock (src->stream->core->loop); if (!obj->dispose) { @@ -519,6 +575,8 @@ on_add_buffer (void *_data, struct pw_buffer *b) data->owner = pwsrc; data->queued = TRUE; GST_MINI_OBJECT_CAST (data->buf)->dispose = buffer_recycle; + + pwsrc->n_buffers++; } static void @@ -527,17 +585,76 @@ on_remove_buffer (void *_data, struct pw_buffer *b) GstPipeWireSrc *pwsrc = _data; GstPipeWirePoolData *data = b->user_data; GstBuffer *buf = data->buf; + gboolean flush_on_remove; int res; - GST_DEBUG_OBJECT (pwsrc, "remove buffer %p", buf); + GST_DEBUG_OBJECT (pwsrc, "remove buffer %p, queued: %d", + buf, data->queued); GST_MINI_OBJECT_CAST (buf)->dispose = NULL; + flush_on_remove = + pwsrc->on_disconnect == GST_PIPEWIRE_SRC_ON_DISCONNECT_ERROR || + pwsrc->on_disconnect == GST_PIPEWIRE_SRC_ON_DISCONNECT_EOS; + + if (flush_on_remove && !pwsrc->flushing_on_remove_buffer) { + pwsrc->flushing_on_remove_buffer = TRUE; + + GST_DEBUG_OBJECT (pwsrc, "flush-start on remove buffer"); + /* + * It is possible that when buffers are being removed, a downstream + * element can be holding on to a buffer or in the middle of rendering + * the same. Former is possible with queues min-threshold-buffers or + * similar. Latter can result in a crash during gst_video_frame_copy. + * + * We send a flush-start event downstream to make elements discard + * any buffers they may be holding on to as well as return from their + * chain function ASAP. + */ + gst_pad_push_event (GST_BASE_SRC_PAD (pwsrc), + gst_event_new_flush_start ()); + } + if (data->queued) { gst_buffer_unref (buf); } else { if ((res = pw_stream_queue_buffer (pwsrc->stream->pwstream, b)) < 0) - GST_WARNING_OBJECT (pwsrc, "can't queue removed buffer %p, %s", buf, spa_strerror(res)); + GST_WARNING_OBJECT (pwsrc, "can't queue removed buffer %p, %s", + buf, spa_strerror(res)); + else + GST_DEBUG_OBJECT (pwsrc, "queued buffer %p", buf); + } + + pwsrc->n_buffers--; + + if (pwsrc->n_buffers == 0) { + GST_DEBUG_OBJECT (pwsrc, "removed all buffers"); + + pwsrc->flushing_on_remove_buffer = FALSE; + + switch (pwsrc->on_disconnect) { + case GST_PIPEWIRE_SRC_ON_DISCONNECT_ERROR: + GST_DEBUG_OBJECT (pwsrc, "flush-stop on removing all buffers"); + gst_pad_push_event (GST_BASE_SRC_PAD (pwsrc), + gst_event_new_flush_stop (FALSE)); + + GST_ELEMENT_ERROR (pwsrc, RESOURCE, NOT_FOUND, + ("all buffers have been removed"), + ("PipeWire link to remote node was destroyed")); + break; + case GST_PIPEWIRE_SRC_ON_DISCONNECT_EOS: + GST_DEBUG_OBJECT (pwsrc, "flush-stop on removing all buffers"); + gst_pad_push_event (GST_BASE_SRC_PAD (pwsrc), + gst_event_new_flush_stop (FALSE)); + + GST_DEBUG_OBJECT (pwsrc, "sending eos downstream"); + gst_pad_push_event (GST_BASE_SRC_PAD (pwsrc), + gst_event_new_eos()); + break; + case GST_PIPEWIRE_SRC_ON_DISCONNECT_NONE: + GST_DEBUG_OBJECT (pwsrc, "stream closed or removed"); + break; + } } } @@ -739,7 +856,7 @@ on_state_changed (void *data, GstPipeWireSrc *pwsrc = data; GstState current_state = GST_ELEMENT_CAST (pwsrc)->current_state; - GST_DEBUG ("got stream state %s", pw_stream_state_as_string (state)); + GST_DEBUG_OBJECT (pwsrc, "got stream state %s", pw_stream_state_as_string (state)); switch (state) { case PW_STREAM_STATE_UNCONNECTED: diff --git a/src/gst/gstpipewiresrc.h b/src/gst/gstpipewiresrc.h index d5728cdc9..704ae682c 100644 --- a/src/gst/gstpipewiresrc.h +++ b/src/gst/gstpipewiresrc.h @@ -24,6 +24,22 @@ G_BEGIN_DECLS #define GST_PIPEWIRE_SRC_CAST(obj) ((GstPipeWireSrc *) (obj)) G_DECLARE_FINAL_TYPE (GstPipeWireSrc, gst_pipewire_src, GST, PIPEWIRE_SRC, GstPushSrc) +/** + * GstPipeWireSrcOnDisconnect: + * @GST_PIPEWIRE_SRC_ON_DISCONNECT_EOS: send EoS downstream + * @GST_PIPEWIRE_SRC_ON_DISCONNECT_ERROR: raise pipeline error + * @GST_PIPEWIRE_SRC_ON_DISCONNECT_NONE: no action + * + * Different actions on disconnect. + */ +typedef enum +{ + GST_PIPEWIRE_SRC_ON_DISCONNECT_NONE, + GST_PIPEWIRE_SRC_ON_DISCONNECT_EOS, + GST_PIPEWIRE_SRC_ON_DISCONNECT_ERROR, +} GstPipeWireSrcOnDisconnect; + +#define GST_TYPE_PIPEWIRE_SRC_ON_DISCONNECT (gst_pipewire_src_on_disconnect_get_type ()) /** * GstPipeWireSrc: @@ -36,6 +52,7 @@ struct _GstPipeWireSrc { GstPipeWireStream *stream; /*< private >*/ + gint n_buffers; gint use_bufferpool; gint min_buffers; gint max_buffers; @@ -56,6 +73,7 @@ struct _GstPipeWireSrc { gboolean flushing; gboolean started; gboolean eos; + gboolean flushing_on_remove_buffer; gboolean is_live; int64_t delay; @@ -65,8 +83,12 @@ struct _GstPipeWireSrc { GstBuffer *last_buffer; enum spa_meta_videotransform_value transform_value; + + GstPipeWireSrcOnDisconnect on_disconnect; }; +GType gst_pipewire_src_on_stream_disconnect_get_type (void); + G_END_DECLS #endif /* __GST_PIPEWIRE_SRC_H__ */