bluetooth/gst: Use GStreamer synchronously within PA's IO thread

Handling multiple threads does not come without overhead, especially
when the end-goal is to ping-pong them making the whole system run
serially.  This patch rips out all that thread handling and instead
"chains" buffers to be encoded/decoded directly into the pipeline,
making them execute their work on the current thread.  The resulting
buffer can be pulled out from appsink immediately without require extra
locking and signalling.  While the overhead on modern systems is found
to be negligible or unnoticable, code complexity of such locking and
signalling systems is prevalent making it the main drive behind this
refactor.

Part-of: <https://gitlab.freedesktop.org/pulseaudio/pulseaudio/-/merge_requests/494>
This commit is contained in:
Marijn Suijten 2021-01-21 11:25:07 +01:00 committed by Arun Raghavan
parent 62deab21a3
commit 201dc6542b
2 changed files with 71 additions and 127 deletions

View file

@ -53,7 +53,6 @@ static GstFlowReturn app_sink_new_sample(GstAppSink *appsink, gpointer userdata)
gst_buffer_ref(buf);
gst_adapter_push(info->sink_adapter, buf);
gst_sample_unref(sample);
pa_fdsem_post(info->sample_ready_fdsem);
return GST_FLOW_OK;
}
@ -61,61 +60,19 @@ static GstFlowReturn app_sink_new_sample(GstAppSink *appsink, gpointer userdata)
static void gst_deinit_common(struct gst_info *info) {
if (!info)
return;
if (info->sample_ready_fdsem)
pa_fdsem_free(info->sample_ready_fdsem);
if (info->app_src)
gst_object_unref(info->app_src);
if (info->app_sink)
gst_object_unref(info->app_sink);
if (info->sink_adapter)
g_object_unref(info->sink_adapter);
if (info->pipeline)
gst_object_unref(info->pipeline);
}
static GstBusSyncReply sync_bus_handler (GstBus *bus, GstMessage *message, struct gst_info *info) {
GstStreamStatusType type;
GstElement *owner;
switch (GST_MESSAGE_TYPE (message)) {
case GST_MESSAGE_STREAM_STATUS:
gst_message_parse_stream_status (message, &type, &owner);
switch (type) {
case GST_STREAM_STATUS_TYPE_ENTER:
pa_log_debug("GStreamer pipeline thread starting up");
if (info->core->realtime_scheduling)
pa_thread_make_realtime(info->core->realtime_priority);
break;
case GST_STREAM_STATUS_TYPE_LEAVE:
pa_log_debug("GStreamer pipeline thread shutting down");
break;
default:
break;
}
break;
default:
break;
}
/* pass all messages on the async queue */
return GST_BUS_PASS;
if (info->bin)
gst_object_unref(info->bin);
}
bool gst_init_common(struct gst_info *info) {
GstElement *pipeline = NULL;
GstElement *appsrc = NULL, *appsink = NULL;
GstElement *bin = NULL;
GstElement *appsink = NULL;
GstAdapter *adapter;
GstAppSinkCallbacks callbacks = { 0, };
GstBus *bus;
appsrc = gst_element_factory_make("appsrc", "app_source");
if (!appsrc) {
pa_log_error("Could not create appsrc element");
goto fail;
}
g_object_set(appsrc, "is-live", FALSE, "format", GST_FORMAT_TIME, "stream-type", 0, "max-bytes", 0, NULL);
appsink = gst_element_factory_make("appsink", "app_sink");
if (!appsink) {
@ -131,65 +88,22 @@ bool gst_init_common(struct gst_info *info) {
adapter = gst_adapter_new();
pa_assert(adapter);
pipeline = gst_pipeline_new(NULL);
pa_assert(pipeline);
bin = gst_bin_new(NULL);
pa_assert(bin);
bus = gst_pipeline_get_bus (GST_PIPELINE (pipeline));
gst_bus_set_sync_handler (bus, (GstBusSyncHandler) sync_bus_handler, info, NULL);
gst_object_unref (bus);
info->app_src = appsrc;
info->app_sink = appsink;
info->sink_adapter = adapter;
info->pipeline = pipeline;
info->sample_ready_fdsem = pa_fdsem_new();
info->bin = bin;
return true;
fail:
if (appsrc)
gst_object_unref(appsrc);
if (appsink)
gst_object_unref(appsink);
return false;
}
/*
* The idea of using buffer probes is as follows. We set a buffer probe on the
* encoder sink pad. In the buffer probe, we set an idle probe on the upstream
* source pad. In encode_buffer, we wait on the fdsem. The fdsem gets posted
* when either new_sample or idle probe gets called. We do this, to make the
* appsink behave synchronously.
*
* For buffer probes, see
* https://gstreamer.freedesktop.org/documentation/additional/design/probes.html?gi-language=c
*/
static GstPadProbeReturn gst_sink_buffer_idle_probe(GstPad *pad, GstPadProbeInfo *probe_info, gpointer userdata)
{
struct gst_info *info = (struct gst_info *)userdata;
pa_assert(probe_info->type & GST_PAD_PROBE_TYPE_IDLE);
pa_fdsem_post(info->sample_ready_fdsem);
return GST_PAD_PROBE_REMOVE;
}
static GstPadProbeReturn gst_sink_buffer_probe(GstPad *pad, GstPadProbeInfo *probe_info, gpointer userdata)
{
struct gst_info *info = (struct gst_info *)userdata;
GstPad *peer_pad;
pa_assert(probe_info->type & GST_PAD_PROBE_TYPE_BUFFER);
peer_pad = gst_pad_get_peer(pad);
gst_pad_add_probe(peer_pad, GST_PAD_PROBE_TYPE_IDLE, gst_sink_buffer_idle_probe, info, NULL);
gst_object_unref(peer_pad);
return GST_PAD_PROBE_OK;
}
static GstCaps *gst_create_caps_from_sample_spec(const pa_sample_spec *ss) {
gchar *sample_format;
GstCaps *caps;
@ -240,6 +154,10 @@ static GstCaps *gst_create_caps_from_sample_spec(const pa_sample_spec *ss) {
bool gst_codec_init(struct gst_info *info, bool for_encoding, GstElement *transcoder) {
GstPad *pad;
GstCaps *caps;
GstEvent *event;
GstSegment segment;
GstEvent *stream_start;
guint group_id;
pa_assert(transcoder);
@ -248,30 +166,49 @@ bool gst_codec_init(struct gst_info *info, bool for_encoding, GstElement *transc
if (!gst_init_common(info))
goto common_fail;
caps = gst_create_caps_from_sample_spec(info->ss);
if (for_encoding)
g_object_set(info->app_src, "caps", caps, NULL);
else
g_object_set(info->app_sink, "caps", caps, NULL);
gst_caps_unref(caps);
gst_bin_add_many(GST_BIN(info->bin), transcoder, info->app_sink, NULL);
gst_bin_add_many(GST_BIN(info->pipeline), info->app_src, transcoder, info->app_sink, NULL);
if (!gst_element_link_many(info->app_src, transcoder, info->app_sink, NULL)) {
if (!gst_element_link_many(transcoder, info->app_sink, NULL)) {
pa_log_error("Failed to link codec elements into pipeline");
goto pipeline_fail;
}
if (gst_element_set_state(info->pipeline, GST_STATE_PLAYING) == GST_STATE_CHANGE_FAILURE) {
pad = gst_element_get_static_pad(transcoder, "sink");
pa_assert_se(gst_element_add_pad(info->bin, gst_ghost_pad_new("sink", pad)));
/**
* Only the sink pad is needed to push buffers. Cache it since
* gst_element_get_static_pad is relatively expensive and verbose
* on higher log levels.
*/
info->pad_sink = pad;
if (gst_element_set_state(info->bin, GST_STATE_PLAYING) == GST_STATE_CHANGE_FAILURE) {
pa_log_error("Could not start pipeline");
goto pipeline_fail;
}
/* See the comment on buffer probe functions */
pad = gst_element_get_static_pad(transcoder, "sink");
gst_pad_add_probe(pad, GST_PAD_PROBE_TYPE_BUFFER, gst_sink_buffer_probe, info, NULL);
gst_object_unref(pad);
/* First, send stream-start sticky event */
group_id = gst_util_group_id_next();
stream_start = gst_event_new_stream_start("gst-codec-pa");
gst_event_set_group_id(stream_start, group_id);
gst_pad_send_event(info->pad_sink, stream_start);
/* Retrieve the pad that handles the PCM format between PA and GStreamer */
if (for_encoding)
pad = gst_element_get_static_pad(transcoder, "sink");
else
pad = gst_element_get_static_pad(transcoder, "src");
/* Second, send caps sticky event */
caps = gst_create_caps_from_sample_spec(info->ss);
pa_assert_se(gst_pad_set_caps(pad, caps));
gst_caps_unref(caps);
gst_object_unref(GST_OBJECT(pad));
/* Third, send segment sticky event */
gst_segment_init(&segment, GST_FORMAT_TIME);
event = gst_event_new_segment(&segment);
gst_pad_send_event(info->pad_sink, event);
pa_log_info("GStreamer pipeline initialisation succeeded");
@ -299,25 +236,32 @@ size_t gst_transcode_buffer(void *codec_info, const uint8_t *input_buffer, size_
struct gst_info *info = (struct gst_info *) codec_info;
gsize available, transcoded;
GstBuffer *in_buf;
GstMapInfo map_info;
GstFlowReturn ret;
size_t written = 0;
in_buf = gst_buffer_new_allocate(NULL, input_size, NULL);
pa_assert(info->pad_sink);
in_buf = gst_buffer_new_wrapped_full(GST_MEMORY_FLAG_READONLY,
(gpointer)input_buffer, input_size, 0, input_size, NULL, NULL);
pa_assert(in_buf);
/* Acquire an extra reference to validate refcount afterwards */
gst_mini_object_ref(GST_MINI_OBJECT_CAST(in_buf));
pa_assert(GST_MINI_OBJECT_REFCOUNT_VALUE(in_buf) == 2);
pa_assert_se(gst_buffer_map(in_buf, &map_info, GST_MAP_WRITE));
memcpy(map_info.data, input_buffer, input_size);
gst_buffer_unmap(in_buf, &map_info);
ret = gst_pad_chain(info->pad_sink, in_buf);
/**
* Ensure we're the only one holding a reference to this buffer after gst_pad_chain,
* which internally holds a pointer reference to input_buffer. The caller provides
* no guarantee to the validity of this pointer after returning from this function.
*/
pa_assert(GST_MINI_OBJECT_REFCOUNT_VALUE(in_buf) == 1);
gst_mini_object_unref(GST_MINI_OBJECT_CAST(in_buf));
ret = gst_app_src_push_buffer(GST_APP_SRC(info->app_src), in_buf);
if (ret != GST_FLOW_OK) {
pa_log_error("failed to push buffer for transcoding %d", ret);
goto fail;
}
pa_fdsem_wait(info->sample_ready_fdsem);
available = gst_adapter_available(info->sink_adapter);
if (available) {
@ -343,17 +287,16 @@ fail:
void gst_codec_deinit(void *codec_info) {
struct gst_info *info = (struct gst_info *) codec_info;
if (info->sample_ready_fdsem)
pa_fdsem_free(info->sample_ready_fdsem);
if (info->pipeline) {
gst_element_set_state(info->pipeline, GST_STATE_NULL);
gst_object_unref(info->pipeline);
if (info->bin) {
gst_element_set_state(info->bin, GST_STATE_NULL);
gst_object_unref(info->bin);
}
if (info->sink_adapter)
g_object_unref(info->sink_adapter);
if (info->pad_sink)
gst_object_unref(GST_OBJECT(info->pad_sink));
pa_xfree(info);
}

View file

@ -43,11 +43,12 @@ struct gst_info {
const a2dp_ldac_t *ldac_config;
} a2dp_codec_t;
GstElement *app_src, *app_sink;
GstElement *pipeline;
/* The appsink element that accumulates encoded/decoded buffers */
GstElement *app_sink;
GstElement *bin;
GstAdapter *sink_adapter;
pa_fdsem *sample_ready_fdsem;
/* The sink pad to push to-be-encoded/decoded buffers into */
GstPad *pad_sink;
uint16_t seq_num;
};