diff --git a/src/gst/gstpipewirepool.h b/src/gst/gstpipewirepool.h index acf81064f..576af40c4 100644 --- a/src/gst/gstpipewirepool.h +++ b/src/gst/gstpipewirepool.h @@ -69,6 +69,9 @@ struct _GstPipeWirePool { GstAllocator *fd_allocator; GstAllocator *dmabuf_allocator; + gboolean purge; + guint num_dequeued; + GCond cond; }; diff --git a/src/gst/gstpipewiresrc.c b/src/gst/gstpipewiresrc.c index 4e37846fc..9dfaf1acb 100644 --- a/src/gst/gstpipewiresrc.c +++ b/src/gst/gstpipewiresrc.c @@ -468,9 +468,10 @@ buffer_recycle (GstMiniObject *obj) gst_mini_object_ref (obj); data->queued = TRUE; + g_atomic_int_add (&src->pool->num_dequeued, -1); if ((res = pw_stream_queue_buffer (src->stream, data->b)) < 0) - GST_WARNING_OBJECT (src, "can't queue recycled buffer %p, %s", obj, spa_strerror(res)); + GST_ERROR_OBJECT (src, "can't queue recycled buffer %p, %s", obj, spa_strerror(res)); else GST_LOG_OBJECT (src, "recycle buffer %p", obj); @@ -510,8 +511,9 @@ on_remove_buffer (void *_data, struct pw_buffer *b) if (data->queued) { gst_buffer_unref (buf); } else { + GST_ERROR_OBJECT (pwsrc, "removed buffer was not recycled, buffer pool out of sync with stream queue"); if ((res = pw_stream_queue_buffer (pwsrc->stream, b)) < 0) - GST_WARNING_OBJECT (pwsrc, "can't queue removed buffer %p, %s", buf, spa_strerror(res)); + GST_ERROR_OBJECT (pwsrc, "can't queue removed buffer %p, %s", buf, spa_strerror(res)); } } @@ -544,9 +546,8 @@ static GstBuffer *dequeue_buffer(GstPipeWireSrc *pwsrc) struct spa_meta_videotransform *videotransform; guint i; - b = pw_stream_dequeue_buffer (pwsrc->stream); - if (b == NULL) - return NULL; + if (pwsrc->pool->purge || (b = pw_stream_dequeue_buffer (pwsrc->stream)) == NULL) + return gst_buffer_new (); data = b->user_data; @@ -565,6 +566,7 @@ static GstBuffer *dequeue_buffer(GstPipeWireSrc *pwsrc) buf = gst_buffer_new (); data->queued = FALSE; + g_atomic_int_add (&pwsrc->pool->num_dequeued, 1); GST_BUFFER_PTS (buf) = GST_CLOCK_TIME_NONE; GST_BUFFER_DTS (buf) = GST_CLOCK_TIME_NONE; @@ -732,6 +734,34 @@ start_error: } } +static gboolean +wait_purged (GstPipeWireSrc *this) +{ + guint num_dequeued; + struct timespec abstime; + + GST_DEBUG_OBJECT (this, "waiting for buffer pool to finish"); + + pw_thread_loop_get_time (this->core->loop, &abstime, + GST_PIPEWIRE_DEFAULT_TIMEOUT * SPA_NSEC_PER_SEC); + + while ((num_dequeued = g_atomic_int_get (&this->pool->num_dequeued))) { + GST_DEBUG_OBJECT (this, "waiting for %i buffers to recycle", num_dequeued); + + if (pw_thread_loop_timed_wait_full (this->core->loop, &abstime) < 0) { + GST_ERROR_OBJECT (this, "buffer pool failed to purge"); + if (num_dequeued = g_atomic_int_get (&this->pool->num_dequeued)) { + pw_stream_set_error(this->stream, -EINVAL, "failed to purge buffers"); + return FALSE; + } + } + } + + GST_DEBUG_OBJECT (this, "buffer pool purged"); + + return TRUE; +} + static enum pw_stream_state wait_started (GstPipeWireSrc *this) { @@ -773,170 +803,6 @@ wait_started (GstPipeWireSrc *this) return state; } -static gboolean -gst_pipewire_src_negotiate (GstBaseSrc * basesrc) -{ - GstPipeWireSrc *pwsrc = GST_PIPEWIRE_SRC (basesrc); - GstCaps *thiscaps; - GstCaps *caps = NULL; - GstCaps *peercaps = NULL; - gboolean result = FALSE; - GPtrArray *possible; - const char *error = NULL; - struct timespec abstime; - uint32_t target_id; - - /* first see what is possible on our source pad */ - thiscaps = gst_pad_query_caps (GST_BASE_SRC_PAD (basesrc), NULL); - GST_DEBUG_OBJECT (basesrc, "caps of src: %" GST_PTR_FORMAT, thiscaps); - /* nothing or anything is allowed, we're done */ - if (thiscaps == NULL) - goto no_nego_needed; - - if (G_UNLIKELY (gst_caps_is_empty (thiscaps))) - goto no_caps; - - /* get the peer caps */ - peercaps = gst_pad_peer_query_caps (GST_BASE_SRC_PAD (basesrc), thiscaps); - GST_DEBUG_OBJECT (basesrc, "caps of peer: %" GST_PTR_FORMAT, peercaps); - if (peercaps) { - /* The result is already a subset of our caps */ - caps = peercaps; - gst_caps_unref (thiscaps); - } else { - /* no peer, work with our own caps then */ - caps = thiscaps; - } - if (caps == NULL || gst_caps_is_empty (caps)) - goto no_common_caps; - - GST_DEBUG_OBJECT (basesrc, "have common caps: %" GST_PTR_FORMAT, caps); - - /* open a connection with these caps */ - possible = gst_caps_to_format_all (caps, SPA_PARAM_EnumFormat); - gst_caps_unref (caps); - - /* first disconnect */ - pw_thread_loop_lock (pwsrc->core->loop); - if (pw_stream_get_state(pwsrc->stream, &error) != PW_STREAM_STATE_UNCONNECTED) { - GST_DEBUG_OBJECT (basesrc, "disconnect capture"); - pw_stream_disconnect (pwsrc->stream); - while (TRUE) { - enum pw_stream_state state = pw_stream_get_state (pwsrc->stream, &error); - - GST_DEBUG_OBJECT (basesrc, "waiting for UNCONNECTED, now %s", pw_stream_state_as_string (state)); - if (state == PW_STREAM_STATE_UNCONNECTED) - break; - - if (state == PW_STREAM_STATE_ERROR || pwsrc->flushing) { - g_ptr_array_unref (possible); - goto connect_error; - } - - pw_thread_loop_wait (pwsrc->core->loop); - } - } - - target_id = pwsrc->path ? (uint32_t)atoi(pwsrc->path) : PW_ID_ANY; - - if (pwsrc->target_object) { - struct spa_dict_item items[2] = { - SPA_DICT_ITEM_INIT(PW_KEY_TARGET_OBJECT, pwsrc->target_object), - SPA_DICT_ITEM_INIT(PW_KEY_NODE_TARGET, NULL), - }; - struct spa_dict dict = SPA_DICT_INIT_ARRAY(items); - uint64_t serial; - - /* If target.object is a name, set it also to node.target */ - if (spa_atou64(pwsrc->target_object, &serial, 0)) { - dict.n_items = 1; - } else { - target_id = PW_ID_ANY; - items[1].value = pwsrc->target_object; - } - - pw_stream_update_properties (pwsrc->stream, &dict); - } - - GST_DEBUG_OBJECT (basesrc, "connect capture with path %s, target-object %s", - pwsrc->path, pwsrc->target_object); - pwsrc->negotiated = FALSE; - pw_stream_connect (pwsrc->stream, - PW_DIRECTION_INPUT, - target_id, - PW_STREAM_FLAG_AUTOCONNECT | PW_STREAM_FLAG_DONT_RECONNECT, - (const struct spa_pod **)possible->pdata, - possible->len); - g_ptr_array_free (possible, TRUE); - - pw_thread_loop_get_time (pwsrc->core->loop, &abstime, - GST_PIPEWIRE_DEFAULT_TIMEOUT * SPA_NSEC_PER_SEC); - - while (TRUE) { - enum pw_stream_state state = pw_stream_get_state (pwsrc->stream, &error); - - GST_DEBUG_OBJECT (basesrc, "waiting for NEGOTIATED, now %s", pw_stream_state_as_string (state)); - if (state == PW_STREAM_STATE_ERROR || pwsrc->flushing) - goto connect_error; - - if (pwsrc->negotiated) - break; - - if (pw_thread_loop_timed_wait_full (pwsrc->core->loop, &abstime) < 0) - goto connect_error; - } - caps = pwsrc->caps; - pwsrc->caps = NULL; - pw_thread_loop_unlock (pwsrc->core->loop); - - if (caps == NULL) - goto no_caps; - - gst_pipewire_clock_reset (GST_PIPEWIRE_CLOCK (pwsrc->clock), 0); - - GST_DEBUG_OBJECT (pwsrc, "set format %" GST_PTR_FORMAT, caps); - result = gst_base_src_set_caps (GST_BASE_SRC (pwsrc), caps); - gst_caps_unref (caps); - - result = gst_pipewire_src_stream_start (pwsrc); - - pwsrc->started = result; - - return result; - -no_nego_needed: - { - GST_DEBUG_OBJECT (basesrc, "no negotiation needed"); - if (thiscaps) - gst_caps_unref (thiscaps); - return TRUE; - } -no_caps: - { - GST_ELEMENT_ERROR (basesrc, STREAM, FORMAT, - ("No supported formats found"), - ("This element did not produce valid caps")); - if (thiscaps) - gst_caps_unref (thiscaps); - return FALSE; - } -no_common_caps: - { - GST_ELEMENT_ERROR (basesrc, STREAM, FORMAT, - ("No supported formats found"), - ("This element does not have formats in common with the peer")); - if (caps) - gst_caps_unref (caps); - return FALSE; - } -connect_error: - { - GST_DEBUG_OBJECT (basesrc, "connect error"); - pw_thread_loop_unlock (pwsrc->core->loop); - return FALSE; - } -} - static void on_param_changed (void *data, uint32_t id, const struct spa_pod *param) @@ -1348,6 +1214,185 @@ gst_pipewire_src_close (GstPipeWireSrc * pwsrc) } } +static gboolean +gst_pipewire_src_negotiate (GstBaseSrc * basesrc) +{ + GstPipeWireSrc *pwsrc = GST_PIPEWIRE_SRC (basesrc); + GstCaps *thiscaps; + GstCaps *caps = NULL; + GstCaps *peercaps = NULL; + gboolean result = FALSE; + GPtrArray *possible; + const char *error = NULL; + struct timespec abstime; + uint32_t target_id; + + /* first see what is possible on our source pad */ + thiscaps = gst_pad_query_caps (GST_BASE_SRC_PAD (basesrc), NULL); + GST_DEBUG_OBJECT (basesrc, "caps of src: %" GST_PTR_FORMAT, thiscaps); + /* nothing or anything is allowed, we're done */ + if (thiscaps == NULL) + goto no_nego_needed; + + if (G_UNLIKELY (gst_caps_is_empty (thiscaps))) + goto no_caps; + + /* get the peer caps */ + peercaps = gst_pad_peer_query_caps (GST_BASE_SRC_PAD (basesrc), thiscaps); + GST_DEBUG_OBJECT (basesrc, "caps of peer: %" GST_PTR_FORMAT, peercaps); + if (peercaps) { + /* The result is already a subset of our caps */ + caps = peercaps; + gst_caps_unref (thiscaps); + } else { + /* no peer, work with our own caps then */ + caps = thiscaps; + } + if (caps == NULL || gst_caps_is_empty (caps)) + goto no_common_caps; + + GST_DEBUG_OBJECT (basesrc, "have common caps: %" GST_PTR_FORMAT, caps); + + /* open a connection with these caps */ + possible = gst_caps_to_format_all (caps, SPA_PARAM_EnumFormat); + gst_caps_unref (caps); + + /* first disconnect */ + pw_thread_loop_lock (pwsrc->core->loop); + if (pw_stream_get_state(pwsrc->stream, &error) != PW_STREAM_STATE_UNCONNECTED) { + pwsrc->pool->purge = TRUE; + if (wait_purged (pwsrc)) { + GST_DEBUG_OBJECT (basesrc, "disconnect capture"); + pw_stream_disconnect (pwsrc->stream); + while (TRUE) { + enum pw_stream_state state = pw_stream_get_state (pwsrc->stream, &error); + + GST_DEBUG_OBJECT (basesrc, "waiting for UNCONNECTED, now %s", pw_stream_state_as_string (state)); + if (state == PW_STREAM_STATE_UNCONNECTED) + break; + + if (state == PW_STREAM_STATE_ERROR || pwsrc->flushing) { + g_ptr_array_unref (possible); + pwsrc->pool->purge = FALSE; + goto connect_error; + } + + pw_thread_loop_wait (pwsrc->core->loop); + } + } else { + pw_thread_loop_signal (pwsrc->core->loop, FALSE); + pw_thread_loop_unlock (pwsrc->core->loop); + gst_pipewire_src_close (pwsrc); + if (!gst_pipewire_src_open (pwsrc)) { + g_ptr_array_unref (possible); + pwsrc->pool->purge = FALSE; + goto connect_error; + } + } + pwsrc->pool->purge = FALSE; + } + + target_id = pwsrc->path ? (uint32_t)atoi(pwsrc->path) : PW_ID_ANY; + + if (pwsrc->target_object) { + struct spa_dict_item items[2] = { + SPA_DICT_ITEM_INIT(PW_KEY_TARGET_OBJECT, pwsrc->target_object), + SPA_DICT_ITEM_INIT(PW_KEY_NODE_TARGET, NULL), + }; + struct spa_dict dict = SPA_DICT_INIT_ARRAY(items); + uint64_t serial; + + /* If target.object is a name, set it also to node.target */ + if (spa_atou64(pwsrc->target_object, &serial, 0)) { + dict.n_items = 1; + } else { + target_id = PW_ID_ANY; + items[1].value = pwsrc->target_object; + } + + pw_stream_update_properties (pwsrc->stream, &dict); + } + + GST_DEBUG_OBJECT (basesrc, "connect capture with path %s, target-object %s", + pwsrc->path, pwsrc->target_object); + pwsrc->negotiated = FALSE; + pw_stream_connect (pwsrc->stream, + PW_DIRECTION_INPUT, + target_id, + PW_STREAM_FLAG_AUTOCONNECT | PW_STREAM_FLAG_DONT_RECONNECT, + (const struct spa_pod **)possible->pdata, + possible->len); + g_ptr_array_free (possible, TRUE); + + pw_thread_loop_get_time (pwsrc->core->loop, &abstime, + GST_PIPEWIRE_DEFAULT_TIMEOUT * SPA_NSEC_PER_SEC); + + while (TRUE) { + enum pw_stream_state state = pw_stream_get_state (pwsrc->stream, &error); + + GST_DEBUG_OBJECT (basesrc, "waiting for NEGOTIATED, now %s", pw_stream_state_as_string (state)); + if (state == PW_STREAM_STATE_ERROR || pwsrc->flushing) + goto connect_error; + + if (pwsrc->negotiated) + break; + + if (pw_thread_loop_timed_wait_full (pwsrc->core->loop, &abstime) < 0) + goto connect_error; + } + caps = pwsrc->caps; + pwsrc->caps = NULL; + pw_thread_loop_unlock (pwsrc->core->loop); + + if (caps == NULL) + goto no_caps; + + gst_pipewire_clock_reset (GST_PIPEWIRE_CLOCK (pwsrc->clock), 0); + + GST_DEBUG_OBJECT (pwsrc, "set format %" GST_PTR_FORMAT, caps); + result = gst_base_src_set_caps (GST_BASE_SRC (pwsrc), caps); + gst_caps_unref (caps); + + result = gst_pipewire_src_stream_start (pwsrc); + + pwsrc->started = result; + + return result; + +no_nego_needed: + { + GST_DEBUG_OBJECT (basesrc, "no negotiation needed"); + if (thiscaps) + gst_caps_unref (thiscaps); + return TRUE; + } +no_caps: + { + GST_ELEMENT_ERROR (basesrc, STREAM, FORMAT, + ("No supported formats found"), + ("This element did not produce valid caps")); + if (thiscaps) + gst_caps_unref (thiscaps); + return FALSE; + } +no_common_caps: + { + GST_ELEMENT_ERROR (basesrc, STREAM, FORMAT, + ("No supported formats found"), + ("This element does not have formats in common with the peer")); + if (caps) + gst_caps_unref (caps); + return FALSE; + } +connect_error: + { + GST_DEBUG_OBJECT (basesrc, "connect error"); + if (pwsrc->core) + pw_thread_loop_unlock (pwsrc->core->loop); + return FALSE; + } +} + static gboolean gst_pipewire_src_send_event (GstElement * elem, GstEvent * event) {