From 1e451458a7e239ad94392c126491cd270e83620f Mon Sep 17 00:00:00 2001 From: James Hilliard Date: Thu, 30 Jun 2022 04:45:59 -0600 Subject: [PATCH] gst: try to purge buffers when renegotiating Track dequeued buffer counts and ensure they are flushed before reconnecting stream. When purging use empty buffers to try and push buffers we need to reclaim out of the pipeline. Signed-off-by: James Hilliard --- src/gst/gstpipewirepool.h | 3 + src/gst/gstpipewiresrc.c | 383 +++++++++++++++++++++----------------- 2 files changed, 217 insertions(+), 169 deletions(-) diff --git a/src/gst/gstpipewirepool.h b/src/gst/gstpipewirepool.h index b7b7ca8a7..68f9f26fe 100644 --- a/src/gst/gstpipewirepool.h +++ b/src/gst/gstpipewirepool.h @@ -68,6 +68,9 @@ struct _GstPipeWirePool { GstAllocator *fd_allocator; GstAllocator *dmabuf_allocator; + gboolean purge; + guint num_dequeued; + GCond cond; }; diff --git a/src/gst/gstpipewiresrc.c b/src/gst/gstpipewiresrc.c index 54b0c957c..627562e53 100644 --- a/src/gst/gstpipewiresrc.c +++ b/src/gst/gstpipewiresrc.c @@ -467,9 +467,10 @@ buffer_recycle (GstMiniObject *obj) gst_mini_object_ref (obj); data->queued = TRUE; + g_atomic_int_add (&src->pool->num_dequeued, -1); if ((res = pw_stream_queue_buffer (src->stream, data->b)) < 0) - GST_WARNING_OBJECT (src, "can't queue recycled buffer %p, %s", obj, spa_strerror(res)); + GST_ERROR_OBJECT (src, "can't queue recycled buffer %p, %s", obj, spa_strerror(res)); else GST_LOG_OBJECT (src, "recycle buffer %p", obj); @@ -509,8 +510,9 @@ on_remove_buffer (void *_data, struct pw_buffer *b) if (data->queued) { gst_buffer_unref (buf); } else { + GST_ERROR_OBJECT (pwsrc, "removed buffer was not recycled, buffer pool out of sync with stream queue"); if ((res = pw_stream_queue_buffer (pwsrc->stream, b)) < 0) - GST_WARNING_OBJECT (pwsrc, "can't queue removed buffer %p, %s", buf, spa_strerror(res)); + GST_ERROR_OBJECT (pwsrc, "can't queue removed buffer %p, %s", buf, spa_strerror(res)); } } @@ -523,9 +525,8 @@ static GstBuffer *dequeue_buffer(GstPipeWireSrc *pwsrc) struct spa_meta_region *crop; guint i; - b = pw_stream_dequeue_buffer (pwsrc->stream); - if (b == NULL) - return NULL; + if (pwsrc->pool->purge || (b = pw_stream_dequeue_buffer (pwsrc->stream)) == NULL) + return gst_buffer_new (); data = b->user_data; @@ -544,6 +545,7 @@ static GstBuffer *dequeue_buffer(GstPipeWireSrc *pwsrc) buf = gst_buffer_new (); data->queued = FALSE; + g_atomic_int_add (&pwsrc->pool->num_dequeued, 1); GST_BUFFER_PTS (buf) = GST_CLOCK_TIME_NONE; GST_BUFFER_DTS (buf) = GST_CLOCK_TIME_NONE; @@ -686,6 +688,34 @@ start_error: } } +static gboolean +wait_purged (GstPipeWireSrc *this) +{ + guint num_dequeued; + struct timespec abstime; + + GST_DEBUG_OBJECT (this, "waiting for buffer pool to finish"); + + pw_thread_loop_get_time (this->core->loop, &abstime, + GST_PIPEWIRE_DEFAULT_TIMEOUT * SPA_NSEC_PER_SEC); + + while ((num_dequeued = g_atomic_int_get (&this->pool->num_dequeued))) { + GST_DEBUG_OBJECT (this, "waiting for %i buffers to recycle", num_dequeued); + + if (pw_thread_loop_timed_wait_full (this->core->loop, &abstime) < 0) { + GST_ERROR_OBJECT (this, "buffer pool failed to purge"); + if (num_dequeued = g_atomic_int_get (&this->pool->num_dequeued)) { + pw_stream_set_error(this->stream, -EINVAL, "failed to purge buffers"); + return FALSE; + } + } + } + + GST_DEBUG_OBJECT (this, "buffer pool purged"); + + return TRUE; +} + static enum pw_stream_state wait_started (GstPipeWireSrc *this) { @@ -727,170 +757,6 @@ wait_started (GstPipeWireSrc *this) return state; } -static gboolean -gst_pipewire_src_negotiate (GstBaseSrc * basesrc) -{ - GstPipeWireSrc *pwsrc = GST_PIPEWIRE_SRC (basesrc); - GstCaps *thiscaps; - GstCaps *caps = NULL; - GstCaps *peercaps = NULL; - gboolean result = FALSE; - GPtrArray *possible; - const char *error = NULL; - struct timespec abstime; - uint32_t target_id; - - /* first see what is possible on our source pad */ - thiscaps = gst_pad_query_caps (GST_BASE_SRC_PAD (basesrc), NULL); - GST_DEBUG_OBJECT (basesrc, "caps of src: %" GST_PTR_FORMAT, thiscaps); - /* nothing or anything is allowed, we're done */ - if (thiscaps == NULL) - goto no_nego_needed; - - if (G_UNLIKELY (gst_caps_is_empty (thiscaps))) - goto no_caps; - - /* get the peer caps */ - peercaps = gst_pad_peer_query_caps (GST_BASE_SRC_PAD (basesrc), thiscaps); - GST_DEBUG_OBJECT (basesrc, "caps of peer: %" GST_PTR_FORMAT, peercaps); - if (peercaps) { - /* The result is already a subset of our caps */ - caps = peercaps; - gst_caps_unref (thiscaps); - } else { - /* no peer, work with our own caps then */ - caps = thiscaps; - } - if (caps == NULL || gst_caps_is_empty (caps)) - goto no_common_caps; - - GST_DEBUG_OBJECT (basesrc, "have common caps: %" GST_PTR_FORMAT, caps); - - /* open a connection with these caps */ - possible = gst_caps_to_format_all (caps, SPA_PARAM_EnumFormat); - gst_caps_unref (caps); - - /* first disconnect */ - pw_thread_loop_lock (pwsrc->core->loop); - if (pw_stream_get_state(pwsrc->stream, &error) != PW_STREAM_STATE_UNCONNECTED) { - GST_DEBUG_OBJECT (basesrc, "disconnect capture"); - pw_stream_disconnect (pwsrc->stream); - while (TRUE) { - enum pw_stream_state state = pw_stream_get_state (pwsrc->stream, &error); - - GST_DEBUG_OBJECT (basesrc, "waiting for UNCONNECTED, now %s", pw_stream_state_as_string (state)); - if (state == PW_STREAM_STATE_UNCONNECTED) - break; - - if (state == PW_STREAM_STATE_ERROR || pwsrc->flushing) { - g_ptr_array_unref (possible); - goto connect_error; - } - - pw_thread_loop_wait (pwsrc->core->loop); - } - } - - target_id = pwsrc->path ? (uint32_t)atoi(pwsrc->path) : PW_ID_ANY; - - if (pwsrc->target_object) { - struct spa_dict_item items[2] = { - SPA_DICT_ITEM_INIT(PW_KEY_TARGET_OBJECT, pwsrc->target_object), - SPA_DICT_ITEM_INIT(PW_KEY_NODE_TARGET, NULL), - }; - struct spa_dict dict = SPA_DICT_INIT_ARRAY(items); - uint64_t serial; - - /* If target.object is a name, set it also to node.target */ - if (spa_atou64(pwsrc->target_object, &serial, 0)) { - dict.n_items = 1; - } else { - target_id = PW_ID_ANY; - items[1].value = pwsrc->target_object; - } - - pw_stream_update_properties (pwsrc->stream, &dict); - } - - GST_DEBUG_OBJECT (basesrc, "connect capture with path %s, target-object %s", - pwsrc->path, pwsrc->target_object); - pwsrc->negotiated = FALSE; - pw_stream_connect (pwsrc->stream, - PW_DIRECTION_INPUT, - target_id, - PW_STREAM_FLAG_AUTOCONNECT | PW_STREAM_FLAG_DONT_RECONNECT, - (const struct spa_pod **)possible->pdata, - possible->len); - g_ptr_array_free (possible, TRUE); - - pw_thread_loop_get_time (pwsrc->core->loop, &abstime, - GST_PIPEWIRE_DEFAULT_TIMEOUT * SPA_NSEC_PER_SEC); - - while (TRUE) { - enum pw_stream_state state = pw_stream_get_state (pwsrc->stream, &error); - - GST_DEBUG_OBJECT (basesrc, "waiting for NEGOTIATED, now %s", pw_stream_state_as_string (state)); - if (state == PW_STREAM_STATE_ERROR || pwsrc->flushing) - goto connect_error; - - if (pwsrc->negotiated) - break; - - if (pw_thread_loop_timed_wait_full (pwsrc->core->loop, &abstime) < 0) - goto connect_error; - } - caps = pwsrc->caps; - pwsrc->caps = NULL; - pw_thread_loop_unlock (pwsrc->core->loop); - - if (caps == NULL) - goto no_caps; - - gst_pipewire_clock_reset (GST_PIPEWIRE_CLOCK (pwsrc->clock), 0); - - GST_DEBUG_OBJECT (pwsrc, "set format %" GST_PTR_FORMAT, caps); - result = gst_base_src_set_caps (GST_BASE_SRC (pwsrc), caps); - gst_caps_unref (caps); - - result = gst_pipewire_src_stream_start (pwsrc); - - pwsrc->started = result; - - return result; - -no_nego_needed: - { - GST_DEBUG_OBJECT (basesrc, "no negotiation needed"); - if (thiscaps) - gst_caps_unref (thiscaps); - return TRUE; - } -no_caps: - { - GST_ELEMENT_ERROR (basesrc, STREAM, FORMAT, - ("No supported formats found"), - ("This element did not produce valid caps")); - if (thiscaps) - gst_caps_unref (thiscaps); - return FALSE; - } -no_common_caps: - { - GST_ELEMENT_ERROR (basesrc, STREAM, FORMAT, - ("No supported formats found"), - ("This element does not have formats in common with the peer")); - if (caps) - gst_caps_unref (caps); - return FALSE; - } -connect_error: - { - GST_DEBUG_OBJECT (basesrc, "connect error"); - pw_thread_loop_unlock (pwsrc->core->loop); - return FALSE; - } -} - static void on_param_changed (void *data, uint32_t id, const struct spa_pod *param) @@ -1293,6 +1159,185 @@ gst_pipewire_src_close (GstPipeWireSrc * pwsrc) } } +static gboolean +gst_pipewire_src_negotiate (GstBaseSrc * basesrc) +{ + GstPipeWireSrc *pwsrc = GST_PIPEWIRE_SRC (basesrc); + GstCaps *thiscaps; + GstCaps *caps = NULL; + GstCaps *peercaps = NULL; + gboolean result = FALSE; + GPtrArray *possible; + const char *error = NULL; + struct timespec abstime; + uint32_t target_id; + + /* first see what is possible on our source pad */ + thiscaps = gst_pad_query_caps (GST_BASE_SRC_PAD (basesrc), NULL); + GST_DEBUG_OBJECT (basesrc, "caps of src: %" GST_PTR_FORMAT, thiscaps); + /* nothing or anything is allowed, we're done */ + if (thiscaps == NULL) + goto no_nego_needed; + + if (G_UNLIKELY (gst_caps_is_empty (thiscaps))) + goto no_caps; + + /* get the peer caps */ + peercaps = gst_pad_peer_query_caps (GST_BASE_SRC_PAD (basesrc), thiscaps); + GST_DEBUG_OBJECT (basesrc, "caps of peer: %" GST_PTR_FORMAT, peercaps); + if (peercaps) { + /* The result is already a subset of our caps */ + caps = peercaps; + gst_caps_unref (thiscaps); + } else { + /* no peer, work with our own caps then */ + caps = thiscaps; + } + if (caps == NULL || gst_caps_is_empty (caps)) + goto no_common_caps; + + GST_DEBUG_OBJECT (basesrc, "have common caps: %" GST_PTR_FORMAT, caps); + + /* open a connection with these caps */ + possible = gst_caps_to_format_all (caps, SPA_PARAM_EnumFormat); + gst_caps_unref (caps); + + /* first disconnect */ + pw_thread_loop_lock (pwsrc->core->loop); + if (pw_stream_get_state(pwsrc->stream, &error) != PW_STREAM_STATE_UNCONNECTED) { + pwsrc->pool->purge = TRUE; + if (wait_purged (pwsrc)) { + GST_DEBUG_OBJECT (basesrc, "disconnect capture"); + pw_stream_disconnect (pwsrc->stream); + while (TRUE) { + enum pw_stream_state state = pw_stream_get_state (pwsrc->stream, &error); + + GST_DEBUG_OBJECT (basesrc, "waiting for UNCONNECTED, now %s", pw_stream_state_as_string (state)); + if (state == PW_STREAM_STATE_UNCONNECTED) + break; + + if (state == PW_STREAM_STATE_ERROR || pwsrc->flushing) { + g_ptr_array_unref (possible); + pwsrc->pool->purge = FALSE; + goto connect_error; + } + + pw_thread_loop_wait (pwsrc->core->loop); + } + } else { + pw_thread_loop_signal (pwsrc->core->loop, FALSE); + pw_thread_loop_unlock (pwsrc->core->loop); + gst_pipewire_src_close (pwsrc); + if (!gst_pipewire_src_open (pwsrc)) { + g_ptr_array_unref (possible); + pwsrc->pool->purge = FALSE; + goto connect_error; + } + } + pwsrc->pool->purge = FALSE; + } + + target_id = pwsrc->path ? (uint32_t)atoi(pwsrc->path) : PW_ID_ANY; + + if (pwsrc->target_object) { + struct spa_dict_item items[2] = { + SPA_DICT_ITEM_INIT(PW_KEY_TARGET_OBJECT, pwsrc->target_object), + SPA_DICT_ITEM_INIT(PW_KEY_NODE_TARGET, NULL), + }; + struct spa_dict dict = SPA_DICT_INIT_ARRAY(items); + uint64_t serial; + + /* If target.object is a name, set it also to node.target */ + if (spa_atou64(pwsrc->target_object, &serial, 0)) { + dict.n_items = 1; + } else { + target_id = PW_ID_ANY; + items[1].value = pwsrc->target_object; + } + + pw_stream_update_properties (pwsrc->stream, &dict); + } + + GST_DEBUG_OBJECT (basesrc, "connect capture with path %s, target-object %s", + pwsrc->path, pwsrc->target_object); + pwsrc->negotiated = FALSE; + pw_stream_connect (pwsrc->stream, + PW_DIRECTION_INPUT, + target_id, + PW_STREAM_FLAG_AUTOCONNECT | PW_STREAM_FLAG_DONT_RECONNECT, + (const struct spa_pod **)possible->pdata, + possible->len); + g_ptr_array_free (possible, TRUE); + + pw_thread_loop_get_time (pwsrc->core->loop, &abstime, + GST_PIPEWIRE_DEFAULT_TIMEOUT * SPA_NSEC_PER_SEC); + + while (TRUE) { + enum pw_stream_state state = pw_stream_get_state (pwsrc->stream, &error); + + GST_DEBUG_OBJECT (basesrc, "waiting for NEGOTIATED, now %s", pw_stream_state_as_string (state)); + if (state == PW_STREAM_STATE_ERROR || pwsrc->flushing) + goto connect_error; + + if (pwsrc->negotiated) + break; + + if (pw_thread_loop_timed_wait_full (pwsrc->core->loop, &abstime) < 0) + goto connect_error; + } + caps = pwsrc->caps; + pwsrc->caps = NULL; + pw_thread_loop_unlock (pwsrc->core->loop); + + if (caps == NULL) + goto no_caps; + + gst_pipewire_clock_reset (GST_PIPEWIRE_CLOCK (pwsrc->clock), 0); + + GST_DEBUG_OBJECT (pwsrc, "set format %" GST_PTR_FORMAT, caps); + result = gst_base_src_set_caps (GST_BASE_SRC (pwsrc), caps); + gst_caps_unref (caps); + + result = gst_pipewire_src_stream_start (pwsrc); + + pwsrc->started = result; + + return result; + +no_nego_needed: + { + GST_DEBUG_OBJECT (basesrc, "no negotiation needed"); + if (thiscaps) + gst_caps_unref (thiscaps); + return TRUE; + } +no_caps: + { + GST_ELEMENT_ERROR (basesrc, STREAM, FORMAT, + ("No supported formats found"), + ("This element did not produce valid caps")); + if (thiscaps) + gst_caps_unref (thiscaps); + return FALSE; + } +no_common_caps: + { + GST_ELEMENT_ERROR (basesrc, STREAM, FORMAT, + ("No supported formats found"), + ("This element does not have formats in common with the peer")); + if (caps) + gst_caps_unref (caps); + return FALSE; + } +connect_error: + { + GST_DEBUG_OBJECT (basesrc, "connect error"); + if (pwsrc->core) + pw_thread_loop_unlock (pwsrc->core->loop); + return FALSE; + } +} + static gboolean gst_pipewire_src_send_event (GstElement * elem, GstEvent * event) {