gst: try to purge buffers when renegotiating

Track dequeued buffer counts and ensure they are flushed before
reconnecting stream.

When purging use empty buffers to try and push buffers we need
to reclaim out of the pipeline.

Signed-off-by: James Hilliard <james.hilliard1@gmail.com>
This commit is contained in:
James Hilliard 2022-06-30 04:45:59 -06:00 committed by Wim Taymans
parent e395f62425
commit 1e451458a7
2 changed files with 217 additions and 169 deletions

View file

@ -68,6 +68,9 @@ struct _GstPipeWirePool {
GstAllocator *fd_allocator;
GstAllocator *dmabuf_allocator;
gboolean purge;
guint num_dequeued;
GCond cond;
};

View file

@ -467,9 +467,10 @@ buffer_recycle (GstMiniObject *obj)
gst_mini_object_ref (obj);
data->queued = TRUE;
g_atomic_int_add (&src->pool->num_dequeued, -1);
if ((res = pw_stream_queue_buffer (src->stream, data->b)) < 0)
GST_WARNING_OBJECT (src, "can't queue recycled buffer %p, %s", obj, spa_strerror(res));
GST_ERROR_OBJECT (src, "can't queue recycled buffer %p, %s", obj, spa_strerror(res));
else
GST_LOG_OBJECT (src, "recycle buffer %p", obj);
@ -509,8 +510,9 @@ on_remove_buffer (void *_data, struct pw_buffer *b)
if (data->queued) {
gst_buffer_unref (buf);
} else {
GST_ERROR_OBJECT (pwsrc, "removed buffer was not recycled, buffer pool out of sync with stream queue");
if ((res = pw_stream_queue_buffer (pwsrc->stream, b)) < 0)
GST_WARNING_OBJECT (pwsrc, "can't queue removed buffer %p, %s", buf, spa_strerror(res));
GST_ERROR_OBJECT (pwsrc, "can't queue removed buffer %p, %s", buf, spa_strerror(res));
}
}
@ -523,9 +525,8 @@ static GstBuffer *dequeue_buffer(GstPipeWireSrc *pwsrc)
struct spa_meta_region *crop;
guint i;
b = pw_stream_dequeue_buffer (pwsrc->stream);
if (b == NULL)
return NULL;
if (pwsrc->pool->purge || (b = pw_stream_dequeue_buffer (pwsrc->stream)) == NULL)
return gst_buffer_new ();
data = b->user_data;
@ -544,6 +545,7 @@ static GstBuffer *dequeue_buffer(GstPipeWireSrc *pwsrc)
buf = gst_buffer_new ();
data->queued = FALSE;
g_atomic_int_add (&pwsrc->pool->num_dequeued, 1);
GST_BUFFER_PTS (buf) = GST_CLOCK_TIME_NONE;
GST_BUFFER_DTS (buf) = GST_CLOCK_TIME_NONE;
@ -686,6 +688,34 @@ start_error:
}
}
static gboolean
wait_purged (GstPipeWireSrc *this)
{
guint num_dequeued;
struct timespec abstime;
GST_DEBUG_OBJECT (this, "waiting for buffer pool to finish");
pw_thread_loop_get_time (this->core->loop, &abstime,
GST_PIPEWIRE_DEFAULT_TIMEOUT * SPA_NSEC_PER_SEC);
while ((num_dequeued = g_atomic_int_get (&this->pool->num_dequeued))) {
GST_DEBUG_OBJECT (this, "waiting for %i buffers to recycle", num_dequeued);
if (pw_thread_loop_timed_wait_full (this->core->loop, &abstime) < 0) {
GST_ERROR_OBJECT (this, "buffer pool failed to purge");
if (num_dequeued = g_atomic_int_get (&this->pool->num_dequeued)) {
pw_stream_set_error(this->stream, -EINVAL, "failed to purge buffers");
return FALSE;
}
}
}
GST_DEBUG_OBJECT (this, "buffer pool purged");
return TRUE;
}
static enum pw_stream_state
wait_started (GstPipeWireSrc *this)
{
@ -727,170 +757,6 @@ wait_started (GstPipeWireSrc *this)
return state;
}
static gboolean
gst_pipewire_src_negotiate (GstBaseSrc * basesrc)
{
GstPipeWireSrc *pwsrc = GST_PIPEWIRE_SRC (basesrc);
GstCaps *thiscaps;
GstCaps *caps = NULL;
GstCaps *peercaps = NULL;
gboolean result = FALSE;
GPtrArray *possible;
const char *error = NULL;
struct timespec abstime;
uint32_t target_id;
/* first see what is possible on our source pad */
thiscaps = gst_pad_query_caps (GST_BASE_SRC_PAD (basesrc), NULL);
GST_DEBUG_OBJECT (basesrc, "caps of src: %" GST_PTR_FORMAT, thiscaps);
/* nothing or anything is allowed, we're done */
if (thiscaps == NULL)
goto no_nego_needed;
if (G_UNLIKELY (gst_caps_is_empty (thiscaps)))
goto no_caps;
/* get the peer caps */
peercaps = gst_pad_peer_query_caps (GST_BASE_SRC_PAD (basesrc), thiscaps);
GST_DEBUG_OBJECT (basesrc, "caps of peer: %" GST_PTR_FORMAT, peercaps);
if (peercaps) {
/* The result is already a subset of our caps */
caps = peercaps;
gst_caps_unref (thiscaps);
} else {
/* no peer, work with our own caps then */
caps = thiscaps;
}
if (caps == NULL || gst_caps_is_empty (caps))
goto no_common_caps;
GST_DEBUG_OBJECT (basesrc, "have common caps: %" GST_PTR_FORMAT, caps);
/* open a connection with these caps */
possible = gst_caps_to_format_all (caps, SPA_PARAM_EnumFormat);
gst_caps_unref (caps);
/* first disconnect */
pw_thread_loop_lock (pwsrc->core->loop);
if (pw_stream_get_state(pwsrc->stream, &error) != PW_STREAM_STATE_UNCONNECTED) {
GST_DEBUG_OBJECT (basesrc, "disconnect capture");
pw_stream_disconnect (pwsrc->stream);
while (TRUE) {
enum pw_stream_state state = pw_stream_get_state (pwsrc->stream, &error);
GST_DEBUG_OBJECT (basesrc, "waiting for UNCONNECTED, now %s", pw_stream_state_as_string (state));
if (state == PW_STREAM_STATE_UNCONNECTED)
break;
if (state == PW_STREAM_STATE_ERROR || pwsrc->flushing) {
g_ptr_array_unref (possible);
goto connect_error;
}
pw_thread_loop_wait (pwsrc->core->loop);
}
}
target_id = pwsrc->path ? (uint32_t)atoi(pwsrc->path) : PW_ID_ANY;
if (pwsrc->target_object) {
struct spa_dict_item items[2] = {
SPA_DICT_ITEM_INIT(PW_KEY_TARGET_OBJECT, pwsrc->target_object),
SPA_DICT_ITEM_INIT(PW_KEY_NODE_TARGET, NULL),
};
struct spa_dict dict = SPA_DICT_INIT_ARRAY(items);
uint64_t serial;
/* If target.object is a name, set it also to node.target */
if (spa_atou64(pwsrc->target_object, &serial, 0)) {
dict.n_items = 1;
} else {
target_id = PW_ID_ANY;
items[1].value = pwsrc->target_object;
}
pw_stream_update_properties (pwsrc->stream, &dict);
}
GST_DEBUG_OBJECT (basesrc, "connect capture with path %s, target-object %s",
pwsrc->path, pwsrc->target_object);
pwsrc->negotiated = FALSE;
pw_stream_connect (pwsrc->stream,
PW_DIRECTION_INPUT,
target_id,
PW_STREAM_FLAG_AUTOCONNECT | PW_STREAM_FLAG_DONT_RECONNECT,
(const struct spa_pod **)possible->pdata,
possible->len);
g_ptr_array_free (possible, TRUE);
pw_thread_loop_get_time (pwsrc->core->loop, &abstime,
GST_PIPEWIRE_DEFAULT_TIMEOUT * SPA_NSEC_PER_SEC);
while (TRUE) {
enum pw_stream_state state = pw_stream_get_state (pwsrc->stream, &error);
GST_DEBUG_OBJECT (basesrc, "waiting for NEGOTIATED, now %s", pw_stream_state_as_string (state));
if (state == PW_STREAM_STATE_ERROR || pwsrc->flushing)
goto connect_error;
if (pwsrc->negotiated)
break;
if (pw_thread_loop_timed_wait_full (pwsrc->core->loop, &abstime) < 0)
goto connect_error;
}
caps = pwsrc->caps;
pwsrc->caps = NULL;
pw_thread_loop_unlock (pwsrc->core->loop);
if (caps == NULL)
goto no_caps;
gst_pipewire_clock_reset (GST_PIPEWIRE_CLOCK (pwsrc->clock), 0);
GST_DEBUG_OBJECT (pwsrc, "set format %" GST_PTR_FORMAT, caps);
result = gst_base_src_set_caps (GST_BASE_SRC (pwsrc), caps);
gst_caps_unref (caps);
result = gst_pipewire_src_stream_start (pwsrc);
pwsrc->started = result;
return result;
no_nego_needed:
{
GST_DEBUG_OBJECT (basesrc, "no negotiation needed");
if (thiscaps)
gst_caps_unref (thiscaps);
return TRUE;
}
no_caps:
{
GST_ELEMENT_ERROR (basesrc, STREAM, FORMAT,
("No supported formats found"),
("This element did not produce valid caps"));
if (thiscaps)
gst_caps_unref (thiscaps);
return FALSE;
}
no_common_caps:
{
GST_ELEMENT_ERROR (basesrc, STREAM, FORMAT,
("No supported formats found"),
("This element does not have formats in common with the peer"));
if (caps)
gst_caps_unref (caps);
return FALSE;
}
connect_error:
{
GST_DEBUG_OBJECT (basesrc, "connect error");
pw_thread_loop_unlock (pwsrc->core->loop);
return FALSE;
}
}
static void
on_param_changed (void *data, uint32_t id,
const struct spa_pod *param)
@ -1293,6 +1159,185 @@ gst_pipewire_src_close (GstPipeWireSrc * pwsrc)
}
}
static gboolean
gst_pipewire_src_negotiate (GstBaseSrc * basesrc)
{
GstPipeWireSrc *pwsrc = GST_PIPEWIRE_SRC (basesrc);
GstCaps *thiscaps;
GstCaps *caps = NULL;
GstCaps *peercaps = NULL;
gboolean result = FALSE;
GPtrArray *possible;
const char *error = NULL;
struct timespec abstime;
uint32_t target_id;
/* first see what is possible on our source pad */
thiscaps = gst_pad_query_caps (GST_BASE_SRC_PAD (basesrc), NULL);
GST_DEBUG_OBJECT (basesrc, "caps of src: %" GST_PTR_FORMAT, thiscaps);
/* nothing or anything is allowed, we're done */
if (thiscaps == NULL)
goto no_nego_needed;
if (G_UNLIKELY (gst_caps_is_empty (thiscaps)))
goto no_caps;
/* get the peer caps */
peercaps = gst_pad_peer_query_caps (GST_BASE_SRC_PAD (basesrc), thiscaps);
GST_DEBUG_OBJECT (basesrc, "caps of peer: %" GST_PTR_FORMAT, peercaps);
if (peercaps) {
/* The result is already a subset of our caps */
caps = peercaps;
gst_caps_unref (thiscaps);
} else {
/* no peer, work with our own caps then */
caps = thiscaps;
}
if (caps == NULL || gst_caps_is_empty (caps))
goto no_common_caps;
GST_DEBUG_OBJECT (basesrc, "have common caps: %" GST_PTR_FORMAT, caps);
/* open a connection with these caps */
possible = gst_caps_to_format_all (caps, SPA_PARAM_EnumFormat);
gst_caps_unref (caps);
/* first disconnect */
pw_thread_loop_lock (pwsrc->core->loop);
if (pw_stream_get_state(pwsrc->stream, &error) != PW_STREAM_STATE_UNCONNECTED) {
pwsrc->pool->purge = TRUE;
if (wait_purged (pwsrc)) {
GST_DEBUG_OBJECT (basesrc, "disconnect capture");
pw_stream_disconnect (pwsrc->stream);
while (TRUE) {
enum pw_stream_state state = pw_stream_get_state (pwsrc->stream, &error);
GST_DEBUG_OBJECT (basesrc, "waiting for UNCONNECTED, now %s", pw_stream_state_as_string (state));
if (state == PW_STREAM_STATE_UNCONNECTED)
break;
if (state == PW_STREAM_STATE_ERROR || pwsrc->flushing) {
g_ptr_array_unref (possible);
pwsrc->pool->purge = FALSE;
goto connect_error;
}
pw_thread_loop_wait (pwsrc->core->loop);
}
} else {
pw_thread_loop_signal (pwsrc->core->loop, FALSE);
pw_thread_loop_unlock (pwsrc->core->loop);
gst_pipewire_src_close (pwsrc);
if (!gst_pipewire_src_open (pwsrc)) {
g_ptr_array_unref (possible);
pwsrc->pool->purge = FALSE;
goto connect_error;
}
}
pwsrc->pool->purge = FALSE;
}
target_id = pwsrc->path ? (uint32_t)atoi(pwsrc->path) : PW_ID_ANY;
if (pwsrc->target_object) {
struct spa_dict_item items[2] = {
SPA_DICT_ITEM_INIT(PW_KEY_TARGET_OBJECT, pwsrc->target_object),
SPA_DICT_ITEM_INIT(PW_KEY_NODE_TARGET, NULL),
};
struct spa_dict dict = SPA_DICT_INIT_ARRAY(items);
uint64_t serial;
/* If target.object is a name, set it also to node.target */
if (spa_atou64(pwsrc->target_object, &serial, 0)) {
dict.n_items = 1;
} else {
target_id = PW_ID_ANY;
items[1].value = pwsrc->target_object;
}
pw_stream_update_properties (pwsrc->stream, &dict);
}
GST_DEBUG_OBJECT (basesrc, "connect capture with path %s, target-object %s",
pwsrc->path, pwsrc->target_object);
pwsrc->negotiated = FALSE;
pw_stream_connect (pwsrc->stream,
PW_DIRECTION_INPUT,
target_id,
PW_STREAM_FLAG_AUTOCONNECT | PW_STREAM_FLAG_DONT_RECONNECT,
(const struct spa_pod **)possible->pdata,
possible->len);
g_ptr_array_free (possible, TRUE);
pw_thread_loop_get_time (pwsrc->core->loop, &abstime,
GST_PIPEWIRE_DEFAULT_TIMEOUT * SPA_NSEC_PER_SEC);
while (TRUE) {
enum pw_stream_state state = pw_stream_get_state (pwsrc->stream, &error);
GST_DEBUG_OBJECT (basesrc, "waiting for NEGOTIATED, now %s", pw_stream_state_as_string (state));
if (state == PW_STREAM_STATE_ERROR || pwsrc->flushing)
goto connect_error;
if (pwsrc->negotiated)
break;
if (pw_thread_loop_timed_wait_full (pwsrc->core->loop, &abstime) < 0)
goto connect_error;
}
caps = pwsrc->caps;
pwsrc->caps = NULL;
pw_thread_loop_unlock (pwsrc->core->loop);
if (caps == NULL)
goto no_caps;
gst_pipewire_clock_reset (GST_PIPEWIRE_CLOCK (pwsrc->clock), 0);
GST_DEBUG_OBJECT (pwsrc, "set format %" GST_PTR_FORMAT, caps);
result = gst_base_src_set_caps (GST_BASE_SRC (pwsrc), caps);
gst_caps_unref (caps);
result = gst_pipewire_src_stream_start (pwsrc);
pwsrc->started = result;
return result;
no_nego_needed:
{
GST_DEBUG_OBJECT (basesrc, "no negotiation needed");
if (thiscaps)
gst_caps_unref (thiscaps);
return TRUE;
}
no_caps:
{
GST_ELEMENT_ERROR (basesrc, STREAM, FORMAT,
("No supported formats found"),
("This element did not produce valid caps"));
if (thiscaps)
gst_caps_unref (thiscaps);
return FALSE;
}
no_common_caps:
{
GST_ELEMENT_ERROR (basesrc, STREAM, FORMAT,
("No supported formats found"),
("This element does not have formats in common with the peer"));
if (caps)
gst_caps_unref (caps);
return FALSE;
}
connect_error:
{
GST_DEBUG_OBJECT (basesrc, "connect error");
if (pwsrc->core)
pw_thread_loop_unlock (pwsrc->core->loop);
return FALSE;
}
}
static gboolean
gst_pipewire_src_send_event (GstElement * elem, GstEvent * event)
{