bluetooth/gst: Unify encoder and decoder pipeline setup

The encoding and decoding pipeline are essentially identical: both push
data in via an appsrc, route it through a codec-specific (opaque)
element, and finally pull data out of an appsink. The code already makes
it impossible to have an encoding and decoding pipeline simultaneously
set up in `gst_info`, and converting `bool for_encoding` to a tri-state
(encode, decode, or both) would be messy; particularly when encoding and
decoding could possibly differ in format.

This change removes a swath of code and removes the possibility of
misusing `enc_` or `dec_` in the wrong place (ie. after copying a bit of
code and forgetting to rename one or two). When bidirectional codecs
come online a second codec instance (`gst_info`) can simply be created
and controlled independently.

Part-of: <https://gitlab.freedesktop.org/pulseaudio/pulseaudio/-/merge_requests/487>
This commit is contained in:
Marijn Suijten 2021-01-23 20:39:15 +01:00 committed by PulseAudio Marge Bot
parent 92af2c90f9
commit c05c6f9eee
4 changed files with 129 additions and 344 deletions

View file

@ -35,81 +35,42 @@
#include "a2dp-codec-gst.h"
/* Called from the GStreamer streaming thread */
static void enc_sink_eos(GstAppSink *appsink, gpointer userdata) {
pa_log_debug("Encoder got EOS");
static void app_sink_eos(GstAppSink *appsink, gpointer userdata) {
pa_log_debug("Sink got EOS");
}
/* Called from the GStreamer streaming thread */
static GstFlowReturn enc_sink_new_sample(GstAppSink *appsink, gpointer userdata) {
static GstFlowReturn app_sink_new_sample(GstAppSink *appsink, gpointer userdata) {
struct gst_info *info = (struct gst_info *) userdata;
GstSample *sample = NULL;
GstBuffer *buf;
sample = gst_app_sink_pull_sample(GST_APP_SINK(info->enc_sink));
sample = gst_app_sink_pull_sample(GST_APP_SINK(info->app_sink));
if (!sample)
return GST_FLOW_OK;
buf = gst_sample_get_buffer(sample);
gst_buffer_ref(buf);
gst_adapter_push(info->enc_adapter, buf);
gst_adapter_push(info->sink_adapter, buf);
gst_sample_unref(sample);
pa_fdsem_post(info->enc_fdsem);
pa_fdsem_post(info->sample_ready_fdsem);
return GST_FLOW_OK;
}
/* Called from the GStreamer streaming thread */
static void dec_sink_eos(GstAppSink *appsink, gpointer userdata) {
pa_log_debug("Decoder got EOS");
}
/* Called from the GStreamer streaming thread */
static GstFlowReturn dec_sink_new_sample(GstAppSink *appsink, gpointer userdata) {
struct gst_info *info = (struct gst_info *) userdata;
GstSample *sample = NULL;
GstBuffer *buf;
sample = gst_app_sink_pull_sample(GST_APP_SINK(info->dec_sink));
if (!sample)
return GST_FLOW_OK;
buf = gst_sample_get_buffer(sample);
gst_buffer_ref(buf);
gst_adapter_push(info->dec_adapter, buf);
gst_sample_unref(sample);
pa_fdsem_post(info->dec_fdsem);
return GST_FLOW_OK;
}
static void gst_deinit_enc_common(struct gst_info *info) {
static void gst_deinit_common(struct gst_info *info) {
if (!info)
return;
if (info->enc_fdsem)
pa_fdsem_free(info->enc_fdsem);
if (info->enc_src)
gst_object_unref(info->enc_src);
if (info->enc_sink)
gst_object_unref(info->enc_sink);
if (info->enc_adapter)
g_object_unref(info->enc_adapter);
if (info->enc_pipeline)
gst_object_unref(info->enc_pipeline);
}
static void gst_deinit_dec_common(struct gst_info *info) {
if (!info)
return;
if (info->dec_fdsem)
pa_fdsem_free(info->dec_fdsem);
if (info->dec_src)
gst_object_unref(info->dec_src);
if (info->dec_sink)
gst_object_unref(info->dec_sink);
if (info->dec_adapter)
g_object_unref(info->dec_adapter);
if (info->dec_pipeline)
gst_object_unref(info->dec_pipeline);
if (info->sample_ready_fdsem)
pa_fdsem_free(info->sample_ready_fdsem);
if (info->app_src)
gst_object_unref(info->app_src);
if (info->app_sink)
gst_object_unref(info->app_sink);
if (info->sink_adapter)
g_object_unref(info->sink_adapter);
if (info->pipeline)
gst_object_unref(info->pipeline);
}
static GstBusSyncReply sync_bus_handler (GstBus *bus, GstMessage *message, struct gst_info *info) {
@ -142,29 +103,29 @@ static GstBusSyncReply sync_bus_handler (GstBus *bus, GstMessage *message, struc
return GST_BUS_PASS;
}
bool gst_init_enc_common(struct gst_info *info) {
bool gst_init_common(struct gst_info *info) {
GstElement *pipeline = NULL;
GstElement *appsrc = NULL, *appsink = NULL;
GstAdapter *adapter;
GstAppSinkCallbacks callbacks = { 0, };
GstBus *bus;
appsrc = gst_element_factory_make("appsrc", "enc_source");
appsrc = gst_element_factory_make("appsrc", "app_source");
if (!appsrc) {
pa_log_error("Could not create appsrc element");
goto fail;
}
g_object_set(appsrc, "is-live", FALSE, "format", GST_FORMAT_TIME, "stream-type", 0, "max-bytes", 0, NULL);
appsink = gst_element_factory_make("appsink", "enc_sink");
appsink = gst_element_factory_make("appsink", "app_sink");
if (!appsink) {
pa_log_error("Could not create appsink element");
goto fail;
}
g_object_set(appsink, "sync", FALSE, "async", FALSE, "enable-last-sample", FALSE, NULL);
callbacks.eos = enc_sink_eos;
callbacks.new_sample = enc_sink_new_sample;
callbacks.eos = app_sink_eos;
callbacks.new_sample = app_sink_new_sample;
gst_app_sink_set_callbacks(GST_APP_SINK(appsink), &callbacks, info, NULL);
adapter = gst_adapter_new();
@ -177,63 +138,11 @@ bool gst_init_enc_common(struct gst_info *info) {
gst_bus_set_sync_handler (bus, (GstBusSyncHandler) sync_bus_handler, info, NULL);
gst_object_unref (bus);
info->enc_src = appsrc;
info->enc_sink = appsink;
info->enc_adapter = adapter;
info->enc_pipeline = pipeline;
info->enc_fdsem = pa_fdsem_new();
return true;
fail:
if (appsrc)
gst_object_unref(appsrc);
if (appsink)
gst_object_unref(appsink);
return false;
}
bool gst_init_dec_common(struct gst_info *info) {
GstElement *pipeline = NULL;
GstElement *appsrc = NULL, *appsink = NULL;
GstAdapter *adapter;
GstAppSinkCallbacks callbacks = { 0, };
GstBus *bus;
appsrc = gst_element_factory_make("appsrc", "dec_source");
if (!appsrc) {
pa_log_error("Could not create decoder appsrc element");
goto fail;
}
g_object_set(appsrc, "is-live", FALSE, "format", GST_FORMAT_TIME, "stream-type", 0, "max-bytes", 0, NULL);
appsink = gst_element_factory_make("appsink", "dec_sink");
if (!appsink) {
pa_log_error("Could not create decoder appsink element");
goto fail;
}
g_object_set(appsink, "sync", FALSE, "async", FALSE, "enable-last-sample", FALSE, NULL);
callbacks.eos = dec_sink_eos;
callbacks.new_sample = dec_sink_new_sample;
gst_app_sink_set_callbacks(GST_APP_SINK(appsink), &callbacks, info, NULL);
adapter = gst_adapter_new();
pa_assert(adapter);
pipeline = gst_pipeline_new(NULL);
pa_assert(pipeline);
bus = gst_pipeline_get_bus (GST_PIPELINE (pipeline));
gst_bus_set_sync_handler (bus, (GstBusSyncHandler) sync_bus_handler, info, NULL);
gst_object_unref (bus);
info->dec_src = appsrc;
info->dec_sink = appsink;
info->dec_adapter = adapter;
info->dec_pipeline = pipeline;
info->dec_fdsem = pa_fdsem_new();
info->app_src = appsrc;
info->app_sink = appsink;
info->sink_adapter = adapter;
info->pipeline = pipeline;
info->sample_ready_fdsem = pa_fdsem_new();
return true;
@ -256,18 +165,18 @@ fail:
* For buffer probes, see
* https://gstreamer.freedesktop.org/documentation/additional/design/probes.html?gi-language=c
*/
static GstPadProbeReturn gst_enc_appsink_buffer_probe(GstPad *pad, GstPadProbeInfo *probe_info, gpointer userdata)
static GstPadProbeReturn gst_sink_buffer_idle_probe(GstPad *pad, GstPadProbeInfo *probe_info, gpointer userdata)
{
struct gst_info *info = (struct gst_info *)userdata;
pa_assert(probe_info->type & GST_PAD_PROBE_TYPE_IDLE);
pa_fdsem_post(info->enc_fdsem);
pa_fdsem_post(info->sample_ready_fdsem);
return GST_PAD_PROBE_REMOVE;
}
static GstPadProbeReturn gst_encoder_buffer_probe(GstPad *pad, GstPadProbeInfo *probe_info, gpointer userdata)
static GstPadProbeReturn gst_sink_buffer_probe(GstPad *pad, GstPadProbeInfo *probe_info, gpointer userdata)
{
struct gst_info *info = (struct gst_info *)userdata;
GstPad *peer_pad;
@ -275,32 +184,7 @@ static GstPadProbeReturn gst_encoder_buffer_probe(GstPad *pad, GstPadProbeInfo *
pa_assert(probe_info->type & GST_PAD_PROBE_TYPE_BUFFER);
peer_pad = gst_pad_get_peer(pad);
gst_pad_add_probe(peer_pad, GST_PAD_PROBE_TYPE_IDLE, gst_enc_appsink_buffer_probe, info, NULL);
gst_object_unref(peer_pad);
return GST_PAD_PROBE_OK;
}
static GstPadProbeReturn gst_dec_appsink_buffer_probe(GstPad *pad, GstPadProbeInfo *probe_info, gpointer userdata)
{
struct gst_info *info = (struct gst_info *)userdata;
pa_assert(probe_info->type & GST_PAD_PROBE_TYPE_IDLE);
pa_fdsem_post(info->dec_fdsem);
return GST_PAD_PROBE_REMOVE;
}
static GstPadProbeReturn gst_decoder_buffer_probe(GstPad *pad, GstPadProbeInfo *probe_info, gpointer userdata)
{
struct gst_info *info = (struct gst_info *)userdata;
GstPad *peer_pad;
pa_assert(probe_info->type & GST_PAD_PROBE_TYPE_BUFFER);
peer_pad = gst_pad_get_peer(pad);
gst_pad_add_probe(peer_pad, GST_PAD_PROBE_TYPE_IDLE, gst_dec_appsink_buffer_probe, info, NULL);
gst_pad_add_probe(peer_pad, GST_PAD_PROBE_TYPE_IDLE, gst_sink_buffer_idle_probe, info, NULL);
gst_object_unref(peer_pad);
return GST_PAD_PROBE_OK;
@ -353,101 +237,67 @@ static GstCaps *gst_create_caps_from_sample_spec(const pa_sample_spec *ss) {
return caps;
}
bool gst_codec_init(struct gst_info *info, bool for_encoding) {
bool gst_codec_init(struct gst_info *info, bool for_encoding, GstElement *transcoder) {
GstPad *pad;
GstCaps *caps;
pa_assert(transcoder);
info->seq_num = 0;
if (for_encoding) {
if (!gst_init_enc_common(info))
goto fail_common;
} else {
if (!gst_init_dec_common(info))
goto fail_common;
}
if (!gst_init_common(info))
goto common_fail;
caps = gst_create_caps_from_sample_spec(info->ss);
if (for_encoding)
g_object_set(info->app_src, "caps", caps, NULL);
else
g_object_set(info->app_sink, "caps", caps, NULL);
gst_caps_unref(caps);
/* In case if we ever have a codec which supports decoding but not encoding */
if (for_encoding) {
pa_assert(info->enc_bin);
g_object_set(info->enc_src, "caps", caps, NULL);
gst_caps_unref(caps);
gst_bin_add_many(GST_BIN(info->pipeline), info->app_src, transcoder, info->app_sink, NULL);
gst_bin_add_many(GST_BIN(info->enc_pipeline), info->enc_src, info->enc_bin, info->enc_sink, NULL);
if (!gst_element_link_many(info->enc_src, info->enc_bin, info->enc_sink, NULL)) {
pa_log_error("Failed to link encoder elements");
goto enc_dec_fail;
}
if (gst_element_set_state(info->enc_pipeline, GST_STATE_PLAYING) == GST_STATE_CHANGE_FAILURE) {
pa_log_error("Could not start encoder pipeline");
goto enc_dec_fail;
}
/* See the comment on buffer probe functions */
pad = gst_element_get_static_pad(info->enc_bin, "sink");
gst_pad_add_probe(pad, GST_PAD_PROBE_TYPE_BUFFER, gst_encoder_buffer_probe, info, NULL);
gst_object_unref(pad);
} else {
pa_assert(info->dec_bin);
g_object_set(info->dec_sink, "caps", caps, NULL);
gst_caps_unref(caps);
gst_bin_add_many(GST_BIN(info->dec_pipeline), info->dec_src, info->dec_bin, info->dec_sink, NULL);
if (!gst_element_link_many(info->dec_src, info->dec_bin, info->dec_sink, NULL)) {
pa_log_error("Failed to link decoder elements");
goto enc_dec_fail;
}
if (gst_element_set_state(info->dec_pipeline, GST_STATE_PLAYING) == GST_STATE_CHANGE_FAILURE) {
pa_log_error("Could not start decoder pipeline");
goto enc_dec_fail;
}
/* See the comment on buffer probe functions */
pad = gst_element_get_static_pad(info->dec_bin, "sink");
gst_pad_add_probe(pad, GST_PAD_PROBE_TYPE_BUFFER, gst_decoder_buffer_probe, info, NULL);
gst_object_unref(pad);
if (!gst_element_link_many(info->app_src, transcoder, info->app_sink, NULL)) {
pa_log_error("Failed to link codec elements into pipeline");
goto pipeline_fail;
}
if (gst_element_set_state(info->pipeline, GST_STATE_PLAYING) == GST_STATE_CHANGE_FAILURE) {
pa_log_error("Could not start pipeline");
goto pipeline_fail;
}
/* See the comment on buffer probe functions */
pad = gst_element_get_static_pad(transcoder, "sink");
gst_pad_add_probe(pad, GST_PAD_PROBE_TYPE_BUFFER, gst_sink_buffer_probe, info, NULL);
gst_object_unref(pad);
pa_log_info("GStreamer pipeline initialisation succeeded");
return true;
enc_dec_fail:
if (for_encoding)
gst_deinit_enc_common(info);
else
gst_deinit_dec_common(info);
pipeline_fail:
gst_deinit_common(info);
pa_log_error("GStreamer pipeline initialisation failed");
return false;
fail_common:
/* If common initialization fails these have not had their ownership
common_fail:
/* If common initialization fails the bin has not yet had its ownership
* transferred to the pipeline yet.
*/
if (for_encoding)
gst_object_unref(info->enc_bin);
else
gst_object_unref(info->dec_bin);
gst_object_unref(transcoder);
pa_log_error("GStreamer pipeline creation failed");
return false;
}
size_t gst_encode_buffer(void *codec_info, uint32_t timestamp, const uint8_t *input_buffer, size_t input_size, uint8_t *output_buffer, size_t output_size, size_t *processed) {
size_t gst_transcode_buffer(void *codec_info, const uint8_t *input_buffer, size_t input_size, uint8_t *output_buffer, size_t output_size, size_t *processed) {
struct gst_info *info = (struct gst_info *) codec_info;
gsize available, encoded;
gsize available, transcoded;
GstBuffer *in_buf;
GstMapInfo map_info;
GstFlowReturn ret;
@ -460,70 +310,25 @@ size_t gst_encode_buffer(void *codec_info, uint32_t timestamp, const uint8_t *in
memcpy(map_info.data, input_buffer, input_size);
gst_buffer_unmap(in_buf, &map_info);
ret = gst_app_src_push_buffer(GST_APP_SRC(info->enc_src), in_buf);
ret = gst_app_src_push_buffer(GST_APP_SRC(info->app_src), in_buf);
if (ret != GST_FLOW_OK) {
pa_log_error("failed to push buffer for encoding %d", ret);
pa_log_error("failed to push buffer for transcoding %d", ret);
goto fail;
}
pa_fdsem_wait(info->enc_fdsem);
pa_fdsem_wait(info->sample_ready_fdsem);
available = gst_adapter_available(info->enc_adapter);
available = gst_adapter_available(info->sink_adapter);
if (available) {
encoded = PA_MIN(available, output_size);
transcoded = PA_MIN(available, output_size);
gst_adapter_copy(info->enc_adapter, output_buffer, 0, encoded);
gst_adapter_flush(info->enc_adapter, encoded);
gst_adapter_copy(info->sink_adapter, output_buffer, 0, transcoded);
gst_adapter_flush(info->sink_adapter, transcoded);
written += encoded;
written += transcoded;
} else
pa_log_debug("No encoded data available in adapter");
*processed = input_size;
return written;
fail:
*processed = 0;
return written;
}
size_t gst_decode_buffer(void *codec_info, const uint8_t *input_buffer, size_t input_size, uint8_t *output_buffer, size_t output_size, size_t *processed) {
struct gst_info *info = (struct gst_info *) codec_info;
gsize available, decoded;
GstBuffer *in_buf;
GstMapInfo map_info;
GstFlowReturn ret;
size_t written = 0;
in_buf = gst_buffer_new_allocate(NULL, input_size, NULL);
pa_assert(in_buf);
pa_assert_se(gst_buffer_map(in_buf, &map_info, GST_MAP_WRITE));
memcpy(map_info.data, input_buffer, input_size);
gst_buffer_unmap(in_buf, &map_info);
ret = gst_app_src_push_buffer(GST_APP_SRC(info->dec_src), in_buf);
if (ret != GST_FLOW_OK) {
pa_log_error("failed to push buffer for decoding %d", ret);
goto fail;
}
pa_fdsem_wait(info->dec_fdsem);
available = gst_adapter_available(info->dec_adapter);
if (available) {
decoded = PA_MIN(available, output_size);
gst_adapter_copy(info->dec_adapter, output_buffer, 0, decoded);
gst_adapter_flush(info->dec_adapter, decoded);
written += decoded;
} else
pa_log_debug("No decoded data available in adapter");
pa_log_debug("No transcoded data available in adapter");
*processed = input_size;
@ -538,27 +343,17 @@ fail:
void gst_codec_deinit(void *codec_info) {
struct gst_info *info = (struct gst_info *) codec_info;
if (info->enc_fdsem)
pa_fdsem_free(info->enc_fdsem);
if (info->sample_ready_fdsem)
pa_fdsem_free(info->sample_ready_fdsem);
if (info->dec_fdsem)
pa_fdsem_free(info->dec_fdsem);
if (info->enc_pipeline) {
gst_element_set_state(info->enc_pipeline, GST_STATE_NULL);
gst_object_unref(info->enc_pipeline);
if (info->pipeline) {
gst_element_set_state(info->pipeline, GST_STATE_NULL);
gst_object_unref(info->pipeline);
}
if (info->dec_pipeline) {
gst_element_set_state(info->dec_pipeline, GST_STATE_NULL);
gst_object_unref(info->dec_pipeline);
}
if (info->enc_adapter)
g_object_unref(info->enc_adapter);
if (info->dec_adapter)
g_object_unref(info->dec_adapter);
if (info->sink_adapter)
g_object_unref(info->sink_adapter);
pa_xfree(info);
}