gst: remove internal queue

Remove the extra queue and use the queue from pw_stream.
Fix some locks and the bufferpool handling to make audio playback
work.
This commit is contained in:
Wim Taymans 2020-04-23 15:56:12 +02:00
parent 7b13ba202b
commit 39c2d5b963
2 changed files with 20 additions and 35 deletions

View file

@ -276,8 +276,6 @@ gst_pipewire_sink_init (GstPipeWireSink * sink)
g_signal_connect (sink->pool, "activated", G_CALLBACK (pool_activated), sink);
g_queue_init (&sink->queue);
sink->loop = pw_thread_loop_new ("pipewire-sink-loop", NULL);
sink->context = pw_context_new (pw_thread_loop_get_loop(sink->loop), NULL, 0);
GST_DEBUG ("loop %p context %p", sink->loop, sink->context);
@ -402,7 +400,6 @@ on_add_buffer (void *_data, struct pw_buffer *b)
{
GstPipeWireSink *pwsink = _data;
gst_pipewire_pool_wrap_buffer (pwsink->pool, b);
pw_thread_loop_signal (pwsink->loop, FALSE);
}
static void
@ -413,26 +410,17 @@ on_remove_buffer (void *_data, struct pw_buffer *b)
GST_LOG_OBJECT (pwsink, "remove buffer");
if (g_queue_remove (&pwsink->queue, data->buf))
gst_buffer_unref (data->buf);
gst_buffer_unref (data->buf);
}
static void
do_send_buffer (GstPipeWireSink *pwsink)
do_send_buffer (GstPipeWireSink *pwsink, GstBuffer *buffer)
{
GstBuffer *buffer;
GstPipeWirePoolData *data;
gboolean res;
guint i;
struct spa_buffer *b;
buffer = g_queue_pop_head (&pwsink->queue);
if (buffer == NULL) {
GST_WARNING ("out of buffers");
return;
}
data = gst_pipewire_pool_get_data(buffer);
b = data->b->buffer;
@ -451,9 +439,7 @@ do_send_buffer (GstPipeWireSink *pwsink)
if ((res = pw_stream_queue_buffer (pwsink->stream, data->b)) < 0) {
g_warning ("can't send buffer %s", spa_strerror(res));
pw_thread_loop_signal (pwsink->loop, FALSE);
} else
pwsink->need_ready--;
}
}
@ -461,17 +447,8 @@ static void
on_process (void *data)
{
GstPipeWireSink *pwsink = data;
if (pwsink->stream == NULL) {
GST_LOG_OBJECT (pwsink, "no stream");
return;
}
GST_DEBUG ("signal");
g_cond_signal (&pwsink->pool->cond);
pwsink->need_ready++;
GST_DEBUG ("need buffer %u", pwsink->need_ready);
do_send_buffer (pwsink);
}
static void
@ -515,6 +492,10 @@ gst_pipewire_sink_setcaps (GstBaseSink * bsink, GstCaps * caps)
enum pw_stream_state state;
const char *error = NULL;
gboolean res = FALSE;
GstStructure *config;
guint size;
guint min_buffers;
guint max_buffers;
pwsink = GST_PIPEWIRE_SINK (bsink);
@ -555,6 +536,11 @@ gst_pipewire_sink_setcaps (GstBaseSink * bsink, GstCaps * caps)
}
res = TRUE;
config = gst_buffer_pool_get_config (GST_BUFFER_POOL_CAST (pwsink->pool));
gst_buffer_pool_config_get_params (config, NULL, &size, &min_buffers, &max_buffers);
gst_buffer_pool_config_set_params (config, caps, size, min_buffers, max_buffers);
gst_buffer_pool_set_config (GST_BUFFER_POOL_CAST (pwsink->pool), config);
pw_thread_loop_unlock (pwsink->loop);
pwsink->negotiated = res;
@ -590,6 +576,8 @@ gst_pipewire_sink_render (GstBaseSink * bsink, GstBuffer * buffer)
GstBuffer *b = NULL;
GstMapInfo info = { 0, };
pw_thread_loop_unlock (pwsink->loop);
if (!gst_buffer_pool_is_active (GST_BUFFER_POOL_CAST (pwsink->pool)))
gst_buffer_pool_set_active (GST_BUFFER_POOL_CAST (pwsink->pool), TRUE);
@ -601,15 +589,14 @@ gst_pipewire_sink_render (GstBaseSink * bsink, GstBuffer * buffer)
gst_buffer_unmap (b, &info);
gst_buffer_resize (b, 0, gst_buffer_get_size (buffer));
buffer = b;
} else {
gst_buffer_ref (buffer);
pw_thread_loop_lock (pwsink->loop);
if (pw_stream_get_state (pwsink->stream, &error) != PW_STREAM_STATE_STREAMING)
goto done;
}
GST_DEBUG ("push buffer in queue");
g_queue_push_tail (&pwsink->queue, buffer);
if (pwsink->mode == GST_PIPEWIRE_SINK_MODE_PROVIDE)
do_send_buffer (pwsink);
GST_DEBUG ("push buffer");
do_send_buffer (pwsink, buffer);
done:
pw_thread_loop_unlock (pwsink->loop);

View file

@ -95,8 +95,6 @@ struct _GstPipeWireSink {
GstPipeWireSinkMode mode;
GstPipeWirePool *pool;
GQueue queue;
guint need_ready;
};
struct _GstPipeWireSinkClass {