diff --git a/src/examples/audio-src.c b/src/examples/audio-src.c index 0603d7afd..1ed51e821 100644 --- a/src/examples/audio-src.c +++ b/src/examples/audio-src.c @@ -89,18 +89,17 @@ static void fill_f32(struct data *d, void *dest, int avail) } } -static void on_need_buffer(void *userdata) +static void on_process(void *userdata) { struct data *data = userdata; - uint32_t id; + struct pw_buffer *b; struct spa_buffer *buf; uint8_t *p; - id = pw_stream_get_empty_buffer(data->stream); - if (id == SPA_ID_INVALID) + if ((b = pw_stream_dequeue_buffer(data->stream)) == NULL) return; - buf = pw_stream_peek_buffer(data->stream, id); + buf = b->buffer; if ((p = buf->datas[0].data) == NULL) return; @@ -109,12 +108,12 @@ static void on_need_buffer(void *userdata) buf->datas[0].chunk->size = buf->datas[0].maxsize; - pw_stream_send_buffer(data->stream, id); + pw_stream_queue_buffer(data->stream, b); } static const struct pw_stream_events stream_events = { PW_VERSION_STREAM_EVENTS, - .need_buffer = on_need_buffer, + .process = on_process, }; static void on_state_changed(void *_data, enum pw_remote_state old, enum pw_remote_state state, const char *error) @@ -159,7 +158,8 @@ static void on_state_changed(void *_data, enum pw_remote_state old, enum pw_remo PW_DIRECTION_OUTPUT, NULL, PW_STREAM_FLAG_AUTOCONNECT | - PW_STREAM_FLAG_MAP_BUFFERS, + PW_STREAM_FLAG_MAP_BUFFERS | + PW_STREAM_FLAG_RT_PROCESS, params, 1); break; } diff --git a/src/examples/video-play.c b/src/examples/video-play.c index d8f24f213..02d076c34 100644 --- a/src/examples/video-play.c +++ b/src/examples/video-play.c @@ -87,25 +87,31 @@ static void handle_events(struct data *data) } } -static int -do_render(struct spa_loop *loop, bool async, uint32_t seq, - const void *_data, size_t size, void *user_data) +static void +on_process(void *_data) { - struct data *data = user_data; - struct spa_buffer *buf = ((struct spa_buffer **) _data)[0]; + struct data *data = _data; + struct pw_stream *stream = data->stream; + struct pw_buffer *b; + struct spa_buffer *buf; void *sdata, *ddata; int sstride, dstride, ostride; uint32_t i; uint8_t *src, *dst; + b = pw_stream_dequeue_buffer(stream); + buf = b->buffer; + + pw_log_trace("new buffer %d", buf->id); + handle_events(data); if ((sdata = buf->datas[0].data) == NULL) - return -EINVAL; + return; if (SDL_LockTexture(data->texture, NULL, &ddata, &dstride) < 0) { fprintf(stderr, "Couldn't lock texture: %s\n", SDL_GetError()); - return -EIO; + return; } sstride = buf->datas[0].chunk->stride; ostride = SPA_MIN(sstride, dstride); @@ -123,25 +129,8 @@ do_render(struct spa_loop *loop, bool async, uint32_t seq, SDL_RenderCopy(data->renderer, data->texture, NULL, NULL); SDL_RenderPresent(data->renderer); - return 0; -} -static void -on_stream_new_buffer(void *_data, uint32_t id) -{ - struct data *data = _data; - struct pw_stream *stream = data->stream; - struct spa_buffer *buf; - - buf = pw_stream_peek_buffer(stream, id); - - pw_log_trace("new buffer %d", id); - - pw_loop_invoke(pw_main_loop_get_loop(data->loop), do_render, - SPA_ID_INVALID, &buf, sizeof(struct spa_buffer *), - true, data); - - pw_stream_recycle_buffer(stream, id); + pw_stream_queue_buffer(stream, b); } static void on_stream_state_changed(void *_data, enum pw_stream_state old, @@ -281,7 +270,7 @@ static const struct pw_stream_events stream_events = { PW_VERSION_STREAM_EVENTS, .state_changed = on_stream_state_changed, .format_changed = on_stream_format_changed, - .new_buffer = on_stream_new_buffer, + .process = on_process, }; static void on_state_changed(void *_data, enum pw_remote_state old, enum pw_remote_state state, const char *error) diff --git a/src/examples/video-src.c b/src/examples/video-src.c index 16a0979c4..d43ec124b 100644 --- a/src/examples/video-src.c +++ b/src/examples/video-src.c @@ -71,7 +71,7 @@ struct data { static void on_timeout(void *userdata, uint64_t expirations) { struct data *data = userdata; - uint32_t id; + struct pw_buffer *b; struct spa_buffer *buf; int i, j; uint8_t *p, *map; @@ -79,13 +79,12 @@ static void on_timeout(void *userdata, uint64_t expirations) pw_log_trace("timeout"); - id = pw_stream_get_empty_buffer(data->stream); - if (id == SPA_ID_INVALID) { + b = pw_stream_dequeue_buffer(data->stream); + if (b == NULL) { pw_log_warn("out of buffers"); return; } - - buf = pw_stream_peek_buffer(data->stream, id); + buf = b->buffer; if (buf->datas[0].type == data->t->data.MemFd || buf->datas[0].type == data->t->data.DmaBuf) { @@ -129,7 +128,7 @@ static void on_timeout(void *userdata, uint64_t expirations) buf->datas[0].chunk->size = buf->datas[0].maxsize; - pw_stream_send_buffer(data->stream, id); + pw_stream_queue_buffer(data->stream, b); } static void on_stream_state_changed(void *_data, enum pw_stream_state old, enum pw_stream_state state, @@ -240,7 +239,7 @@ static void on_state_changed(void *_data, enum pw_remote_state old, enum pw_remo pw_stream_connect(data->stream, PW_DIRECTION_OUTPUT, - NULL, PW_STREAM_FLAG_NONE, + NULL, PW_STREAM_FLAG_DRIVER, params, 1); break; } diff --git a/src/gst/gstpipewirepool.c b/src/gst/gstpipewirepool.c index eb36f957f..b75ff5563 100644 --- a/src/gst/gstpipewirepool.c +++ b/src/gst/gstpipewirepool.c @@ -21,8 +21,13 @@ #include "config.h" #endif +#include + #include +#include +#include + #include "gstpipewirepool.h" GST_DEBUG_CATEGORY_STATIC (gst_pipewire_pool_debug_category); @@ -40,6 +45,8 @@ enum static guint pool_signals[LAST_SIGNAL] = { 0 }; +static GQuark pool_data_quark; + GstPipeWirePool * gst_pipewire_pool_new (void) { @@ -50,6 +57,74 @@ gst_pipewire_pool_new (void) return pool; } +static void +pool_data_destroy (gpointer user_data) +{ + GstPipeWirePoolData *data = user_data; + + gst_object_unref (data->pool); + g_slice_free (GstPipeWirePoolData, data); +} + +void gst_pipewire_pool_wrap_buffer (GstPipeWirePool *pool, struct pw_buffer *b) +{ + GstBuffer *buf; + uint32_t i; + GstPipeWirePoolData *data; + struct pw_type *t = pool->t; + + GST_LOG_OBJECT (pool, "wrap buffer"); + + data = g_slice_new (GstPipeWirePoolData); + + buf = gst_buffer_new (); + + for (i = 0; i < b->buffer->n_datas; i++) { + struct spa_data *d = &b->buffer->datas[i]; + GstMemory *gmem = NULL; + + GST_LOG_OBJECT (pool, "wrap buffer %d %d", d->mapoffset, d->maxsize); + if (d->type == t->data.MemFd) { + gmem = gst_fd_allocator_alloc (pool->fd_allocator, dup (d->fd), + d->mapoffset + d->maxsize, GST_FD_MEMORY_FLAG_NONE); + gst_memory_resize (gmem, d->mapoffset, d->maxsize); + data->offset = d->mapoffset; + } + else if(d->type == t->data.DmaBuf) { + gmem = gst_dmabuf_allocator_alloc (pool->dmabuf_allocator, dup (d->fd), + d->mapoffset + d->maxsize); + gst_memory_resize (gmem, d->mapoffset, d->maxsize); + data->offset = d->mapoffset; + } + else if (d->type == t->data.MemPtr) { + gmem = gst_memory_new_wrapped (0, d->data, d->maxsize, 0, + d->maxsize, NULL, NULL); + data->offset = 0; + } + if (gmem) + gst_buffer_append_memory (buf, gmem); + } + + data->pool = gst_object_ref (pool); + data->owner = NULL; + data->header = spa_buffer_find_meta (b->buffer, t->meta.Header); + data->flags = GST_BUFFER_FLAGS (buf); + data->b = b; + data->buf = buf; + + gst_mini_object_set_qdata (GST_MINI_OBJECT_CAST (buf), + pool_data_quark, + data, + pool_data_destroy); + b->user_data = data; +} + +GstPipeWirePoolData *gst_pipewire_pool_get_data (GstBuffer *buffer) +{ + return gst_mini_object_get_qdata (GST_MINI_OBJECT_CAST (buffer), pool_data_quark); +} + +#if 0 gboolean gst_pipewire_pool_add_buffer (GstPipeWirePool *pool, GstBuffer *buffer) { @@ -78,25 +153,31 @@ gst_pipewire_pool_remove_buffer (GstPipeWirePool *pool, GstBuffer *buffer) return res; } +#endif static GstFlowReturn acquire_buffer (GstBufferPool * pool, GstBuffer ** buffer, GstBufferPoolAcquireParams * params) { GstPipeWirePool *p = GST_PIPEWIRE_POOL (pool); + GstPipeWirePoolData *data; + struct pw_buffer *b; GST_OBJECT_LOCK (pool); while (TRUE) { if (G_UNLIKELY (GST_BUFFER_POOL_IS_FLUSHING (pool))) goto flushing; - if (p->available.length > 0) + if ((b = pw_stream_dequeue_buffer(p->stream))) break; GST_WARNING ("queue empty"); g_cond_wait (&p->cond, GST_OBJECT_GET_LOCK (pool)); } - *buffer = g_queue_pop_head (&p->available); + + data = b->user_data; + *buffer = data->buf; + GST_OBJECT_UNLOCK (pool); GST_DEBUG ("acquire buffer %p", *buffer); @@ -123,13 +204,7 @@ flush_start (GstBufferPool * pool) static void release_buffer (GstBufferPool * pool, GstBuffer *buffer) { - GstPipeWirePool *p = GST_PIPEWIRE_POOL (pool); - GST_DEBUG ("release buffer %p", buffer); - GST_OBJECT_LOCK (pool); - g_queue_push_tail (&p->available, buffer); - g_cond_signal (&p->cond); - GST_OBJECT_UNLOCK (pool); } static gboolean @@ -145,6 +220,8 @@ gst_pipewire_pool_finalize (GObject * object) GstPipeWirePool *pool = GST_PIPEWIRE_POOL (object); GST_DEBUG_OBJECT (pool, "finalize"); + g_object_unref (pool->fd_allocator); + g_object_unref (pool->dmabuf_allocator); G_OBJECT_CLASS (gst_pipewire_pool_parent_class)->finalize (object); } @@ -168,11 +245,14 @@ gst_pipewire_pool_class_init (GstPipeWirePoolClass * klass) GST_DEBUG_CATEGORY_INIT (gst_pipewire_pool_debug_category, "pipewirepool", 0, "debug category for pipewirepool object"); + + pool_data_quark = g_quark_from_static_string ("GstPipeWirePoolDataQuark"); } static void gst_pipewire_pool_init (GstPipeWirePool * pool) { + pool->fd_allocator = gst_fd_allocator_new (); + pool->dmabuf_allocator = gst_dmabuf_allocator_new (); g_cond_init (&pool->cond); - g_queue_init (&pool->available); } diff --git a/src/gst/gstpipewirepool.h b/src/gst/gstpipewirepool.h index 80188f71c..4ed62d79a 100644 --- a/src/gst/gstpipewirepool.h +++ b/src/gst/gstpipewirepool.h @@ -39,14 +39,29 @@ G_BEGIN_DECLS #define GST_PIPEWIRE_POOL_GET_CLASS(klass) \ (G_TYPE_INSTANCE_GET_CLASS ((klass), GST_TYPE_PIPEWIRE_POOL, GstPipeWirePoolClass)) +typedef struct _GstPipeWirePoolData GstPipeWirePoolData; typedef struct _GstPipeWirePool GstPipeWirePool; typedef struct _GstPipeWirePoolClass GstPipeWirePoolClass; +struct _GstPipeWirePoolData { + GstPipeWirePool *pool; + void *owner; + struct spa_meta_header *header; + guint flags; + goffset offset; + struct pw_buffer *b; + GstBuffer *buf; +}; + struct _GstPipeWirePool { GstBufferPool parent; struct pw_stream *stream; - GQueue available; + struct pw_type *t; + + GstAllocator *fd_allocator; + GstAllocator *dmabuf_allocator; + GCond cond; }; @@ -58,8 +73,12 @@ GType gst_pipewire_pool_get_type (void); GstPipeWirePool * gst_pipewire_pool_new (void); -gboolean gst_pipewire_pool_add_buffer (GstPipeWirePool *pool, GstBuffer *buffer); -gboolean gst_pipewire_pool_remove_buffer (GstPipeWirePool *pool, GstBuffer *buffer); +void gst_pipewire_pool_wrap_buffer (GstPipeWirePool *pool, struct pw_buffer *buffer); + +GstPipeWirePoolData *gst_pipewire_pool_get_data (GstBuffer *buffer); + +//gboolean gst_pipewire_pool_add_buffer (GstPipeWirePool *pool, GstBuffer *buffer); +//gboolean gst_pipewire_pool_remove_buffer (GstPipeWirePool *pool, GstBuffer *buffer); G_END_DECLS diff --git a/src/gst/gstpipewiresink.c b/src/gst/gstpipewiresink.c index 498ef8153..5154e214e 100644 --- a/src/gst/gstpipewiresink.c +++ b/src/gst/gstpipewiresink.c @@ -37,14 +37,9 @@ #include #include #include -#include - -#include #include "gstpipewireformat.h" -static GQuark process_mem_data_quark; - GST_DEBUG_CATEGORY_STATIC (pipewire_sink_debug); #define GST_CAT_DEFAULT pipewire_sink_debug @@ -123,10 +118,8 @@ gst_pipewire_sink_finalize (GObject * object) if (pwsink->properties) gst_structure_free (pwsink->properties); - g_object_unref (pwsink->allocator); g_free (pwsink->path); g_free (pwsink->client_name); - g_hash_table_unref (pwsink->buf_ids); G_OBJECT_CLASS (parent_class)->finalize (object); } @@ -219,8 +212,6 @@ gst_pipewire_sink_class_init (GstPipeWireSinkClass * klass) GST_DEBUG_CATEGORY_INIT (pipewire_sink_debug, "pipewiresink", 0, "PipeWire Sink"); - - process_mem_data_quark = g_quark_from_static_string ("GstPipeWireSinkProcessMemQuark"); } #define PROP_RANGE(min,max) 2,min,max @@ -273,7 +264,6 @@ pool_activated (GstPipeWirePool *pool, GstPipeWireSink *sink) static void gst_pipewire_sink_init (GstPipeWireSink * sink) { - sink->allocator = gst_fd_allocator_new (); sink->pool = gst_pipewire_pool_new (); sink->client_name = pw_get_client_name(); sink->mode = DEFAULT_PROP_MODE; @@ -281,15 +271,13 @@ gst_pipewire_sink_init (GstPipeWireSink * sink) g_signal_connect (sink->pool, "activated", G_CALLBACK (pool_activated), sink); - sink->buf_ids = g_hash_table_new_full (g_direct_hash, g_direct_equal, NULL, - (GDestroyNotify) gst_buffer_unref); - g_queue_init (&sink->queue); sink->loop = pw_loop_new (NULL); sink->main_loop = pw_thread_loop_new (sink->loop, "pipewire-sink-loop"); sink->core = pw_core_new (sink->loop, NULL); sink->type = pw_core_get_type (sink->core); + sink->pool->t = sink->type; GST_DEBUG ("loop %p %p", sink->loop, sink->main_loop); } @@ -407,126 +395,35 @@ gst_pipewire_sink_get_property (GObject * object, guint prop_id, } } -typedef struct { - GstPipeWireSink *sink; - guint id; - struct spa_buffer *buf; - struct spa_meta_header *header; - guint flags; - goffset offset; -} ProcessMemData; - static void -process_mem_data_destroy (gpointer user_data) -{ - ProcessMemData *data = user_data; - - gst_object_unref (data->sink); - g_slice_free (ProcessMemData, data); -} - -static void -on_add_buffer (void *_data, - uint32_t id) +on_add_buffer (void *_data, struct pw_buffer *b) { GstPipeWireSink *pwsink = _data; - struct spa_buffer *b; - GstBuffer *buf; - uint32_t i; - ProcessMemData data; - struct pw_type *t = pwsink->type; - - GST_LOG_OBJECT (pwsink, "add buffer"); - - if (!(b = pw_stream_peek_buffer (pwsink->stream, id))) { - g_warning ("failed to peek buffer"); - return; - } - - buf = gst_buffer_new (); - - data.sink = gst_object_ref (pwsink); - data.id = id; - data.buf = b; - data.header = spa_buffer_find_meta (b, t->meta.Header); - - for (i = 0; i < b->n_datas; i++) { - struct spa_data *d = &b->datas[i]; - GstMemory *gmem = NULL; - - if (d->type == t->data.MemFd || - d->type == t->data.DmaBuf) { - gmem = gst_fd_allocator_alloc (pwsink->allocator, dup (d->fd), - d->mapoffset + d->maxsize, GST_FD_MEMORY_FLAG_NONE); - gst_memory_resize (gmem, d->mapoffset, d->maxsize); - data.offset = d->mapoffset; - } - else if (d->type == t->data.MemPtr) { - gmem = gst_memory_new_wrapped (0, d->data, d->maxsize, 0, - d->maxsize, NULL, NULL); - data.offset = 0; - } - if (gmem) - gst_buffer_append_memory (buf, gmem); - } - data.flags = GST_BUFFER_FLAGS (buf); - gst_mini_object_set_qdata (GST_MINI_OBJECT_CAST (buf), - process_mem_data_quark, - g_slice_dup (ProcessMemData, &data), - process_mem_data_destroy); - - gst_pipewire_pool_add_buffer (pwsink->pool, buf); - g_hash_table_insert (pwsink->buf_ids, GINT_TO_POINTER (id), buf); - + gst_pipewire_pool_wrap_buffer (pwsink->pool, b); pw_thread_loop_signal (pwsink->main_loop, FALSE); } static void -on_remove_buffer (void *data, - uint32_t id) +on_remove_buffer (void *_data, struct pw_buffer *b) { - GstPipeWireSink *pwsink = data; - GstBuffer *buf; + GstPipeWireSink *pwsink = _data; + GstPipeWirePoolData *data = b->user_data; GST_LOG_OBJECT (pwsink, "remove buffer"); - buf = g_hash_table_lookup (pwsink->buf_ids, GINT_TO_POINTER (id)); - if (buf) { - GST_MINI_OBJECT_CAST (buf)->dispose = NULL; - if (!gst_pipewire_pool_remove_buffer (pwsink->pool, buf)) - gst_buffer_ref (buf); - if (g_queue_remove (&pwsink->queue, buf)) - gst_buffer_unref (buf); - g_hash_table_remove (pwsink->buf_ids, GINT_TO_POINTER (id)); - } -} -static void -on_new_buffer (void *data, - uint32_t id) -{ - GstPipeWireSink *pwsink = data; - GstBuffer *buf; - - GST_LOG_OBJECT (pwsink, "got new buffer %u", id); - if (pwsink->stream == NULL) { - GST_LOG_OBJECT (pwsink, "no stream"); - return; - } - buf = g_hash_table_lookup (pwsink->buf_ids, GINT_TO_POINTER (id)); - - if (buf) { - gst_buffer_unref (buf); - pw_thread_loop_signal (pwsink->main_loop, FALSE); - } + if (g_queue_remove (&pwsink->queue, data->buf)) + gst_buffer_unref (data->buf); + gst_buffer_unref (data->buf); } static void do_send_buffer (GstPipeWireSink *pwsink) { GstBuffer *buffer; - ProcessMemData *data; + GstPipeWirePoolData *data; gboolean res; guint i; + struct spa_buffer *b; buffer = g_queue_pop_head (&pwsink->queue); if (buffer == NULL) { @@ -534,23 +431,24 @@ do_send_buffer (GstPipeWireSink *pwsink) return; } - data = gst_mini_object_get_qdata (GST_MINI_OBJECT_CAST (buffer), - process_mem_data_quark); + data = gst_pipewire_pool_get_data(buffer); + + b = data->b->buffer; if (data->header) { data->header->seq = GST_BUFFER_OFFSET (buffer); data->header->pts = GST_BUFFER_PTS (buffer); data->header->dts_offset = GST_BUFFER_DTS (buffer); } - for (i = 0; i < data->buf->n_datas; i++) { - struct spa_data *d = &data->buf->datas[i]; + for (i = 0; i < b->n_datas; i++) { + struct spa_data *d = &b->datas[i]; GstMemory *mem = gst_buffer_peek_memory (buffer, i); d->chunk->offset = mem->offset - data->offset; d->chunk->size = mem->size; } - if (!(res = pw_stream_send_buffer (pwsink->stream, data->id))) { - g_warning ("can't send buffer"); + if ((res = pw_stream_queue_buffer (pwsink->stream, data->b)) < 0) { + g_warning ("can't send buffer %s", spa_strerror(res)); pw_thread_loop_signal (pwsink->main_loop, FALSE); } else pwsink->need_ready--; @@ -558,9 +456,17 @@ do_send_buffer (GstPipeWireSink *pwsink) static void -on_need_buffer (void *data) +on_process (void *data) { GstPipeWireSink *pwsink = data; + + if (pwsink->stream == NULL) { + GST_LOG_OBJECT (pwsink, "no stream"); + return; + } + + g_cond_signal (&pwsink->pool->cond); + pwsink->need_ready++; GST_DEBUG ("need buffer %u", pwsink->need_ready); do_send_buffer (pwsink); @@ -733,8 +639,7 @@ static const struct pw_stream_events stream_events = { .format_changed = on_format_changed, .add_buffer = on_add_buffer, .remove_buffer = on_remove_buffer, - .new_buffer = on_new_buffer, - .need_buffer = on_need_buffer, + .process = on_process, }; static gboolean diff --git a/src/gst/gstpipewiresink.h b/src/gst/gstpipewiresink.h index c746cf358..5ff61f2e4 100644 --- a/src/gst/gstpipewiresink.h +++ b/src/gst/gstpipewiresink.h @@ -89,12 +89,10 @@ struct _GstPipeWireSink { struct pw_stream *stream; struct spa_hook stream_listener; - GstAllocator *allocator; GstStructure *properties; GstPipeWireSinkMode mode; GstPipeWirePool *pool; - GHashTable *buf_ids; GQueue queue; guint need_ready; }; diff --git a/src/gst/gstpipewiresrc.c b/src/gst/gstpipewiresrc.c index 86770f1a0..8f877a9a2 100644 --- a/src/gst/gstpipewiresrc.c +++ b/src/gst/gstpipewiresrc.c @@ -211,13 +211,10 @@ gst_pipewire_src_finalize (GObject * object) if (pwsrc->properties) gst_structure_free (pwsrc->properties); - g_object_unref (pwsrc->fd_allocator); - g_object_unref (pwsrc->dmabuf_allocator); if (pwsrc->clock) gst_object_unref (pwsrc->clock); g_free (pwsrc->path); g_free (pwsrc->client_name); - g_hash_table_unref (pwsrc->buf_ids); G_OBJECT_CLASS (parent_class)->finalize (object); } @@ -322,163 +319,94 @@ gst_pipewire_src_init (GstPipeWireSrc * src) g_queue_init (&src->queue); - src->fd_allocator = gst_fd_allocator_new (); - src->dmabuf_allocator = gst_dmabuf_allocator_new (); src->client_name = pw_get_client_name (); - src->buf_ids = g_hash_table_new_full (g_direct_hash, g_direct_equal, NULL, (GDestroyNotify) gst_buffer_unref); + src->pool = gst_pipewire_pool_new (); src->loop = pw_loop_new (NULL); src->main_loop = pw_thread_loop_new (src->loop, "pipewire-main-loop"); src->core = pw_core_new (src->loop, NULL); src->type = pw_core_get_type (src->core); + src->pool->t = src->type; GST_DEBUG ("loop %p, mainloop %p", src->loop, src->main_loop); } -typedef struct { - GstPipeWireSrc *src; - guint id; - struct spa_buffer *buf; - struct spa_meta_header *header; - guint flags; - goffset offset; -} ProcessMemData; - -static void -process_mem_data_destroy (gpointer user_data) -{ - ProcessMemData *data = user_data; - - gst_object_unref (data->src); - g_slice_free (ProcessMemData, data); -} - static gboolean buffer_recycle (GstMiniObject *obj) { - ProcessMemData *data; GstPipeWireSrc *src; + GstPipeWirePoolData *data; gst_mini_object_ref (obj); - data = gst_mini_object_get_qdata (obj, - process_mem_data_quark); + data = gst_pipewire_pool_get_data (GST_BUFFER_CAST(obj)); + GST_BUFFER_FLAGS (obj) = data->flags; - src = data->src; + src = data->owner; GST_LOG_OBJECT (obj, "recycle buffer"); pw_thread_loop_lock (src->main_loop); - pw_stream_recycle_buffer (src->stream, data->id); + pw_stream_queue_buffer (src->stream, data->b); pw_thread_loop_unlock (src->main_loop); return FALSE; } static void -on_add_buffer (void *_data, guint id) +on_add_buffer (void *_data, struct pw_buffer *b) { GstPipeWireSrc *pwsrc = _data; - struct spa_buffer *b; - GstBuffer *buf; - uint32_t i; - ProcessMemData data; - struct pw_core *core = pwsrc->core; - struct pw_type *t = pw_core_get_type(core); + GstPipeWirePoolData *data; GST_LOG_OBJECT (pwsrc, "add buffer"); - - if (!(b = pw_stream_peek_buffer (pwsrc->stream, id))) { - g_warning ("failed to peek buffer"); - return; - } - - buf = gst_buffer_new (); - GST_MINI_OBJECT_CAST (buf)->dispose = buffer_recycle; - - data.src = gst_object_ref (pwsrc); - data.id = id; - data.buf = b; - data.header = spa_buffer_find_meta (b, t->meta.Header); - - for (i = 0; i < b->n_datas; i++) { - struct spa_data *d = &b->datas[i]; - GstMemory *gmem = NULL; - - if (d->type == t->data.MemFd) { - gmem = gst_fd_allocator_alloc (pwsrc->fd_allocator, dup (d->fd), - d->mapoffset + d->maxsize, GST_FD_MEMORY_FLAG_NONE); - gst_memory_resize (gmem, d->mapoffset, d->maxsize); - data.offset = d->mapoffset; - } - else if(d->type == t->data.DmaBuf) { - gmem = gst_dmabuf_allocator_alloc (pwsrc->dmabuf_allocator, dup (d->fd), - d->mapoffset + d->maxsize); - gst_memory_resize (gmem, d->mapoffset, d->maxsize); - data.offset = d->mapoffset; - } - else if (d->type == t->data.MemPtr) { - gmem = gst_memory_new_wrapped (0, d->data, d->maxsize, 0, - d->maxsize, NULL, NULL); - data.offset = 0; - } - if (gmem) - gst_buffer_append_memory (buf, gmem); - } - data.flags = GST_BUFFER_FLAGS (buf); - gst_mini_object_set_qdata (GST_MINI_OBJECT_CAST (buf), - process_mem_data_quark, - g_slice_dup (ProcessMemData, &data), - process_mem_data_destroy); - - g_hash_table_insert (pwsrc->buf_ids, GINT_TO_POINTER (id), buf); + gst_pipewire_pool_wrap_buffer (pwsrc->pool, b); + data = b->user_data; + data->owner = pwsrc; + GST_MINI_OBJECT_CAST (data->buf)->dispose = buffer_recycle; } static void -on_remove_buffer (void *data, - guint id) -{ - GstPipeWireSrc *pwsrc = data; - GstBuffer *buf; - - GST_LOG_OBJECT (pwsrc, "remove buffer"); - buf = g_hash_table_lookup (pwsrc->buf_ids, GINT_TO_POINTER (id)); - if (buf) { - GList *walk; - - GST_MINI_OBJECT_CAST (buf)->dispose = NULL; - - walk = pwsrc->queue.head; - while (walk) { - GList *next = walk->next; - - if (walk->data == buf) { - gst_buffer_unref (buf); - g_queue_delete_link (&pwsrc->queue, walk); - } - walk = next; - } - g_hash_table_remove (pwsrc->buf_ids, GINT_TO_POINTER (id)); - } -} - -static void -on_new_buffer (void *_data, - guint id) +on_remove_buffer (void *_data, struct pw_buffer *b) { GstPipeWireSrc *pwsrc = _data; + GstPipeWirePoolData *data = b->user_data; + GstBuffer *buf = data->buf; + GList *walk; + + GST_LOG_OBJECT (pwsrc, "remove buffer"); + + GST_MINI_OBJECT_CAST (buf)->dispose = NULL; + + walk = pwsrc->queue.head; + while (walk) { + GList *next = walk->next; + + if (walk->data == buf) { + gst_buffer_unref (buf); + g_queue_delete_link (&pwsrc->queue, walk); + } + walk = next; + } +} + +static void +on_process (void *_data) +{ + GstPipeWireSrc *pwsrc = _data; + struct pw_buffer *b; GstBuffer *buf; - ProcessMemData *data; + GstPipeWirePoolData *data; struct spa_meta_header *h; guint i; - buf = g_hash_table_lookup (pwsrc->buf_ids, GINT_TO_POINTER (id)); - if (buf == NULL) { - g_warning ("unknown buffer %d", id); - return; - } + b = pw_stream_dequeue_buffer (pwsrc->stream); + if (b == NULL) + return; + + data = b->user_data; + buf = data->buf; + GST_LOG_OBJECT (pwsrc, "got new buffer %p", buf); - data = gst_mini_object_get_qdata (GST_MINI_OBJECT_CAST (buf), - process_mem_data_quark); h = data->header; if (h) { GST_INFO ("pts %" G_GUINT64_FORMAT ", dts_offset %"G_GUINT64_FORMAT, h->pts, h->dts_offset); @@ -490,8 +418,8 @@ on_new_buffer (void *_data, } GST_BUFFER_OFFSET (buf) = h->seq; } - for (i = 0; i < data->buf->n_datas; i++) { - struct spa_data *d = &data->buf->datas[i]; + for (i = 0; i < b->buffer->n_datas; i++) { + struct spa_data *d = &b->buffer->datas[i]; GstMemory *mem = gst_buffer_peek_memory (buf, i); mem->offset = SPA_MIN(d->chunk->offset, d->maxsize); mem->size = SPA_MIN(d->chunk->size, d->maxsize - mem->offset); @@ -1041,7 +969,7 @@ static const struct pw_stream_events stream_events = { .format_changed = on_format_changed, .add_buffer = on_add_buffer, .remove_buffer = on_remove_buffer, - .new_buffer = on_new_buffer, + .process = on_process, }; static gboolean diff --git a/src/gst/gstpipewiresrc.h b/src/gst/gstpipewiresrc.h index 5769c02dc..c8f718b91 100644 --- a/src/gst/gstpipewiresrc.h +++ b/src/gst/gstpipewiresrc.h @@ -24,6 +24,7 @@ #include #include +#include G_BEGIN_DECLS @@ -76,11 +77,9 @@ struct _GstPipeWireSrc { struct pw_stream *stream; struct spa_hook stream_listener; - GstAllocator *fd_allocator; - GstAllocator *dmabuf_allocator; GstStructure *properties; - GHashTable *buf_ids; + GstPipeWirePool *pool; GQueue queue; GstClock *clock; }; diff --git a/src/pipewire/stream.c b/src/pipewire/stream.c index 50746d3c0..5c0f008ec 100644 --- a/src/pipewire/stream.c +++ b/src/pipewire/stream.c @@ -28,6 +28,7 @@ #include #include #include +#include #include #include @@ -50,12 +51,15 @@ static inline void init_type(struct type *type, struct spa_type_map *map) } struct buffer { - struct spa_list link; + struct pw_buffer this; uint32_t id; -#define BUFFER_FLAG_READY (1 << 0) -#define BUFFER_FLAG_MAPPED (1 << 1) +#define BUFFER_FLAG_MAPPED (1 << 0) uint32_t flags; - struct spa_buffer *buffer; +}; + +struct queue { + uint32_t ids[MAX_BUFFERS]; + struct spa_ringbuffer ring; }; struct stream { @@ -81,11 +85,9 @@ struct stream { struct buffer buffers[MAX_BUFFERS]; int n_buffers; - struct spa_list free; - struct spa_list ready; - bool in_need_buffer; - bool in_new_buffer; + struct queue dequeued; + struct queue queued; uint32_t n_init_params; struct spa_pod **init_params; @@ -104,34 +106,27 @@ struct stream { int64_t last_monotonic; }; -static inline void queue_free(struct stream *stream, struct buffer *buffer) +static inline void push_queue(struct stream *stream, struct queue *queue, struct buffer *buffer) { - spa_list_append(&stream->free, &buffer->link); + uint32_t index; + + spa_ringbuffer_get_write_index(&queue->ring, &index); + queue->ids[index & (MAX_BUFFERS-1)] = buffer->id; + spa_ringbuffer_write_update(&queue->ring, index + 1); } -static inline struct buffer *dequeue_free(struct stream *stream) +static inline struct buffer *pop_queue(struct stream *stream, struct queue *queue) { - struct buffer *b = NULL; - if (!spa_list_is_empty(&stream->free)) { - b = spa_list_first(&stream->free, struct buffer, link); - spa_list_remove(&b->link); - } - return b; -} + int32_t avail; + uint32_t index, id; -static inline void queue_ready(struct stream *stream, struct buffer *buffer) -{ - spa_list_append(&stream->ready, &buffer->link); -} + if ((avail = spa_ringbuffer_get_read_index(&queue->ring, &index)) <= 0) + return NULL; -static inline struct buffer *dequeue_ready(struct stream *stream) -{ - struct buffer *b = NULL; - if (!spa_list_is_empty(&stream->ready)) { - b = spa_list_first(&stream->ready, struct buffer, link); - spa_list_remove(&b->link); - } - return b; + id = queue->ids[index & (MAX_BUFFERS-1)]; + spa_ringbuffer_read_update(&queue->ring, index + 1); + + return &stream->buffers[id]; } static bool stream_set_state(struct pw_stream *stream, enum pw_stream_state state, char *error) @@ -162,6 +157,27 @@ static struct buffer *find_buffer(struct pw_stream *stream, uint32_t id) return NULL; } +static int +do_call_process(struct spa_loop *loop, + bool async, uint32_t seq, const void *data, size_t size, void *user_data) +{ + struct stream *impl = user_data; + struct pw_stream *stream = &impl->this; + spa_hook_list_call(&stream->listener_list, struct pw_stream_events, process); + return 0; +} + +static void call_process(struct stream *impl) +{ + if (SPA_FLAG_CHECK(impl->flags, PW_STREAM_FLAG_RT_PROCESS)) { + do_call_process(NULL, false, 1, NULL, 0, impl); + } + else { + pw_loop_invoke(impl->core->main_loop, + do_call_process, 1, NULL, 0, false, impl); + } +} + static int impl_send_command(struct spa_node *node, const struct spa_command *command) { struct stream *impl = SPA_CONTAINER_OF(node, struct stream, impl_node); @@ -181,10 +197,7 @@ static int impl_send_command(struct spa_node *node, const struct spa_command *co impl->io->status = SPA_STATUS_NEED_BUFFER; } else { - impl->in_need_buffer = true; - spa_hook_list_call(&stream->listener_list, struct pw_stream_events, - need_buffer); - impl->in_need_buffer = false; + call_process(impl); } stream_set_state(stream, PW_STREAM_STATE_STREAMING, NULL); } @@ -375,45 +388,111 @@ static int impl_port_set_param(struct spa_node *node, return -ENOENT; } -static int impl_port_use_buffers(struct spa_node *node, enum spa_direction direction, uint32_t port_id, - struct spa_buffer **buffers, uint32_t n_buffers) +static int map_data(struct stream *impl, struct spa_data *data, int prot) { - struct stream *d = SPA_CONTAINER_OF(node, struct stream, impl_node); - struct pw_stream *stream = &d->this; - struct pw_type *t = d->t; - int i, prot; + void *ptr; + struct pw_map_range range; + + pw_map_range_init(&range, data->mapoffset, data->maxsize, impl->core->sc_pagesize); + + ptr = mmap(NULL, range.size, prot, MAP_SHARED, data->fd, range.offset); + if (ptr == MAP_FAILED) { + pw_log_error("stream %p: failed to mmap buffer mem: %m", impl); + return -errno; + } + data->data = SPA_MEMBER(ptr, range.start, void); + pw_log_debug("stream %p: fd %d mapped %d %d %p", impl, data->fd, + range.offset, range.size, data->data); + + return 0; +} + +static int unmap_data(struct stream *impl, struct spa_data *data) +{ + struct pw_map_range range; + + pw_map_range_init(&range, data->mapoffset, data->maxsize, impl->core->sc_pagesize); + + if (munmap(SPA_MEMBER(data->data, -range.start, void), range.size) < 0) + pw_log_warn("failed to unmap: %m"); + + pw_log_debug("stream %p: fd %d unmapped", impl, data->fd); + return 0; +} + +static void clear_buffers(struct pw_stream *stream) +{ + struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this); + int i, j; + + pw_log_debug("stream %p: clear buffers", stream); + + for (i = 0; i < impl->n_buffers; i++) { + struct buffer *b = &impl->buffers[i]; + + spa_hook_list_call(&stream->listener_list, struct pw_stream_events, + remove_buffer, &b->this); + + if (SPA_FLAG_CHECK(b->flags, BUFFER_FLAG_MAPPED)) { + for (j = 0; j < b->this.buffer->n_datas; j++) { + struct spa_data *d = &b->this.buffer->datas[j]; + pw_log_debug("stream %p: clear buffer %d mem", + stream, b->id); + unmap_data(impl, d); + } + } + } + impl->n_buffers = 0; + spa_ringbuffer_init(&impl->dequeued.ring); + spa_ringbuffer_init(&impl->queued.ring); +} + +static int impl_port_use_buffers(struct spa_node *node, enum spa_direction direction, uint32_t port_id, + struct spa_buffer **buffers, uint32_t n_buffers) +{ + struct stream *impl = SPA_CONTAINER_OF(node, struct stream, impl_node); + struct pw_stream *stream = &impl->this; + struct pw_type *t = impl->t; + int i, j, prot, res; prot = PROT_READ | (direction == SPA_DIRECTION_OUTPUT ? PROT_WRITE : 0); + clear_buffers(stream); + for (i = 0; i < n_buffers; i++) { - struct buffer *b = &d->buffers[i]; - struct spa_data *datas = buffers[i]->datas; + struct buffer *b = &impl->buffers[i]; + int size = 0; b->flags = 0; b->id = buffers[i]->id; - if (datas[0].type == t->data.MemFd || - datas[0].type == t->data.DmaBuf) { - void *ptr; - ptr = mmap(NULL, datas[0].maxsize + datas[0].mapoffset, prot, - MAP_SHARED, datas[0].fd, 0); - if (ptr == MAP_FAILED) { - pw_log_error("failed to mmap buffer mem"); - return -errno; - + if (SPA_FLAG_CHECK(impl->flags, PW_STREAM_FLAG_MAP_BUFFERS)) { + for (j = 0; j < buffers[i]->n_datas; j++) { + struct spa_data *d = &buffers[i]->datas[j]; + if (d->type == t->data.MemFd || + d->type == t->data.DmaBuf) { + if ((res = map_data(impl, d, prot)) < 0) + return res; + } + else if (d->data == NULL) { + pw_log_error("invalid buffer mem"); + return -EINVAL; + } + size += d->maxsize; } - datas[0].data = SPA_MEMBER(ptr, datas[0].mapoffset, void); SPA_FLAG_SET(b->flags, BUFFER_FLAG_MAPPED); } - else if (datas[0].data == NULL) { - pw_log_error("invalid buffer mem"); - return -EINVAL; - } - b->buffer = buffers[i]; - pw_log_info("got buffer %d size %d", i, datas[0].maxsize); - queue_free(d, b); + b->this.buffer = buffers[i]; + pw_log_info("got buffer %d %d datas, total size %d", i, + buffers[i]->n_datas, size); + + if (impl->direction == SPA_DIRECTION_OUTPUT) + push_queue(impl, &impl->dequeued, b); + + spa_hook_list_call(&stream->listener_list, struct pw_stream_events, + add_buffer, &b->this); } - d->n_buffers = n_buffers; + impl->n_buffers = n_buffers; if (n_buffers > 0) stream_set_state(stream, PW_STREAM_STATE_PAUSED, NULL); @@ -423,71 +502,63 @@ static int impl_port_use_buffers(struct spa_node *node, enum spa_direction direc return 0; } -static inline void reuse_buffer(struct stream *d, uint32_t id) -{ - pw_log_trace("export-source %p: recycle buffer %d", d, id); - if (id < d->n_buffers) - queue_free(d, &d->buffers[id]); -} - static int impl_port_reuse_buffer(struct spa_node *node, uint32_t port_id, uint32_t buffer_id) { struct stream *d = SPA_CONTAINER_OF(node, struct stream, impl_node); - reuse_buffer(d, buffer_id); + pw_log_trace("export-source %p: recycle buffer %d", d, buffer_id); + if (buffer_id < d->n_buffers) + push_queue(d, &d->queued, &d->buffers[buffer_id]); return 0; } -static int impl_node_process(struct spa_node *node) +static int impl_node_process_input(struct spa_node *node) { struct stream *impl = SPA_CONTAINER_OF(node, struct stream, impl_node); struct pw_stream *stream = &impl->this; struct spa_io_buffers *io = impl->io; struct buffer *b; - uint32_t buffer_id; - int res; - if (impl->direction == SPA_DIRECTION_INPUT) { - buffer_id = io->buffer_id; + pw_log_trace("stream %p: process input %d %d", stream, io->status, io->buffer_id); - pw_log_trace("stream %p: process input %d %d", stream, io->status, - buffer_id); + if (io->status != SPA_STATUS_HAVE_BUFFER) + goto done; - if ((b = find_buffer(stream, buffer_id)) == NULL) - return SPA_STATUS_NEED_BUFFER; + if ((b = find_buffer(stream, io->buffer_id)) == NULL) + goto done; - queue_ready(impl, b); + push_queue(impl, &impl->dequeued, b); + call_process(impl); - if (impl->client_reuse) - io->buffer_id = SPA_ID_INVALID; - - if (io->status == SPA_STATUS_HAVE_BUFFER) { - impl->in_new_buffer = true; - spa_hook_list_call(&stream->listener_list, struct pw_stream_events, - new_buffer, buffer_id); - impl->in_new_buffer = false; - } - io->status = SPA_STATUS_NEED_BUFFER; - res = SPA_STATUS_NEED_BUFFER; - } else { - reuse_buffer(impl, io->buffer_id); + if (impl->client_reuse) io->buffer_id = SPA_ID_INVALID; - pw_log_trace("stream %p: process output", stream); + done: + io->status = SPA_STATUS_NEED_BUFFER; + return SPA_STATUS_HAVE_BUFFER; +} - if ((b = dequeue_ready(impl)) == NULL) { - impl->in_need_buffer = true; - spa_hook_list_call(&stream->listener_list, struct pw_stream_events, - need_buffer); - impl->in_need_buffer = false; - b = dequeue_ready(impl); - } - if (b != NULL) { - io->buffer_id = b->id; - io->status = SPA_STATUS_HAVE_BUFFER; - } - res = SPA_STATUS_HAVE_BUFFER; +static int impl_node_process_output(struct spa_node *node) +{ + struct stream *impl = SPA_CONTAINER_OF(node, struct stream, impl_node); + struct pw_stream *stream = &impl->this; + struct spa_io_buffers *io = impl->io; + struct buffer *b; + + pw_log_trace("stream %p: process out %d %d", stream, io->status, io->buffer_id); + + if ((b = find_buffer(stream, io->buffer_id)) != NULL) + push_queue(impl, &impl->dequeued, b); + + if ((b = pop_queue(impl, &impl->queued)) != NULL) { + io->buffer_id = b->id; + io->status = SPA_STATUS_HAVE_BUFFER; + } else { + io->buffer_id = SPA_ID_INVALID; + io->status = SPA_STATUS_NEED_BUFFER; } - return res; + call_process(impl); + + return SPA_STATUS_HAVE_BUFFER; } static const struct spa_node impl_node = { @@ -502,7 +573,6 @@ static const struct spa_node impl_node = { .port_set_param = impl_port_set_param, .port_use_buffers = impl_port_use_buffers, .port_reuse_buffer = impl_port_reuse_buffer, - .process = impl_node_process, }; #if 0 @@ -565,8 +635,8 @@ struct pw_stream * pw_stream_new(struct pw_remote *remote, const char *name, str = pw_properties_get(props, "pipewire.client.reuse"); impl->client_reuse = str && pw_properties_parse_bool(str); - spa_list_init(&impl->free); - spa_list_init(&impl->ready); + spa_ringbuffer_init(&impl->dequeued.ring); + spa_ringbuffer_init(&impl->queued.ring); spa_hook_list_init(&this->listener_list); @@ -760,6 +830,12 @@ pw_stream_connect(struct pw_stream *stream, impl->node = pw_node_new(impl->core, "export-source", pw_properties_copy(stream->properties), 0); impl->impl_node = impl_node; + + if (impl->direction == SPA_DIRECTION_INPUT) + impl->impl_node.process = impl_node_process_input; + else + impl->impl_node.process = impl_node_process_output; + pw_node_set_implementation(impl->node, &impl->impl_node); pw_node_register(impl->node, NULL, NULL, NULL); @@ -825,41 +901,18 @@ int pw_stream_get_time(struct pw_stream *stream, struct pw_time *time) return 0; } -uint32_t pw_stream_get_empty_buffer(struct pw_stream *stream) -{ - struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this); - struct buffer *b; - if ((b = dequeue_free(impl)) == NULL) - return SPA_ID_INVALID; - return b->id; -} - -int pw_stream_recycle_buffer(struct pw_stream *stream, uint32_t id) +struct pw_buffer *pw_stream_dequeue_buffer(struct pw_stream *stream) { struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this); struct buffer *b; - if ((b = find_buffer(stream, id)) == NULL) - return -EINVAL; - - queue_free(impl, b); - - if (impl->in_new_buffer) { - struct spa_io_buffers *io = impl->io; - io->buffer_id = id; + if ((b = pop_queue(impl, &impl->dequeued)) == NULL) { + pw_log_trace("stream %p: no more buffers", stream); + return NULL; } - return 0; -} + pw_log_trace("stream %p: dequeue buffer %d", stream, b->id); -struct spa_buffer * -pw_stream_peek_buffer(struct pw_stream *stream, uint32_t id) -{ - struct buffer *b; - - if ((b = find_buffer(stream, id))) - return b->buffer; - - return NULL; + return &b->this; } static int @@ -871,21 +924,20 @@ do_process(struct spa_loop *loop, return 0; } -int pw_stream_send_buffer(struct pw_stream *stream, uint32_t id) +int pw_stream_queue_buffer(struct pw_stream *stream, struct pw_buffer *buffer) { struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this); struct buffer *b; - struct spa_io_buffers *io = impl->io; - if ((b = find_buffer(stream, id))) { - queue_ready(impl, b); - pw_log_trace("stream %p: send buffer %d %p", stream, id, io); + if ((b = find_buffer(stream, buffer->buffer->id)) == NULL) + return -EINVAL; - if (!impl->in_need_buffer) - pw_loop_invoke(impl->core->data_loop, - do_process, 1, NULL, 0, false, impl); - } else { - pw_log_debug("stream %p: output %u was used", stream, id); + pw_log_trace("stream %p: queue buffer %d", stream, b->id); + push_queue(impl, &impl->queued, b); + + if (SPA_FLAG_CHECK(impl->flags, PW_STREAM_FLAG_DRIVER)) { + pw_loop_invoke(impl->core->data_loop, + do_process, 1, NULL, 0, false, impl); } return 0; } diff --git a/src/pipewire/stream.h b/src/pipewire/stream.h index fda1a9616..de491a17d 100644 --- a/src/pipewire/stream.h +++ b/src/pipewire/stream.h @@ -59,18 +59,6 @@ extern "C" { * The stream is initially unconnected. To connect the stream, use * \ref pw_stream_connect(). Pass the desired direction as an argument. * - * \subsection ssec_stream_mode Stream modes - * - * The stream mode specifies how the data will be exchanged with PipeWire. - * The following stream modes are available - * - * \li \ref PW_STREAM_MODE_BUFFER: data is exchanged with fixed size - * buffers. This is ideal for video frames or equal sized audio - * frames. - * \li \ref PW_STREAM_MODE_RINGBUFFER: data is exhanged with a fixed - * size ringbuffer. This is ideal for variable sized audio packets - * or compressed media. - * * \subsection ssec_stream_target Stream target * * To make the newly connected stream automatically connect to an existing @@ -107,7 +95,8 @@ extern "C" { * between client and server. * * With the add_buffer event, a stream will be notified of a new buffer - * that can be used for data transport. + * that can be used for data transport. You can attach user_data to these + * buffers. * * Afer the buffers are negotiated, the stream will transition to the * \ref PW_STREAM_STATE_PAUSED state. @@ -124,28 +113,23 @@ extern "C" { * * \subsection ssec_consume Consume data * - * The new_buffer event is emited for each new buffer can can be + * The process event is emited for each new buffer that can can be * consumed. * - * \ref pw_stream_peek_buffer() should be used to get the data and metadata - * of the buffer. + * \ref pw_stream_dequeue_buffer() should be used to get the data and + * metadata of the buffer. * - * When the buffer is no longer in use, call \ref pw_stream_recycle_buffer() + * When the buffer is no longer in use, call \ref pw_stream_queue_buffer() * to let PipeWire reuse the buffer. * * \subsection ssec_produce Produce data * - * The need_buffer event is emited when PipeWire needs a new buffer for this - * stream. + * \ref pw_stream_dequeue_buffer() gives an empty buffer that can be filled. * - * \ref pw_stream_get_empty_buffer() gives the id of an empty buffer. - * Use \ref pw_stream_peek_buffer() to get the data and metadata that should - * be filled. + * Filled buffers should be queued with \ref pw_stream_queue_buffer(). * - * To send the filled buffer, use \ref pw_stream_send_buffer(). - * - * The new_buffer event is emited when PipeWire no longer uses the buffer - * and it can be safely reused. + * The process event is emited when PipeWire has emptied a buffer that + * can now be refilled. * * \section sec_stream_disconnect Disconnect * @@ -179,6 +163,11 @@ enum pw_stream_state { PW_STREAM_STATE_STREAMING = 5 /**< streaming */ }; +struct pw_buffer { + struct spa_buffer *buffer; /* the spa buffer */ + void *user_data; /* user data attached to the buffer */ +}; + /** Events for a stream */ struct pw_stream_events { #define PW_VERSION_STREAM_EVENTS 0 @@ -194,14 +183,15 @@ struct pw_stream_events { void (*format_changed) (void *data, struct spa_pod *format); /** when a new buffer was created for this stream */ - void (*add_buffer) (void *data, uint32_t id); + void (*add_buffer) (void *data, struct pw_buffer *buffer); /** when a buffer was destroyed for this stream */ - void (*remove_buffer) (void *data, uint32_t id); - /** when a buffer can be reused (for playback streams) or - * is filled (for capture streams */ - void (*new_buffer) (void *data, uint32_t id); - /** when a buffer is needed (for playback streams) */ - void (*need_buffer) (void *data); + void (*remove_buffer) (void *data, struct pw_buffer *buffer); + + /** when a buffer can be queued (for playback streams) or + * dequeued (for capture streams). This is normally called from the + * mainloop but can also be called directly from the realtime data + * thread if the user is prepared to deal with this. */ + void (*process) (void *data); }; /** Convert a stream state to a readable string \memberof pw_stream */ @@ -216,6 +206,8 @@ enum pw_stream_flags { * this stream */ PW_STREAM_FLAG_INACTIVE = (1 << 2), /**< start the stream inactive */ PW_STREAM_FLAG_MAP_BUFFERS = (1 << 3), /**< mmap the buffers */ + PW_STREAM_FLAG_DRIVER = (1 << 4), /**< be a driver */ + PW_STREAM_FLAG_RT_PROCESS = (1 << 5), /**< call process from the realtime thread */ }; /** A time structure \memberof pw_stream */ @@ -256,9 +248,8 @@ const struct pw_properties *pw_stream_get_properties(struct pw_stream *stream); /** Connect a stream for input or output on \a port_path. \memberof pw_stream * \return 0 on success < 0 on error. * - * When \a mode is \ref PW_STREAM_MODE_BUFFER, you should connect to the new-buffer - * event and use pw_stream_peek_buffer() to get the latest metadata and - * data. */ + * You should connect to the process event and use pw_stream_dequeue_buffer() + * to get the latest metadata and data. */ int pw_stream_connect(struct pw_stream *stream, /**< a \ref pw_stream */ enum pw_direction direction, /**< the stream direction */ @@ -298,29 +289,12 @@ int pw_stream_set_active(struct pw_stream *stream, bool active); /** Query the time on the stream \memberof pw_stream */ int pw_stream_get_time(struct pw_stream *stream, struct pw_time *time); -/** Get the id of an empty buffer that can be filled \memberof pw_stream - * \return the id of an empty buffer or \ref SPA_ID_INVALID when no buffer is - * available. */ -uint32_t pw_stream_get_empty_buffer(struct pw_stream *stream); +/** Get a buffer that can be filled for playback streams or consumed + * for capture streams. */ +struct pw_buffer *pw_stream_dequeue_buffer(struct pw_stream *stream); -/** Recycle the buffer with \a id \memberof pw_stream - * \return 0 on success, < 0 when \a id is invalid or not a used buffer - * Let the PipeWire server know that it can reuse the buffer with \a id. */ -int pw_stream_recycle_buffer(struct pw_stream *stream, uint32_t id); - -/** Get the buffer with \a id from \a stream \memberof pw_stream - * \return a \ref spa_buffer or NULL when there is no buffer - * - * This function should be called from the new-buffer event. */ -struct spa_buffer * -pw_stream_peek_buffer(struct pw_stream *stream, uint32_t id); - -/** Send a buffer with \a id to \a stream \memberof pw_stream - * \return 0 when \a id was handled, < 0 on error - * - * For provider or playback streams, this function should be called whenever - * there is a new buffer available. */ -int pw_stream_send_buffer(struct pw_stream *stream, uint32_t id); +/** Submit a buffer for playback or recycle a buffer for capture. */ +int pw_stream_queue_buffer(struct pw_stream *stream, struct pw_buffer *buffer); #ifdef __cplusplus }