diff --git a/src/modules/bluetooth/a2dp-codec-gst.c b/src/modules/bluetooth/a2dp-codec-gst.c index c456ff55b..6b2404dc3 100644 --- a/src/modules/bluetooth/a2dp-codec-gst.c +++ b/src/modules/bluetooth/a2dp-codec-gst.c @@ -53,7 +53,6 @@ static GstFlowReturn app_sink_new_sample(GstAppSink *appsink, gpointer userdata) gst_buffer_ref(buf); gst_adapter_push(info->sink_adapter, buf); gst_sample_unref(sample); - pa_fdsem_post(info->sample_ready_fdsem); return GST_FLOW_OK; } @@ -61,61 +60,19 @@ static GstFlowReturn app_sink_new_sample(GstAppSink *appsink, gpointer userdata) static void gst_deinit_common(struct gst_info *info) { if (!info) return; - if (info->sample_ready_fdsem) - pa_fdsem_free(info->sample_ready_fdsem); - if (info->app_src) - gst_object_unref(info->app_src); if (info->app_sink) gst_object_unref(info->app_sink); if (info->sink_adapter) g_object_unref(info->sink_adapter); - if (info->pipeline) - gst_object_unref(info->pipeline); -} - -static GstBusSyncReply sync_bus_handler (GstBus *bus, GstMessage *message, struct gst_info *info) { - GstStreamStatusType type; - GstElement *owner; - - switch (GST_MESSAGE_TYPE (message)) { - case GST_MESSAGE_STREAM_STATUS: - - gst_message_parse_stream_status (message, &type, &owner); - - switch (type) { - case GST_STREAM_STATUS_TYPE_ENTER: - pa_log_debug("GStreamer pipeline thread starting up"); - if (info->core->realtime_scheduling) - pa_thread_make_realtime(info->core->realtime_priority); - break; - case GST_STREAM_STATUS_TYPE_LEAVE: - pa_log_debug("GStreamer pipeline thread shutting down"); - break; - default: - break; - } - break; - default: - break; - } - - /* pass all messages on the async queue */ - return GST_BUS_PASS; + if (info->bin) + gst_object_unref(info->bin); } bool gst_init_common(struct gst_info *info) { - GstElement *pipeline = NULL; - GstElement *appsrc = NULL, *appsink = NULL; + GstElement *bin = NULL; + GstElement *appsink = NULL; GstAdapter *adapter; GstAppSinkCallbacks callbacks = { 0, }; - GstBus *bus; - - appsrc = gst_element_factory_make("appsrc", "app_source"); - if (!appsrc) { - pa_log_error("Could not create appsrc element"); - goto fail; - } - g_object_set(appsrc, "is-live", FALSE, "format", GST_FORMAT_TIME, "stream-type", 0, "max-bytes", 0, NULL); appsink = gst_element_factory_make("appsink", "app_sink"); if (!appsink) { @@ -131,65 +88,22 @@ bool gst_init_common(struct gst_info *info) { adapter = gst_adapter_new(); pa_assert(adapter); - pipeline = gst_pipeline_new(NULL); - pa_assert(pipeline); + bin = gst_bin_new(NULL); + pa_assert(bin); - bus = gst_pipeline_get_bus (GST_PIPELINE (pipeline)); - gst_bus_set_sync_handler (bus, (GstBusSyncHandler) sync_bus_handler, info, NULL); - gst_object_unref (bus); - - info->app_src = appsrc; info->app_sink = appsink; info->sink_adapter = adapter; - info->pipeline = pipeline; - info->sample_ready_fdsem = pa_fdsem_new(); + info->bin = bin; return true; fail: - if (appsrc) - gst_object_unref(appsrc); if (appsink) gst_object_unref(appsink); return false; } -/* - * The idea of using buffer probes is as follows. We set a buffer probe on the - * encoder sink pad. In the buffer probe, we set an idle probe on the upstream - * source pad. In encode_buffer, we wait on the fdsem. The fdsem gets posted - * when either new_sample or idle probe gets called. We do this, to make the - * appsink behave synchronously. - * - * For buffer probes, see - * https://gstreamer.freedesktop.org/documentation/additional/design/probes.html?gi-language=c - */ -static GstPadProbeReturn gst_sink_buffer_idle_probe(GstPad *pad, GstPadProbeInfo *probe_info, gpointer userdata) -{ - struct gst_info *info = (struct gst_info *)userdata; - - pa_assert(probe_info->type & GST_PAD_PROBE_TYPE_IDLE); - - pa_fdsem_post(info->sample_ready_fdsem); - - return GST_PAD_PROBE_REMOVE; -} - -static GstPadProbeReturn gst_sink_buffer_probe(GstPad *pad, GstPadProbeInfo *probe_info, gpointer userdata) -{ - struct gst_info *info = (struct gst_info *)userdata; - GstPad *peer_pad; - - pa_assert(probe_info->type & GST_PAD_PROBE_TYPE_BUFFER); - - peer_pad = gst_pad_get_peer(pad); - gst_pad_add_probe(peer_pad, GST_PAD_PROBE_TYPE_IDLE, gst_sink_buffer_idle_probe, info, NULL); - gst_object_unref(peer_pad); - - return GST_PAD_PROBE_OK; -} - static GstCaps *gst_create_caps_from_sample_spec(const pa_sample_spec *ss) { gchar *sample_format; GstCaps *caps; @@ -240,6 +154,10 @@ static GstCaps *gst_create_caps_from_sample_spec(const pa_sample_spec *ss) { bool gst_codec_init(struct gst_info *info, bool for_encoding, GstElement *transcoder) { GstPad *pad; GstCaps *caps; + GstEvent *event; + GstSegment segment; + GstEvent *stream_start; + guint group_id; pa_assert(transcoder); @@ -248,30 +166,49 @@ bool gst_codec_init(struct gst_info *info, bool for_encoding, GstElement *transc if (!gst_init_common(info)) goto common_fail; - caps = gst_create_caps_from_sample_spec(info->ss); - if (for_encoding) - g_object_set(info->app_src, "caps", caps, NULL); - else - g_object_set(info->app_sink, "caps", caps, NULL); - gst_caps_unref(caps); + gst_bin_add_many(GST_BIN(info->bin), transcoder, info->app_sink, NULL); - - gst_bin_add_many(GST_BIN(info->pipeline), info->app_src, transcoder, info->app_sink, NULL); - - if (!gst_element_link_many(info->app_src, transcoder, info->app_sink, NULL)) { + if (!gst_element_link_many(transcoder, info->app_sink, NULL)) { pa_log_error("Failed to link codec elements into pipeline"); goto pipeline_fail; } - if (gst_element_set_state(info->pipeline, GST_STATE_PLAYING) == GST_STATE_CHANGE_FAILURE) { + pad = gst_element_get_static_pad(transcoder, "sink"); + pa_assert_se(gst_element_add_pad(info->bin, gst_ghost_pad_new("sink", pad))); + /** + * Only the sink pad is needed to push buffers. Cache it since + * gst_element_get_static_pad is relatively expensive and verbose + * on higher log levels. + */ + info->pad_sink = pad; + + if (gst_element_set_state(info->bin, GST_STATE_PLAYING) == GST_STATE_CHANGE_FAILURE) { pa_log_error("Could not start pipeline"); goto pipeline_fail; } - /* See the comment on buffer probe functions */ - pad = gst_element_get_static_pad(transcoder, "sink"); - gst_pad_add_probe(pad, GST_PAD_PROBE_TYPE_BUFFER, gst_sink_buffer_probe, info, NULL); - gst_object_unref(pad); + /* First, send stream-start sticky event */ + group_id = gst_util_group_id_next(); + stream_start = gst_event_new_stream_start("gst-codec-pa"); + gst_event_set_group_id(stream_start, group_id); + gst_pad_send_event(info->pad_sink, stream_start); + + /* Retrieve the pad that handles the PCM format between PA and GStreamer */ + if (for_encoding) + pad = gst_element_get_static_pad(transcoder, "sink"); + else + pad = gst_element_get_static_pad(transcoder, "src"); + + /* Second, send caps sticky event */ + caps = gst_create_caps_from_sample_spec(info->ss); + pa_assert_se(gst_pad_set_caps(pad, caps)); + gst_caps_unref(caps); + gst_object_unref(GST_OBJECT(pad)); + + /* Third, send segment sticky event */ + gst_segment_init(&segment, GST_FORMAT_TIME); + event = gst_event_new_segment(&segment); + gst_pad_send_event(info->pad_sink, event); pa_log_info("GStreamer pipeline initialisation succeeded"); @@ -299,25 +236,32 @@ size_t gst_transcode_buffer(void *codec_info, const uint8_t *input_buffer, size_ struct gst_info *info = (struct gst_info *) codec_info; gsize available, transcoded; GstBuffer *in_buf; - GstMapInfo map_info; GstFlowReturn ret; size_t written = 0; - in_buf = gst_buffer_new_allocate(NULL, input_size, NULL); + pa_assert(info->pad_sink); + + in_buf = gst_buffer_new_wrapped_full(GST_MEMORY_FLAG_READONLY, + (gpointer)input_buffer, input_size, 0, input_size, NULL, NULL); pa_assert(in_buf); + /* Acquire an extra reference to validate refcount afterwards */ + gst_mini_object_ref(GST_MINI_OBJECT_CAST(in_buf)); + pa_assert(GST_MINI_OBJECT_REFCOUNT_VALUE(in_buf) == 2); - pa_assert_se(gst_buffer_map(in_buf, &map_info, GST_MAP_WRITE)); - memcpy(map_info.data, input_buffer, input_size); - gst_buffer_unmap(in_buf, &map_info); + ret = gst_pad_chain(info->pad_sink, in_buf); + /** + * Ensure we're the only one holding a reference to this buffer after gst_pad_chain, + * which internally holds a pointer reference to input_buffer. The caller provides + * no guarantee to the validity of this pointer after returning from this function. + */ + pa_assert(GST_MINI_OBJECT_REFCOUNT_VALUE(in_buf) == 1); + gst_mini_object_unref(GST_MINI_OBJECT_CAST(in_buf)); - ret = gst_app_src_push_buffer(GST_APP_SRC(info->app_src), in_buf); if (ret != GST_FLOW_OK) { pa_log_error("failed to push buffer for transcoding %d", ret); goto fail; } - pa_fdsem_wait(info->sample_ready_fdsem); - available = gst_adapter_available(info->sink_adapter); if (available) { @@ -343,17 +287,16 @@ fail: void gst_codec_deinit(void *codec_info) { struct gst_info *info = (struct gst_info *) codec_info; - if (info->sample_ready_fdsem) - pa_fdsem_free(info->sample_ready_fdsem); - - - if (info->pipeline) { - gst_element_set_state(info->pipeline, GST_STATE_NULL); - gst_object_unref(info->pipeline); + if (info->bin) { + gst_element_set_state(info->bin, GST_STATE_NULL); + gst_object_unref(info->bin); } if (info->sink_adapter) g_object_unref(info->sink_adapter); + if (info->pad_sink) + gst_object_unref(GST_OBJECT(info->pad_sink)); + pa_xfree(info); } diff --git a/src/modules/bluetooth/a2dp-codec-gst.h b/src/modules/bluetooth/a2dp-codec-gst.h index 75a0ad712..dfc784546 100644 --- a/src/modules/bluetooth/a2dp-codec-gst.h +++ b/src/modules/bluetooth/a2dp-codec-gst.h @@ -43,11 +43,12 @@ struct gst_info { const a2dp_ldac_t *ldac_config; } a2dp_codec_t; - GstElement *app_src, *app_sink; - GstElement *pipeline; + /* The appsink element that accumulates encoded/decoded buffers */ + GstElement *app_sink; + GstElement *bin; GstAdapter *sink_adapter; - - pa_fdsem *sample_ready_fdsem; + /* The sink pad to push to-be-encoded/decoded buffers into */ + GstPad *pad_sink; uint16_t seq_num; };