Merge branch 'purge-buffers' into 'master'

gst: try to purge buffers when renegotiating

See merge request pipewire/pipewire!1299
This commit is contained in:
James Hilliard 2023-01-09 12:37:28 +00:00
commit 19155ca0e0
2 changed files with 217 additions and 169 deletions

View file

@ -69,6 +69,9 @@ struct _GstPipeWirePool {
GstAllocator *fd_allocator; GstAllocator *fd_allocator;
GstAllocator *dmabuf_allocator; GstAllocator *dmabuf_allocator;
gboolean purge;
guint num_dequeued;
GCond cond; GCond cond;
}; };

View file

@ -468,9 +468,10 @@ buffer_recycle (GstMiniObject *obj)
gst_mini_object_ref (obj); gst_mini_object_ref (obj);
data->queued = TRUE; data->queued = TRUE;
g_atomic_int_add (&src->pool->num_dequeued, -1);
if ((res = pw_stream_queue_buffer (src->stream, data->b)) < 0) if ((res = pw_stream_queue_buffer (src->stream, data->b)) < 0)
GST_WARNING_OBJECT (src, "can't queue recycled buffer %p, %s", obj, spa_strerror(res)); GST_ERROR_OBJECT (src, "can't queue recycled buffer %p, %s", obj, spa_strerror(res));
else else
GST_LOG_OBJECT (src, "recycle buffer %p", obj); GST_LOG_OBJECT (src, "recycle buffer %p", obj);
@ -510,8 +511,9 @@ on_remove_buffer (void *_data, struct pw_buffer *b)
if (data->queued) { if (data->queued) {
gst_buffer_unref (buf); gst_buffer_unref (buf);
} else { } else {
GST_ERROR_OBJECT (pwsrc, "removed buffer was not recycled, buffer pool out of sync with stream queue");
if ((res = pw_stream_queue_buffer (pwsrc->stream, b)) < 0) if ((res = pw_stream_queue_buffer (pwsrc->stream, b)) < 0)
GST_WARNING_OBJECT (pwsrc, "can't queue removed buffer %p, %s", buf, spa_strerror(res)); GST_ERROR_OBJECT (pwsrc, "can't queue removed buffer %p, %s", buf, spa_strerror(res));
} }
} }
@ -544,9 +546,8 @@ static GstBuffer *dequeue_buffer(GstPipeWireSrc *pwsrc)
struct spa_meta_videotransform *videotransform; struct spa_meta_videotransform *videotransform;
guint i; guint i;
b = pw_stream_dequeue_buffer (pwsrc->stream); if (pwsrc->pool->purge || (b = pw_stream_dequeue_buffer (pwsrc->stream)) == NULL)
if (b == NULL) return gst_buffer_new ();
return NULL;
data = b->user_data; data = b->user_data;
@ -565,6 +566,7 @@ static GstBuffer *dequeue_buffer(GstPipeWireSrc *pwsrc)
buf = gst_buffer_new (); buf = gst_buffer_new ();
data->queued = FALSE; data->queued = FALSE;
g_atomic_int_add (&pwsrc->pool->num_dequeued, 1);
GST_BUFFER_PTS (buf) = GST_CLOCK_TIME_NONE; GST_BUFFER_PTS (buf) = GST_CLOCK_TIME_NONE;
GST_BUFFER_DTS (buf) = GST_CLOCK_TIME_NONE; GST_BUFFER_DTS (buf) = GST_CLOCK_TIME_NONE;
@ -732,6 +734,34 @@ start_error:
} }
} }
static gboolean
wait_purged (GstPipeWireSrc *this)
{
guint num_dequeued;
struct timespec abstime;
GST_DEBUG_OBJECT (this, "waiting for buffer pool to finish");
pw_thread_loop_get_time (this->core->loop, &abstime,
GST_PIPEWIRE_DEFAULT_TIMEOUT * SPA_NSEC_PER_SEC);
while ((num_dequeued = g_atomic_int_get (&this->pool->num_dequeued))) {
GST_DEBUG_OBJECT (this, "waiting for %i buffers to recycle", num_dequeued);
if (pw_thread_loop_timed_wait_full (this->core->loop, &abstime) < 0) {
GST_ERROR_OBJECT (this, "buffer pool failed to purge");
if (num_dequeued = g_atomic_int_get (&this->pool->num_dequeued)) {
pw_stream_set_error(this->stream, -EINVAL, "failed to purge buffers");
return FALSE;
}
}
}
GST_DEBUG_OBJECT (this, "buffer pool purged");
return TRUE;
}
static enum pw_stream_state static enum pw_stream_state
wait_started (GstPipeWireSrc *this) wait_started (GstPipeWireSrc *this)
{ {
@ -773,170 +803,6 @@ wait_started (GstPipeWireSrc *this)
return state; return state;
} }
static gboolean
gst_pipewire_src_negotiate (GstBaseSrc * basesrc)
{
GstPipeWireSrc *pwsrc = GST_PIPEWIRE_SRC (basesrc);
GstCaps *thiscaps;
GstCaps *caps = NULL;
GstCaps *peercaps = NULL;
gboolean result = FALSE;
GPtrArray *possible;
const char *error = NULL;
struct timespec abstime;
uint32_t target_id;
/* first see what is possible on our source pad */
thiscaps = gst_pad_query_caps (GST_BASE_SRC_PAD (basesrc), NULL);
GST_DEBUG_OBJECT (basesrc, "caps of src: %" GST_PTR_FORMAT, thiscaps);
/* nothing or anything is allowed, we're done */
if (thiscaps == NULL)
goto no_nego_needed;
if (G_UNLIKELY (gst_caps_is_empty (thiscaps)))
goto no_caps;
/* get the peer caps */
peercaps = gst_pad_peer_query_caps (GST_BASE_SRC_PAD (basesrc), thiscaps);
GST_DEBUG_OBJECT (basesrc, "caps of peer: %" GST_PTR_FORMAT, peercaps);
if (peercaps) {
/* The result is already a subset of our caps */
caps = peercaps;
gst_caps_unref (thiscaps);
} else {
/* no peer, work with our own caps then */
caps = thiscaps;
}
if (caps == NULL || gst_caps_is_empty (caps))
goto no_common_caps;
GST_DEBUG_OBJECT (basesrc, "have common caps: %" GST_PTR_FORMAT, caps);
/* open a connection with these caps */
possible = gst_caps_to_format_all (caps, SPA_PARAM_EnumFormat);
gst_caps_unref (caps);
/* first disconnect */
pw_thread_loop_lock (pwsrc->core->loop);
if (pw_stream_get_state(pwsrc->stream, &error) != PW_STREAM_STATE_UNCONNECTED) {
GST_DEBUG_OBJECT (basesrc, "disconnect capture");
pw_stream_disconnect (pwsrc->stream);
while (TRUE) {
enum pw_stream_state state = pw_stream_get_state (pwsrc->stream, &error);
GST_DEBUG_OBJECT (basesrc, "waiting for UNCONNECTED, now %s", pw_stream_state_as_string (state));
if (state == PW_STREAM_STATE_UNCONNECTED)
break;
if (state == PW_STREAM_STATE_ERROR || pwsrc->flushing) {
g_ptr_array_unref (possible);
goto connect_error;
}
pw_thread_loop_wait (pwsrc->core->loop);
}
}
target_id = pwsrc->path ? (uint32_t)atoi(pwsrc->path) : PW_ID_ANY;
if (pwsrc->target_object) {
struct spa_dict_item items[2] = {
SPA_DICT_ITEM_INIT(PW_KEY_TARGET_OBJECT, pwsrc->target_object),
SPA_DICT_ITEM_INIT(PW_KEY_NODE_TARGET, NULL),
};
struct spa_dict dict = SPA_DICT_INIT_ARRAY(items);
uint64_t serial;
/* If target.object is a name, set it also to node.target */
if (spa_atou64(pwsrc->target_object, &serial, 0)) {
dict.n_items = 1;
} else {
target_id = PW_ID_ANY;
items[1].value = pwsrc->target_object;
}
pw_stream_update_properties (pwsrc->stream, &dict);
}
GST_DEBUG_OBJECT (basesrc, "connect capture with path %s, target-object %s",
pwsrc->path, pwsrc->target_object);
pwsrc->negotiated = FALSE;
pw_stream_connect (pwsrc->stream,
PW_DIRECTION_INPUT,
target_id,
PW_STREAM_FLAG_AUTOCONNECT | PW_STREAM_FLAG_DONT_RECONNECT,
(const struct spa_pod **)possible->pdata,
possible->len);
g_ptr_array_free (possible, TRUE);
pw_thread_loop_get_time (pwsrc->core->loop, &abstime,
GST_PIPEWIRE_DEFAULT_TIMEOUT * SPA_NSEC_PER_SEC);
while (TRUE) {
enum pw_stream_state state = pw_stream_get_state (pwsrc->stream, &error);
GST_DEBUG_OBJECT (basesrc, "waiting for NEGOTIATED, now %s", pw_stream_state_as_string (state));
if (state == PW_STREAM_STATE_ERROR || pwsrc->flushing)
goto connect_error;
if (pwsrc->negotiated)
break;
if (pw_thread_loop_timed_wait_full (pwsrc->core->loop, &abstime) < 0)
goto connect_error;
}
caps = pwsrc->caps;
pwsrc->caps = NULL;
pw_thread_loop_unlock (pwsrc->core->loop);
if (caps == NULL)
goto no_caps;
gst_pipewire_clock_reset (GST_PIPEWIRE_CLOCK (pwsrc->clock), 0);
GST_DEBUG_OBJECT (pwsrc, "set format %" GST_PTR_FORMAT, caps);
result = gst_base_src_set_caps (GST_BASE_SRC (pwsrc), caps);
gst_caps_unref (caps);
result = gst_pipewire_src_stream_start (pwsrc);
pwsrc->started = result;
return result;
no_nego_needed:
{
GST_DEBUG_OBJECT (basesrc, "no negotiation needed");
if (thiscaps)
gst_caps_unref (thiscaps);
return TRUE;
}
no_caps:
{
GST_ELEMENT_ERROR (basesrc, STREAM, FORMAT,
("No supported formats found"),
("This element did not produce valid caps"));
if (thiscaps)
gst_caps_unref (thiscaps);
return FALSE;
}
no_common_caps:
{
GST_ELEMENT_ERROR (basesrc, STREAM, FORMAT,
("No supported formats found"),
("This element does not have formats in common with the peer"));
if (caps)
gst_caps_unref (caps);
return FALSE;
}
connect_error:
{
GST_DEBUG_OBJECT (basesrc, "connect error");
pw_thread_loop_unlock (pwsrc->core->loop);
return FALSE;
}
}
static void static void
on_param_changed (void *data, uint32_t id, on_param_changed (void *data, uint32_t id,
const struct spa_pod *param) const struct spa_pod *param)
@ -1348,6 +1214,185 @@ gst_pipewire_src_close (GstPipeWireSrc * pwsrc)
} }
} }
static gboolean
gst_pipewire_src_negotiate (GstBaseSrc * basesrc)
{
GstPipeWireSrc *pwsrc = GST_PIPEWIRE_SRC (basesrc);
GstCaps *thiscaps;
GstCaps *caps = NULL;
GstCaps *peercaps = NULL;
gboolean result = FALSE;
GPtrArray *possible;
const char *error = NULL;
struct timespec abstime;
uint32_t target_id;
/* first see what is possible on our source pad */
thiscaps = gst_pad_query_caps (GST_BASE_SRC_PAD (basesrc), NULL);
GST_DEBUG_OBJECT (basesrc, "caps of src: %" GST_PTR_FORMAT, thiscaps);
/* nothing or anything is allowed, we're done */
if (thiscaps == NULL)
goto no_nego_needed;
if (G_UNLIKELY (gst_caps_is_empty (thiscaps)))
goto no_caps;
/* get the peer caps */
peercaps = gst_pad_peer_query_caps (GST_BASE_SRC_PAD (basesrc), thiscaps);
GST_DEBUG_OBJECT (basesrc, "caps of peer: %" GST_PTR_FORMAT, peercaps);
if (peercaps) {
/* The result is already a subset of our caps */
caps = peercaps;
gst_caps_unref (thiscaps);
} else {
/* no peer, work with our own caps then */
caps = thiscaps;
}
if (caps == NULL || gst_caps_is_empty (caps))
goto no_common_caps;
GST_DEBUG_OBJECT (basesrc, "have common caps: %" GST_PTR_FORMAT, caps);
/* open a connection with these caps */
possible = gst_caps_to_format_all (caps, SPA_PARAM_EnumFormat);
gst_caps_unref (caps);
/* first disconnect */
pw_thread_loop_lock (pwsrc->core->loop);
if (pw_stream_get_state(pwsrc->stream, &error) != PW_STREAM_STATE_UNCONNECTED) {
pwsrc->pool->purge = TRUE;
if (wait_purged (pwsrc)) {
GST_DEBUG_OBJECT (basesrc, "disconnect capture");
pw_stream_disconnect (pwsrc->stream);
while (TRUE) {
enum pw_stream_state state = pw_stream_get_state (pwsrc->stream, &error);
GST_DEBUG_OBJECT (basesrc, "waiting for UNCONNECTED, now %s", pw_stream_state_as_string (state));
if (state == PW_STREAM_STATE_UNCONNECTED)
break;
if (state == PW_STREAM_STATE_ERROR || pwsrc->flushing) {
g_ptr_array_unref (possible);
pwsrc->pool->purge = FALSE;
goto connect_error;
}
pw_thread_loop_wait (pwsrc->core->loop);
}
} else {
pw_thread_loop_signal (pwsrc->core->loop, FALSE);
pw_thread_loop_unlock (pwsrc->core->loop);
gst_pipewire_src_close (pwsrc);
if (!gst_pipewire_src_open (pwsrc)) {
g_ptr_array_unref (possible);
pwsrc->pool->purge = FALSE;
goto connect_error;
}
}
pwsrc->pool->purge = FALSE;
}
target_id = pwsrc->path ? (uint32_t)atoi(pwsrc->path) : PW_ID_ANY;
if (pwsrc->target_object) {
struct spa_dict_item items[2] = {
SPA_DICT_ITEM_INIT(PW_KEY_TARGET_OBJECT, pwsrc->target_object),
SPA_DICT_ITEM_INIT(PW_KEY_NODE_TARGET, NULL),
};
struct spa_dict dict = SPA_DICT_INIT_ARRAY(items);
uint64_t serial;
/* If target.object is a name, set it also to node.target */
if (spa_atou64(pwsrc->target_object, &serial, 0)) {
dict.n_items = 1;
} else {
target_id = PW_ID_ANY;
items[1].value = pwsrc->target_object;
}
pw_stream_update_properties (pwsrc->stream, &dict);
}
GST_DEBUG_OBJECT (basesrc, "connect capture with path %s, target-object %s",
pwsrc->path, pwsrc->target_object);
pwsrc->negotiated = FALSE;
pw_stream_connect (pwsrc->stream,
PW_DIRECTION_INPUT,
target_id,
PW_STREAM_FLAG_AUTOCONNECT | PW_STREAM_FLAG_DONT_RECONNECT,
(const struct spa_pod **)possible->pdata,
possible->len);
g_ptr_array_free (possible, TRUE);
pw_thread_loop_get_time (pwsrc->core->loop, &abstime,
GST_PIPEWIRE_DEFAULT_TIMEOUT * SPA_NSEC_PER_SEC);
while (TRUE) {
enum pw_stream_state state = pw_stream_get_state (pwsrc->stream, &error);
GST_DEBUG_OBJECT (basesrc, "waiting for NEGOTIATED, now %s", pw_stream_state_as_string (state));
if (state == PW_STREAM_STATE_ERROR || pwsrc->flushing)
goto connect_error;
if (pwsrc->negotiated)
break;
if (pw_thread_loop_timed_wait_full (pwsrc->core->loop, &abstime) < 0)
goto connect_error;
}
caps = pwsrc->caps;
pwsrc->caps = NULL;
pw_thread_loop_unlock (pwsrc->core->loop);
if (caps == NULL)
goto no_caps;
gst_pipewire_clock_reset (GST_PIPEWIRE_CLOCK (pwsrc->clock), 0);
GST_DEBUG_OBJECT (pwsrc, "set format %" GST_PTR_FORMAT, caps);
result = gst_base_src_set_caps (GST_BASE_SRC (pwsrc), caps);
gst_caps_unref (caps);
result = gst_pipewire_src_stream_start (pwsrc);
pwsrc->started = result;
return result;
no_nego_needed:
{
GST_DEBUG_OBJECT (basesrc, "no negotiation needed");
if (thiscaps)
gst_caps_unref (thiscaps);
return TRUE;
}
no_caps:
{
GST_ELEMENT_ERROR (basesrc, STREAM, FORMAT,
("No supported formats found"),
("This element did not produce valid caps"));
if (thiscaps)
gst_caps_unref (thiscaps);
return FALSE;
}
no_common_caps:
{
GST_ELEMENT_ERROR (basesrc, STREAM, FORMAT,
("No supported formats found"),
("This element does not have formats in common with the peer"));
if (caps)
gst_caps_unref (caps);
return FALSE;
}
connect_error:
{
GST_DEBUG_OBJECT (basesrc, "connect error");
if (pwsrc->core)
pw_thread_loop_unlock (pwsrc->core->loop);
return FALSE;
}
}
static gboolean static gboolean
gst_pipewire_src_send_event (GstElement * elem, GstEvent * event) gst_pipewire_src_send_event (GstElement * elem, GstEvent * event)
{ {