pipewiresrc: fix handling outstanding buffers when stopping

The current code has several issues if a pipewiresrc is stopped and deleted
while a buffer is still in use downstream.

 - pw_stream_queue_buffer() is never called for the corresponding
   pw_buffer. As a result, the busy counter is never decremented and
   pw_stream_dequeue_buffer() of the corresponding pipewire output will
   return NULL whenever it encounters this buffer.

 - The pipewiresrc does not own the buffer reference unless the buffer is
   queued in the source, so calling gst_buffer_unref() unconditionally
   causes refcount issues for the buffer.

 - buffer_recycle() can race with on_remove_buffer() and
   gst_pipewire_src_close(). As a result, buffer_recycle() may access pwsrc
   when it was already deleted.

The buffer has its own reference to the pool. So the pool object lock can
be used to ensure that the pwsrc, core and stream remain valid in.
buffer_recycle(). If the 'dispose' function pointer was already cleared,
then on_remove_buffer() has already finished, so abort early.

With the pool lock held, it is save to access the pipewire loop. Now the
loop lock can be used to synchronize with on_remove_buffer(). 'dispose'
must be checked again in case on_remove_buffer() was triggered by something
other than gst_pipewire_src_close().

In on_remove_buffer() unref the buffer if it is queued. Otherwise call
pw_stream_queue_buffer() to ensure that the busy counter is decremented
correctly.
This commit is contained in:
Michael Olbrich 2021-09-08 09:34:45 +02:00 committed by Wim Taymans
parent 744a8aaeb7
commit 314ff82ff0

View file

@ -400,19 +400,34 @@ buffer_recycle (GstMiniObject *obj)
GstPipeWireSrc *src;
GstPipeWirePoolData *data;
gst_mini_object_ref (obj);
data = gst_pipewire_pool_get_data (GST_BUFFER_CAST(obj));
GST_OBJECT_LOCK (data->pool);
if (!obj->dispose) {
GST_OBJECT_UNLOCK (data->pool);
return TRUE;
}
GST_BUFFER_FLAGS (obj) = data->flags;
src = data->owner;
pw_thread_loop_lock (src->core->loop);
if (!obj->dispose) {
pw_thread_loop_unlock (src->core->loop);
GST_OBJECT_UNLOCK (data->pool);
return TRUE;
}
gst_mini_object_ref (obj);
data->queued = TRUE;
GST_LOG_OBJECT (src, "recycle buffer %p", obj);
pw_thread_loop_lock (src->core->loop);
if (src->stream)
pw_stream_queue_buffer (src->stream, data->b);
pw_stream_queue_buffer (src->stream, data->b);
pw_thread_loop_unlock (src->core->loop);
GST_OBJECT_UNLOCK (data->pool);
return FALSE;
}
@ -441,7 +456,10 @@ on_remove_buffer (void *_data, struct pw_buffer *b)
GST_MINI_OBJECT_CAST (buf)->dispose = NULL;
gst_buffer_unref (buf);
if (data->queued)
gst_buffer_unref (buf);
else
pw_stream_queue_buffer (pwsrc->stream, b);
}
static GstBuffer *dequeue_buffer(GstPipeWireSrc *pwsrc)
@ -1170,12 +1188,14 @@ gst_pipewire_src_close (GstPipeWireSrc * pwsrc)
g_clear_object (&pwsrc->clock);
GST_OBJECT_UNLOCK (pwsrc);
GST_OBJECT_LOCK (pwsrc->pool);
pw_thread_loop_lock (pwsrc->core->loop);
if (pwsrc->stream) {
pw_stream_destroy (pwsrc->stream);
pwsrc->stream = NULL;
}
pw_thread_loop_unlock (pwsrc->core->loop);
GST_OBJECT_UNLOCK (pwsrc->pool);
if (pwsrc->core) {
gst_pipewire_core_release (pwsrc->core);