diff --git a/src/modules/rtp/rtp-gstreamer.c b/src/modules/rtp/rtp-gstreamer.c index 0db330958..2fe0b02e1 100644 --- a/src/modules/rtp/rtp-gstreamer.c +++ b/src/modules/rtp/rtp-gstreamer.c @@ -43,6 +43,7 @@ } #define MAKE_ELEMENT(v, e) MAKE_ELEMENT_NAMED((v), (e), NULL) +#define RTP_HEADER_SIZE 12 struct pa_rtp_context { pa_fdsem *fdsem; @@ -53,6 +54,9 @@ struct pa_rtp_context { GstElement *appsink; uint32_t last_timestamp; + + uint8_t *send_buf; + size_t mtu; }; static GstCaps* caps_from_sample_spec(const pa_sample_spec *ss) { @@ -171,6 +175,8 @@ pa_rtp_context* pa_rtp_context_new_send(int fd, uint8_t payload, size_t mtu, con c = pa_xnew0(pa_rtp_context, 1); c->ss = *ss; + c->mtu = mtu - RTP_HEADER_SIZE; + c->send_buf = pa_xmalloc(c->mtu); if (!gst_init_check(NULL, NULL, &error)) { pa_log_error("Could not initialise GStreamer: %s", error->message); @@ -216,18 +222,10 @@ static bool process_bus_messages(pa_rtp_context *c) { return ret; } -static void free_buffer(pa_memblock *memblock) { - pa_memblock_release(memblock); - pa_memblock_unref(memblock); -} - /* Called from I/O thread context */ int pa_rtp_send(pa_rtp_context *c, pa_memblockq *q) { - pa_memchunk chunk = { 0, }; GstBuffer *buf; - void *data; - bool stop = false; - int ret = 0; + size_t n = 0; pa_assert(c); pa_assert(q); @@ -235,40 +233,81 @@ int pa_rtp_send(pa_rtp_context *c, pa_memblockq *q) { if (!process_bus_messages(c)) return -1; - while (!stop && pa_memblockq_peek(q, &chunk) == 0) { - GstClock *clock; - GstClockTime timestamp, clock_time; + /* + * While we check here for atleast MTU worth of data being available in + * memblockq, we might not have exact equivalent to MTU. Hence, we walk + * over the memchunks in memblockq and accumulate MTU bytes next. + */ + if (pa_memblockq_get_length(q) < c->mtu) + return 0; - clock = gst_element_get_clock(c->pipeline); - clock_time = gst_clock_get_time(clock); - gst_object_unref(clock); + for (;;) { + pa_memchunk chunk; + int r; - timestamp = gst_element_get_base_time(c->pipeline); - if (timestamp > clock_time) - timestamp -= clock_time; - else - timestamp = 0; + pa_memchunk_reset(&chunk); - pa_assert(chunk.memblock); + if ((r = pa_memblockq_peek(q, &chunk)) >= 0) { + /* + * Accumulate MTU bytes of data before sending. If the current + * chunk length + accumulated bytes exceeds MTU, we drop bytes + * considered for transfer in this iteration from memblockq. + * + * The remaining bytes will be available in the next iteration, + * as these will be tracked and maintained by memblockq. + */ + size_t k = n + chunk.length > c->mtu ? c->mtu - n : chunk.length; - data = pa_memblock_acquire(chunk.memblock); + pa_assert(chunk.memblock); - buf = gst_buffer_new_wrapped_full(GST_MEMORY_FLAG_READONLY | GST_MEMORY_FLAG_PHYSICALLY_CONTIGUOUS, - data, chunk.length, chunk.index, chunk.length, chunk.memblock, - (GDestroyNotify) free_buffer); + memcpy(c->send_buf + n, pa_memblock_acquire_chunk(&chunk), k); + pa_memblock_release(chunk.memblock); + pa_memblock_unref(chunk.memblock); - GST_BUFFER_PTS(buf) = timestamp; - - if (gst_app_src_push_buffer(GST_APP_SRC(c->appsrc), buf) != GST_FLOW_OK) { - pa_log_error("Could not push buffer"); - stop = true; - ret = -1; + n += k; + pa_memblockq_drop(q, k); } - pa_memblockq_drop(q, chunk.length); + if (r < 0 || n >= c->mtu) { + GstClock *clock; + GstClockTime timestamp, clock_time; + GstMapInfo info; + + if (n > 0) { + clock = gst_element_get_clock(c->pipeline); + clock_time = gst_clock_get_time(clock); + gst_object_unref(clock); + + timestamp = gst_element_get_base_time(c->pipeline); + if (timestamp > clock_time) + timestamp -= clock_time; + else + timestamp = 0; + + buf = gst_buffer_new_allocate(NULL, n, NULL); + pa_assert(buf); + + GST_BUFFER_PTS(buf) = timestamp; + + pa_assert_se(gst_buffer_map(buf, &info, GST_MAP_WRITE)); + + memcpy(info.data, c->send_buf, n); + gst_buffer_unmap(buf, &info); + + if (gst_app_src_push_buffer(GST_APP_SRC(c->appsrc), buf) != GST_FLOW_OK) { + pa_log_error("Could not push buffer"); + return -1; + } + } + + if (r < 0 || pa_memblockq_get_length(q) < c->mtu) + break; + + n = 0; + } } - return ret; + return 0; } static GstCaps* rtp_caps_from_sample_spec(const pa_sample_spec *ss) { @@ -415,6 +454,7 @@ pa_rtp_context* pa_rtp_context_new_recv(int fd, uint8_t payload, const pa_sample c->fdsem = pa_fdsem_new(); c->ss = *ss; + c->send_buf = NULL; if (!gst_init_check(NULL, NULL, &error)) { pa_log_error("Could not initialise GStreamer: %s", error->message); @@ -537,6 +577,7 @@ void pa_rtp_context_free(pa_rtp_context *c) { if (c->appsrc) { gst_app_src_end_of_stream(GST_APP_SRC(c->appsrc)); gst_object_unref(c->appsrc); + pa_xfree(c->send_buf); } if (c->appsink)