diff --git a/src/examples/video-play.c b/src/examples/video-play.c index bd7920da7..17c7f1433 100644 --- a/src/examples/video-play.c +++ b/src/examples/video-play.c @@ -139,19 +139,19 @@ do_render(struct spa_loop *loop, bool async, uint32_t seq, } static void -on_stream_new_buffer(void *_data, uint32_t id) +on_stream_process(void *_data) { struct data *data = _data; struct pw_stream *stream = data->stream; - struct spa_buffer *buf; + struct pw_buffer *buf; - buf = pw_stream_peek_buffer(stream, id); + buf = pw_stream_dequeue_buffer(stream); pw_loop_invoke(pw_main_loop_get_loop(data->loop), do_render, - SPA_ID_INVALID, &buf, sizeof(struct spa_buffer *), + SPA_ID_INVALID, &buf->buffer, sizeof(struct spa_buffer *), true, data); - pw_stream_recycle_buffer(stream, id); + pw_stream_queue_buffer(stream, buf); } static void on_stream_state_changed(void *_data, enum pw_stream_state old, @@ -239,14 +239,14 @@ static Uint32 id_to_sdl_format(struct data *data, uint32_t id) } static void -on_stream_format_changed(void *_data, struct spa_pod *format) +on_stream_format_changed(void *_data, const struct spa_pod *format) { struct data *data = _data; struct pw_stream *stream = data->stream; struct pw_type *t = data->t; uint8_t params_buffer[1024]; struct spa_pod_builder b = SPA_POD_BUILDER_INIT(params_buffer, sizeof(params_buffer)); - struct spa_pod *params[2]; + const struct spa_pod *params[2]; Uint32 sdl_format; void *d; @@ -291,7 +291,7 @@ static const struct pw_stream_events stream_events = { PW_VERSION_STREAM_EVENTS, .state_changed = on_stream_state_changed, .format_changed = on_stream_format_changed, - .new_buffer = on_stream_new_buffer, + .process = on_stream_process, }; static void on_state_changed(void *_data, enum pw_remote_state old, enum pw_remote_state state, const char *error) diff --git a/src/examples/video-src.c b/src/examples/video-src.c index e440d5f9c..7a4415cec 100644 --- a/src/examples/video-src.c +++ b/src/examples/video-src.c @@ -71,35 +71,35 @@ struct data { static void on_timeout(void *userdata, uint64_t expirations) { struct data *data = userdata; - uint32_t id; - struct spa_buffer *buf; int i, j; uint8_t *p, *map; struct spa_meta_header *h; + struct pw_buffer *buf; + struct spa_buffer *b; - id = pw_stream_get_empty_buffer(data->stream); - if (id == SPA_ID_INVALID) + buf = pw_stream_dequeue_buffer(data->stream); + if (buf == NULL) return; - buf = pw_stream_peek_buffer(data->stream, id); + b = buf->buffer; - if (buf->datas[0].type == data->t->data.MemFd || - buf->datas[0].type == data->t->data.DmaBuf) { + if (b->datas[0].type == data->t->data.MemFd || + b->datas[0].type == data->t->data.DmaBuf) { map = - mmap(NULL, buf->datas[0].maxsize + buf->datas[0].mapoffset, - PROT_READ | PROT_WRITE, MAP_SHARED, buf->datas[0].fd, 0); + mmap(NULL, b->datas[0].maxsize + b->datas[0].mapoffset, + PROT_READ | PROT_WRITE, MAP_SHARED, b->datas[0].fd, 0); if (map == MAP_FAILED) { printf("failed to mmap: %s\n", strerror(errno)); return; } - p = SPA_MEMBER(map, buf->datas[0].mapoffset, uint8_t); - } else if (buf->datas[0].type == data->t->data.MemPtr) { + p = SPA_MEMBER(map, b->datas[0].mapoffset, uint8_t); + } else if (b->datas[0].type == data->t->data.MemPtr) { map = NULL; - p = buf->datas[0].data; + p = b->datas[0].data; } else return; - if ((h = spa_buffer_find_meta(buf, data->t->meta.Header))) { + if ((h = spa_buffer_find_meta(b, data->t->meta.Header))) { #if 0 struct timespec now; clock_gettime(CLOCK_MONOTONIC, &now); @@ -116,16 +116,16 @@ static void on_timeout(void *userdata, uint64_t expirations) for (j = 0; j < data->format.size.width * BPP; j++) { p[j] = data->counter + j * i; } - p += buf->datas[0].chunk->stride; + p += b->datas[0].chunk->stride; data->counter += 13; } if (map) - munmap(map, buf->datas[0].maxsize + buf->datas[0].mapoffset); + munmap(map, b->datas[0].maxsize + b->datas[0].mapoffset); - buf->datas[0].chunk->size = buf->datas[0].maxsize; + b->datas[0].chunk->size = b->datas[0].maxsize; - pw_stream_send_buffer(data->stream, id); + pw_stream_queue_buffer(data->stream, buf); } static void on_stream_state_changed(void *_data, enum pw_stream_state old, enum pw_stream_state state, @@ -157,14 +157,14 @@ static void on_stream_state_changed(void *_data, enum pw_stream_state old, enum } static void -on_stream_format_changed(void *_data, struct spa_pod *format) +on_stream_format_changed(void *_data, const struct spa_pod *format) { struct data *data = _data; struct pw_stream *stream = data->stream; struct pw_type *t = data->t; uint8_t params_buffer[1024]; struct spa_pod_builder b = SPA_POD_BUILDER_INIT(params_buffer, sizeof(params_buffer)); - struct spa_pod *params[2]; + const struct spa_pod *params[2]; if (format == NULL) { pw_stream_finish_format(stream, 0, NULL, 0); diff --git a/src/gst/gstpipewireclock.c b/src/gst/gstpipewireclock.c index a82631efe..4c4e47889 100644 --- a/src/gst/gstpipewireclock.c +++ b/src/gst/gstpipewireclock.c @@ -50,12 +50,12 @@ gst_pipewire_clock_get_internal_time (GstClock * clock) pw_stream_get_time (pclock->stream, &t); - if (t.rate) - result = gst_util_uint64_scale_int (t.ticks, GST_SECOND, t.rate); + if (t.rate.denom) + result = gst_util_uint64_scale_int (t.ticks, GST_SECOND * t.rate.num, t.rate.denom); else result = GST_CLOCK_TIME_NONE; - GST_DEBUG ("%"PRId64", %d %"PRId64, t.ticks, t.rate, result); + GST_DEBUG ("%"PRId64", %d %"PRId64, t.ticks, t.rate.denom, result); return result; } diff --git a/src/gst/gstpipewirepool.c b/src/gst/gstpipewirepool.c index eb36f957f..b75ff5563 100644 --- a/src/gst/gstpipewirepool.c +++ b/src/gst/gstpipewirepool.c @@ -21,8 +21,13 @@ #include "config.h" #endif +#include + #include +#include +#include + #include "gstpipewirepool.h" GST_DEBUG_CATEGORY_STATIC (gst_pipewire_pool_debug_category); @@ -40,6 +45,8 @@ enum static guint pool_signals[LAST_SIGNAL] = { 0 }; +static GQuark pool_data_quark; + GstPipeWirePool * gst_pipewire_pool_new (void) { @@ -50,6 +57,74 @@ gst_pipewire_pool_new (void) return pool; } +static void +pool_data_destroy (gpointer user_data) +{ + GstPipeWirePoolData *data = user_data; + + gst_object_unref (data->pool); + g_slice_free (GstPipeWirePoolData, data); +} + +void gst_pipewire_pool_wrap_buffer (GstPipeWirePool *pool, struct pw_buffer *b) +{ + GstBuffer *buf; + uint32_t i; + GstPipeWirePoolData *data; + struct pw_type *t = pool->t; + + GST_LOG_OBJECT (pool, "wrap buffer"); + + data = g_slice_new (GstPipeWirePoolData); + + buf = gst_buffer_new (); + + for (i = 0; i < b->buffer->n_datas; i++) { + struct spa_data *d = &b->buffer->datas[i]; + GstMemory *gmem = NULL; + + GST_LOG_OBJECT (pool, "wrap buffer %d %d", d->mapoffset, d->maxsize); + if (d->type == t->data.MemFd) { + gmem = gst_fd_allocator_alloc (pool->fd_allocator, dup (d->fd), + d->mapoffset + d->maxsize, GST_FD_MEMORY_FLAG_NONE); + gst_memory_resize (gmem, d->mapoffset, d->maxsize); + data->offset = d->mapoffset; + } + else if(d->type == t->data.DmaBuf) { + gmem = gst_dmabuf_allocator_alloc (pool->dmabuf_allocator, dup (d->fd), + d->mapoffset + d->maxsize); + gst_memory_resize (gmem, d->mapoffset, d->maxsize); + data->offset = d->mapoffset; + } + else if (d->type == t->data.MemPtr) { + gmem = gst_memory_new_wrapped (0, d->data, d->maxsize, 0, + d->maxsize, NULL, NULL); + data->offset = 0; + } + if (gmem) + gst_buffer_append_memory (buf, gmem); + } + + data->pool = gst_object_ref (pool); + data->owner = NULL; + data->header = spa_buffer_find_meta (b->buffer, t->meta.Header); + data->flags = GST_BUFFER_FLAGS (buf); + data->b = b; + data->buf = buf; + + gst_mini_object_set_qdata (GST_MINI_OBJECT_CAST (buf), + pool_data_quark, + data, + pool_data_destroy); + b->user_data = data; +} + +GstPipeWirePoolData *gst_pipewire_pool_get_data (GstBuffer *buffer) +{ + return gst_mini_object_get_qdata (GST_MINI_OBJECT_CAST (buffer), pool_data_quark); +} + +#if 0 gboolean gst_pipewire_pool_add_buffer (GstPipeWirePool *pool, GstBuffer *buffer) { @@ -78,25 +153,31 @@ gst_pipewire_pool_remove_buffer (GstPipeWirePool *pool, GstBuffer *buffer) return res; } +#endif static GstFlowReturn acquire_buffer (GstBufferPool * pool, GstBuffer ** buffer, GstBufferPoolAcquireParams * params) { GstPipeWirePool *p = GST_PIPEWIRE_POOL (pool); + GstPipeWirePoolData *data; + struct pw_buffer *b; GST_OBJECT_LOCK (pool); while (TRUE) { if (G_UNLIKELY (GST_BUFFER_POOL_IS_FLUSHING (pool))) goto flushing; - if (p->available.length > 0) + if ((b = pw_stream_dequeue_buffer(p->stream))) break; GST_WARNING ("queue empty"); g_cond_wait (&p->cond, GST_OBJECT_GET_LOCK (pool)); } - *buffer = g_queue_pop_head (&p->available); + + data = b->user_data; + *buffer = data->buf; + GST_OBJECT_UNLOCK (pool); GST_DEBUG ("acquire buffer %p", *buffer); @@ -123,13 +204,7 @@ flush_start (GstBufferPool * pool) static void release_buffer (GstBufferPool * pool, GstBuffer *buffer) { - GstPipeWirePool *p = GST_PIPEWIRE_POOL (pool); - GST_DEBUG ("release buffer %p", buffer); - GST_OBJECT_LOCK (pool); - g_queue_push_tail (&p->available, buffer); - g_cond_signal (&p->cond); - GST_OBJECT_UNLOCK (pool); } static gboolean @@ -145,6 +220,8 @@ gst_pipewire_pool_finalize (GObject * object) GstPipeWirePool *pool = GST_PIPEWIRE_POOL (object); GST_DEBUG_OBJECT (pool, "finalize"); + g_object_unref (pool->fd_allocator); + g_object_unref (pool->dmabuf_allocator); G_OBJECT_CLASS (gst_pipewire_pool_parent_class)->finalize (object); } @@ -168,11 +245,14 @@ gst_pipewire_pool_class_init (GstPipeWirePoolClass * klass) GST_DEBUG_CATEGORY_INIT (gst_pipewire_pool_debug_category, "pipewirepool", 0, "debug category for pipewirepool object"); + + pool_data_quark = g_quark_from_static_string ("GstPipeWirePoolDataQuark"); } static void gst_pipewire_pool_init (GstPipeWirePool * pool) { + pool->fd_allocator = gst_fd_allocator_new (); + pool->dmabuf_allocator = gst_dmabuf_allocator_new (); g_cond_init (&pool->cond); - g_queue_init (&pool->available); } diff --git a/src/gst/gstpipewirepool.h b/src/gst/gstpipewirepool.h index 80188f71c..4ed62d79a 100644 --- a/src/gst/gstpipewirepool.h +++ b/src/gst/gstpipewirepool.h @@ -39,14 +39,29 @@ G_BEGIN_DECLS #define GST_PIPEWIRE_POOL_GET_CLASS(klass) \ (G_TYPE_INSTANCE_GET_CLASS ((klass), GST_TYPE_PIPEWIRE_POOL, GstPipeWirePoolClass)) +typedef struct _GstPipeWirePoolData GstPipeWirePoolData; typedef struct _GstPipeWirePool GstPipeWirePool; typedef struct _GstPipeWirePoolClass GstPipeWirePoolClass; +struct _GstPipeWirePoolData { + GstPipeWirePool *pool; + void *owner; + struct spa_meta_header *header; + guint flags; + goffset offset; + struct pw_buffer *b; + GstBuffer *buf; +}; + struct _GstPipeWirePool { GstBufferPool parent; struct pw_stream *stream; - GQueue available; + struct pw_type *t; + + GstAllocator *fd_allocator; + GstAllocator *dmabuf_allocator; + GCond cond; }; @@ -58,8 +73,12 @@ GType gst_pipewire_pool_get_type (void); GstPipeWirePool * gst_pipewire_pool_new (void); -gboolean gst_pipewire_pool_add_buffer (GstPipeWirePool *pool, GstBuffer *buffer); -gboolean gst_pipewire_pool_remove_buffer (GstPipeWirePool *pool, GstBuffer *buffer); +void gst_pipewire_pool_wrap_buffer (GstPipeWirePool *pool, struct pw_buffer *buffer); + +GstPipeWirePoolData *gst_pipewire_pool_get_data (GstBuffer *buffer); + +//gboolean gst_pipewire_pool_add_buffer (GstPipeWirePool *pool, GstBuffer *buffer); +//gboolean gst_pipewire_pool_remove_buffer (GstPipeWirePool *pool, GstBuffer *buffer); G_END_DECLS diff --git a/src/gst/gstpipewiresink.c b/src/gst/gstpipewiresink.c index 498ef8153..ea4b3eb3e 100644 --- a/src/gst/gstpipewiresink.c +++ b/src/gst/gstpipewiresink.c @@ -37,14 +37,9 @@ #include #include #include -#include - -#include #include "gstpipewireformat.h" -static GQuark process_mem_data_quark; - GST_DEBUG_CATEGORY_STATIC (pipewire_sink_debug); #define GST_CAT_DEFAULT pipewire_sink_debug @@ -123,10 +118,8 @@ gst_pipewire_sink_finalize (GObject * object) if (pwsink->properties) gst_structure_free (pwsink->properties); - g_object_unref (pwsink->allocator); g_free (pwsink->path); g_free (pwsink->client_name); - g_hash_table_unref (pwsink->buf_ids); G_OBJECT_CLASS (parent_class)->finalize (object); } @@ -219,8 +212,6 @@ gst_pipewire_sink_class_init (GstPipeWireSinkClass * klass) GST_DEBUG_CATEGORY_INIT (pipewire_sink_debug, "pipewiresink", 0, "PipeWire Sink"); - - process_mem_data_quark = g_quark_from_static_string ("GstPipeWireSinkProcessMemQuark"); } #define PROP_RANGE(min,max) 2,min,max @@ -234,7 +225,7 @@ pool_activated (GstPipeWirePool *pool, GstPipeWireSink *sink) guint size; guint min_buffers; guint max_buffers; - struct spa_pod *port_params[2]; + const struct spa_pod *port_params[2]; struct spa_pod_builder b = { NULL }; uint8_t buffer[1024]; @@ -273,7 +264,6 @@ pool_activated (GstPipeWirePool *pool, GstPipeWireSink *sink) static void gst_pipewire_sink_init (GstPipeWireSink * sink) { - sink->allocator = gst_fd_allocator_new (); sink->pool = gst_pipewire_pool_new (); sink->client_name = pw_get_client_name(); sink->mode = DEFAULT_PROP_MODE; @@ -281,15 +271,13 @@ gst_pipewire_sink_init (GstPipeWireSink * sink) g_signal_connect (sink->pool, "activated", G_CALLBACK (pool_activated), sink); - sink->buf_ids = g_hash_table_new_full (g_direct_hash, g_direct_equal, NULL, - (GDestroyNotify) gst_buffer_unref); - g_queue_init (&sink->queue); sink->loop = pw_loop_new (NULL); sink->main_loop = pw_thread_loop_new (sink->loop, "pipewire-sink-loop"); sink->core = pw_core_new (sink->loop, NULL); sink->type = pw_core_get_type (sink->core); + sink->pool->t = sink->type; GST_DEBUG ("loop %p %p", sink->loop, sink->main_loop); } @@ -407,126 +395,35 @@ gst_pipewire_sink_get_property (GObject * object, guint prop_id, } } -typedef struct { - GstPipeWireSink *sink; - guint id; - struct spa_buffer *buf; - struct spa_meta_header *header; - guint flags; - goffset offset; -} ProcessMemData; - static void -process_mem_data_destroy (gpointer user_data) -{ - ProcessMemData *data = user_data; - - gst_object_unref (data->sink); - g_slice_free (ProcessMemData, data); -} - -static void -on_add_buffer (void *_data, - uint32_t id) +on_add_buffer (void *_data, struct pw_buffer *b) { GstPipeWireSink *pwsink = _data; - struct spa_buffer *b; - GstBuffer *buf; - uint32_t i; - ProcessMemData data; - struct pw_type *t = pwsink->type; - - GST_LOG_OBJECT (pwsink, "add buffer"); - - if (!(b = pw_stream_peek_buffer (pwsink->stream, id))) { - g_warning ("failed to peek buffer"); - return; - } - - buf = gst_buffer_new (); - - data.sink = gst_object_ref (pwsink); - data.id = id; - data.buf = b; - data.header = spa_buffer_find_meta (b, t->meta.Header); - - for (i = 0; i < b->n_datas; i++) { - struct spa_data *d = &b->datas[i]; - GstMemory *gmem = NULL; - - if (d->type == t->data.MemFd || - d->type == t->data.DmaBuf) { - gmem = gst_fd_allocator_alloc (pwsink->allocator, dup (d->fd), - d->mapoffset + d->maxsize, GST_FD_MEMORY_FLAG_NONE); - gst_memory_resize (gmem, d->mapoffset, d->maxsize); - data.offset = d->mapoffset; - } - else if (d->type == t->data.MemPtr) { - gmem = gst_memory_new_wrapped (0, d->data, d->maxsize, 0, - d->maxsize, NULL, NULL); - data.offset = 0; - } - if (gmem) - gst_buffer_append_memory (buf, gmem); - } - data.flags = GST_BUFFER_FLAGS (buf); - gst_mini_object_set_qdata (GST_MINI_OBJECT_CAST (buf), - process_mem_data_quark, - g_slice_dup (ProcessMemData, &data), - process_mem_data_destroy); - - gst_pipewire_pool_add_buffer (pwsink->pool, buf); - g_hash_table_insert (pwsink->buf_ids, GINT_TO_POINTER (id), buf); - + gst_pipewire_pool_wrap_buffer (pwsink->pool, b); pw_thread_loop_signal (pwsink->main_loop, FALSE); } static void -on_remove_buffer (void *data, - uint32_t id) +on_remove_buffer (void *_data, struct pw_buffer *b) { - GstPipeWireSink *pwsink = data; - GstBuffer *buf; + GstPipeWireSink *pwsink = _data; + GstPipeWirePoolData *data = b->user_data; GST_LOG_OBJECT (pwsink, "remove buffer"); - buf = g_hash_table_lookup (pwsink->buf_ids, GINT_TO_POINTER (id)); - if (buf) { - GST_MINI_OBJECT_CAST (buf)->dispose = NULL; - if (!gst_pipewire_pool_remove_buffer (pwsink->pool, buf)) - gst_buffer_ref (buf); - if (g_queue_remove (&pwsink->queue, buf)) - gst_buffer_unref (buf); - g_hash_table_remove (pwsink->buf_ids, GINT_TO_POINTER (id)); - } -} -static void -on_new_buffer (void *data, - uint32_t id) -{ - GstPipeWireSink *pwsink = data; - GstBuffer *buf; - - GST_LOG_OBJECT (pwsink, "got new buffer %u", id); - if (pwsink->stream == NULL) { - GST_LOG_OBJECT (pwsink, "no stream"); - return; - } - buf = g_hash_table_lookup (pwsink->buf_ids, GINT_TO_POINTER (id)); - - if (buf) { - gst_buffer_unref (buf); - pw_thread_loop_signal (pwsink->main_loop, FALSE); - } + if (g_queue_remove (&pwsink->queue, data->buf)) + gst_buffer_unref (data->buf); + gst_buffer_unref (data->buf); } static void do_send_buffer (GstPipeWireSink *pwsink) { GstBuffer *buffer; - ProcessMemData *data; + GstPipeWirePoolData *data; gboolean res; guint i; + struct spa_buffer *b; buffer = g_queue_pop_head (&pwsink->queue); if (buffer == NULL) { @@ -534,23 +431,24 @@ do_send_buffer (GstPipeWireSink *pwsink) return; } - data = gst_mini_object_get_qdata (GST_MINI_OBJECT_CAST (buffer), - process_mem_data_quark); + data = gst_pipewire_pool_get_data(buffer); + + b = data->b->buffer; if (data->header) { data->header->seq = GST_BUFFER_OFFSET (buffer); data->header->pts = GST_BUFFER_PTS (buffer); data->header->dts_offset = GST_BUFFER_DTS (buffer); } - for (i = 0; i < data->buf->n_datas; i++) { - struct spa_data *d = &data->buf->datas[i]; + for (i = 0; i < b->n_datas; i++) { + struct spa_data *d = &b->datas[i]; GstMemory *mem = gst_buffer_peek_memory (buffer, i); d->chunk->offset = mem->offset - data->offset; d->chunk->size = mem->size; } - if (!(res = pw_stream_send_buffer (pwsink->stream, data->id))) { - g_warning ("can't send buffer"); + if ((res = pw_stream_queue_buffer (pwsink->stream, data->b)) < 0) { + g_warning ("can't send buffer %s", spa_strerror(res)); pw_thread_loop_signal (pwsink->main_loop, FALSE); } else pwsink->need_ready--; @@ -558,9 +456,17 @@ do_send_buffer (GstPipeWireSink *pwsink) static void -on_need_buffer (void *data) +on_process (void *data) { GstPipeWireSink *pwsink = data; + + if (pwsink->stream == NULL) { + GST_LOG_OBJECT (pwsink, "no stream"); + return; + } + + g_cond_signal (&pwsink->pool->cond); + pwsink->need_ready++; GST_DEBUG ("need buffer %u", pwsink->need_ready); do_send_buffer (pwsink); @@ -590,7 +496,7 @@ on_state_changed (void *data, enum pw_stream_state old, enum pw_stream_state sta } static void -on_format_changed (void *data, struct spa_pod *format) +on_format_changed (void *data, const struct spa_pod *format) { GstPipeWireSink *pwsink = data; @@ -733,8 +639,7 @@ static const struct pw_stream_events stream_events = { .format_changed = on_format_changed, .add_buffer = on_add_buffer, .remove_buffer = on_remove_buffer, - .new_buffer = on_new_buffer, - .need_buffer = on_need_buffer, + .process = on_process, }; static gboolean diff --git a/src/gst/gstpipewiresink.h b/src/gst/gstpipewiresink.h index c746cf358..5ff61f2e4 100644 --- a/src/gst/gstpipewiresink.h +++ b/src/gst/gstpipewiresink.h @@ -89,12 +89,10 @@ struct _GstPipeWireSink { struct pw_stream *stream; struct spa_hook stream_listener; - GstAllocator *allocator; GstStructure *properties; GstPipeWireSinkMode mode; GstPipeWirePool *pool; - GHashTable *buf_ids; GQueue queue; guint need_ready; }; diff --git a/src/gst/gstpipewiresrc.c b/src/gst/gstpipewiresrc.c index 86770f1a0..568ebfa9b 100644 --- a/src/gst/gstpipewiresrc.c +++ b/src/gst/gstpipewiresrc.c @@ -211,13 +211,10 @@ gst_pipewire_src_finalize (GObject * object) if (pwsrc->properties) gst_structure_free (pwsrc->properties); - g_object_unref (pwsrc->fd_allocator); - g_object_unref (pwsrc->dmabuf_allocator); if (pwsrc->clock) gst_object_unref (pwsrc->clock); g_free (pwsrc->path); g_free (pwsrc->client_name); - g_hash_table_unref (pwsrc->buf_ids); G_OBJECT_CLASS (parent_class)->finalize (object); } @@ -322,163 +319,94 @@ gst_pipewire_src_init (GstPipeWireSrc * src) g_queue_init (&src->queue); - src->fd_allocator = gst_fd_allocator_new (); - src->dmabuf_allocator = gst_dmabuf_allocator_new (); src->client_name = pw_get_client_name (); - src->buf_ids = g_hash_table_new_full (g_direct_hash, g_direct_equal, NULL, (GDestroyNotify) gst_buffer_unref); + src->pool = gst_pipewire_pool_new (); src->loop = pw_loop_new (NULL); src->main_loop = pw_thread_loop_new (src->loop, "pipewire-main-loop"); src->core = pw_core_new (src->loop, NULL); src->type = pw_core_get_type (src->core); + src->pool->t = src->type; GST_DEBUG ("loop %p, mainloop %p", src->loop, src->main_loop); } -typedef struct { - GstPipeWireSrc *src; - guint id; - struct spa_buffer *buf; - struct spa_meta_header *header; - guint flags; - goffset offset; -} ProcessMemData; - -static void -process_mem_data_destroy (gpointer user_data) -{ - ProcessMemData *data = user_data; - - gst_object_unref (data->src); - g_slice_free (ProcessMemData, data); -} - static gboolean buffer_recycle (GstMiniObject *obj) { - ProcessMemData *data; GstPipeWireSrc *src; + GstPipeWirePoolData *data; gst_mini_object_ref (obj); - data = gst_mini_object_get_qdata (obj, - process_mem_data_quark); + data = gst_pipewire_pool_get_data (GST_BUFFER_CAST(obj)); + GST_BUFFER_FLAGS (obj) = data->flags; - src = data->src; + src = data->owner; GST_LOG_OBJECT (obj, "recycle buffer"); pw_thread_loop_lock (src->main_loop); - pw_stream_recycle_buffer (src->stream, data->id); + pw_stream_queue_buffer (src->stream, data->b); pw_thread_loop_unlock (src->main_loop); return FALSE; } static void -on_add_buffer (void *_data, guint id) +on_add_buffer (void *_data, struct pw_buffer *b) { GstPipeWireSrc *pwsrc = _data; - struct spa_buffer *b; - GstBuffer *buf; - uint32_t i; - ProcessMemData data; - struct pw_core *core = pwsrc->core; - struct pw_type *t = pw_core_get_type(core); + GstPipeWirePoolData *data; GST_LOG_OBJECT (pwsrc, "add buffer"); - - if (!(b = pw_stream_peek_buffer (pwsrc->stream, id))) { - g_warning ("failed to peek buffer"); - return; - } - - buf = gst_buffer_new (); - GST_MINI_OBJECT_CAST (buf)->dispose = buffer_recycle; - - data.src = gst_object_ref (pwsrc); - data.id = id; - data.buf = b; - data.header = spa_buffer_find_meta (b, t->meta.Header); - - for (i = 0; i < b->n_datas; i++) { - struct spa_data *d = &b->datas[i]; - GstMemory *gmem = NULL; - - if (d->type == t->data.MemFd) { - gmem = gst_fd_allocator_alloc (pwsrc->fd_allocator, dup (d->fd), - d->mapoffset + d->maxsize, GST_FD_MEMORY_FLAG_NONE); - gst_memory_resize (gmem, d->mapoffset, d->maxsize); - data.offset = d->mapoffset; - } - else if(d->type == t->data.DmaBuf) { - gmem = gst_dmabuf_allocator_alloc (pwsrc->dmabuf_allocator, dup (d->fd), - d->mapoffset + d->maxsize); - gst_memory_resize (gmem, d->mapoffset, d->maxsize); - data.offset = d->mapoffset; - } - else if (d->type == t->data.MemPtr) { - gmem = gst_memory_new_wrapped (0, d->data, d->maxsize, 0, - d->maxsize, NULL, NULL); - data.offset = 0; - } - if (gmem) - gst_buffer_append_memory (buf, gmem); - } - data.flags = GST_BUFFER_FLAGS (buf); - gst_mini_object_set_qdata (GST_MINI_OBJECT_CAST (buf), - process_mem_data_quark, - g_slice_dup (ProcessMemData, &data), - process_mem_data_destroy); - - g_hash_table_insert (pwsrc->buf_ids, GINT_TO_POINTER (id), buf); + gst_pipewire_pool_wrap_buffer (pwsrc->pool, b); + data = b->user_data; + data->owner = pwsrc; + GST_MINI_OBJECT_CAST (data->buf)->dispose = buffer_recycle; } static void -on_remove_buffer (void *data, - guint id) -{ - GstPipeWireSrc *pwsrc = data; - GstBuffer *buf; - - GST_LOG_OBJECT (pwsrc, "remove buffer"); - buf = g_hash_table_lookup (pwsrc->buf_ids, GINT_TO_POINTER (id)); - if (buf) { - GList *walk; - - GST_MINI_OBJECT_CAST (buf)->dispose = NULL; - - walk = pwsrc->queue.head; - while (walk) { - GList *next = walk->next; - - if (walk->data == buf) { - gst_buffer_unref (buf); - g_queue_delete_link (&pwsrc->queue, walk); - } - walk = next; - } - g_hash_table_remove (pwsrc->buf_ids, GINT_TO_POINTER (id)); - } -} - -static void -on_new_buffer (void *_data, - guint id) +on_remove_buffer (void *_data, struct pw_buffer *b) { GstPipeWireSrc *pwsrc = _data; + GstPipeWirePoolData *data = b->user_data; + GstBuffer *buf = data->buf; + GList *walk; + + GST_LOG_OBJECT (pwsrc, "remove buffer"); + + GST_MINI_OBJECT_CAST (buf)->dispose = NULL; + + walk = pwsrc->queue.head; + while (walk) { + GList *next = walk->next; + + if (walk->data == buf) { + gst_buffer_unref (buf); + g_queue_delete_link (&pwsrc->queue, walk); + } + walk = next; + } +} + +static void +on_process (void *_data) +{ + GstPipeWireSrc *pwsrc = _data; + struct pw_buffer *b; GstBuffer *buf; - ProcessMemData *data; + GstPipeWirePoolData *data; struct spa_meta_header *h; guint i; - buf = g_hash_table_lookup (pwsrc->buf_ids, GINT_TO_POINTER (id)); - if (buf == NULL) { - g_warning ("unknown buffer %d", id); - return; - } + b = pw_stream_dequeue_buffer (pwsrc->stream); + if (b == NULL) + return; + + data = b->user_data; + buf = data->buf; + GST_LOG_OBJECT (pwsrc, "got new buffer %p", buf); - data = gst_mini_object_get_qdata (GST_MINI_OBJECT_CAST (buf), - process_mem_data_quark); h = data->header; if (h) { GST_INFO ("pts %" G_GUINT64_FORMAT ", dts_offset %"G_GUINT64_FORMAT, h->pts, h->dts_offset); @@ -490,8 +418,8 @@ on_new_buffer (void *_data, } GST_BUFFER_OFFSET (buf) = h->seq; } - for (i = 0; i < data->buf->n_datas; i++) { - struct spa_data *d = &data->buf->datas[i]; + for (i = 0; i < b->buffer->n_datas; i++) { + struct spa_data *d = &b->buffer->datas[i]; GstMemory *mem = gst_buffer_peek_memory (buf, i); mem->offset = SPA_MIN(d->chunk->offset, d->maxsize); mem->size = SPA_MIN(d->chunk->size, d->maxsize - mem->offset); @@ -690,7 +618,7 @@ gst_pipewire_src_negotiate (GstBaseSrc * basesrc) pw_stream_connect (pwsrc->stream, PW_DIRECTION_INPUT, pwsrc->path, - PW_STREAM_FLAG_AUTOCONNECT | PW_STREAM_FLAG_CLOCK_UPDATE, + PW_STREAM_FLAG_AUTOCONNECT, (const struct spa_pod **)possible->pdata, possible->len); g_ptr_array_free (possible, TRUE); @@ -754,8 +682,8 @@ connect_error: #define SPA_PROP_RANGE(min,max) 2,min,max static void -on_format_changed (void *data, - struct spa_pod *format) +on_format_changed (void *data, + const struct spa_pod *format) { GstPipeWireSrc *pwsrc = data; GstCaps *caps; @@ -775,7 +703,7 @@ on_format_changed (void *data, gst_caps_unref (caps); if (res) { - struct spa_pod *params[2]; + const struct spa_pod *params[2]; struct spa_pod_builder b = { NULL }; uint8_t buffer[512]; @@ -1041,7 +969,7 @@ static const struct pw_stream_events stream_events = { .format_changed = on_format_changed, .add_buffer = on_add_buffer, .remove_buffer = on_remove_buffer, - .new_buffer = on_new_buffer, + .process = on_process, }; static gboolean diff --git a/src/gst/gstpipewiresrc.h b/src/gst/gstpipewiresrc.h index 5769c02dc..c8f718b91 100644 --- a/src/gst/gstpipewiresrc.h +++ b/src/gst/gstpipewiresrc.h @@ -24,6 +24,7 @@ #include #include +#include G_BEGIN_DECLS @@ -76,11 +77,9 @@ struct _GstPipeWireSrc { struct pw_stream *stream; struct spa_hook stream_listener; - GstAllocator *fd_allocator; - GstAllocator *dmabuf_allocator; GstStructure *properties; - GHashTable *buf_ids; + GstPipeWirePool *pool; GQueue queue; GstClock *clock; }; diff --git a/src/pipewire/stream.c b/src/pipewire/stream.c index dfbef3df9..cb3d351f9 100644 --- a/src/pipewire/stream.c +++ b/src/pipewire/stream.c @@ -42,7 +42,7 @@ #define MAX_INPUTS 64 #define MAX_OUTPUTS 64 -struct mem_id { +struct mem { uint32_t id; int fd; uint32_t flags; @@ -51,15 +51,14 @@ struct mem_id { void *ptr; }; -struct buffer_id { +struct buffer { + struct pw_buffer buffer; struct spa_list link; - uint32_t id; - bool used; - struct spa_buffer *buf; + bool queued; void *ptr; struct pw_map_range map; uint32_t n_mem; - struct mem_id **mem; + struct mem **mem; }; struct stream { @@ -101,9 +100,8 @@ struct stream { bool client_reuse; - struct spa_list free; - bool in_need_buffer; - bool in_new_buffer; + struct spa_list queue; + bool in_process; int64_t last_ticks; int32_t last_rate; @@ -111,19 +109,19 @@ struct stream { }; /** \endcond */ -static struct mem_id *find_mem(struct pw_stream *stream, uint32_t id) +static struct mem *find_mem(struct pw_stream *stream, uint32_t id) { struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this); - struct mem_id *mid; + struct mem *m; - pw_array_for_each(mid, &impl->mem_ids) { - if (mid->id == id) - return mid; + pw_array_for_each(m, &impl->mem_ids) { + if (m->id == id) + return m; } return NULL; } -static void *mem_map(struct pw_stream *stream, struct mem_id *m, uint32_t offset, uint32_t size) +static void *mem_map(struct pw_stream *stream, struct mem *m, uint32_t offset, uint32_t size) { if (m->ptr == NULL) { pw_map_range_init(&m->map, offset, size, stream->remote->core->sc_pagesize); @@ -140,7 +138,7 @@ static void *mem_map(struct pw_stream *stream, struct mem_id *m, uint32_t offset return SPA_MEMBER(m->ptr, m->map.start, void); } -static void mem_unmap(struct stream *impl, struct mem_id *m) +static void mem_unmap(struct stream *impl, struct mem *m) { if (m->ptr != NULL) { if (munmap(m->ptr, m->map.size) < 0) @@ -149,15 +147,15 @@ static void mem_unmap(struct stream *impl, struct mem_id *m) } } -static void clear_memid(struct stream *impl, struct mem_id *mid) +static void clear_mem(struct stream *impl, struct mem *m) { - if (mid->fd != -1) { + if (m->fd != -1) { bool has_ref = false; - struct mem_id *m2; + struct mem *m2; int fd; - fd = mid->fd; - mid->fd = -1; + fd = m->fd; + m->fd = -1; pw_array_for_each(m2, &impl->mem_ids) { if (m2->fd == fd) { @@ -166,7 +164,7 @@ static void clear_memid(struct stream *impl, struct mem_id *mid) } } if (!has_ref) { - mem_unmap(impl, mid); + mem_unmap(impl, m); close(fd); } } @@ -175,33 +173,33 @@ static void clear_memid(struct stream *impl, struct mem_id *mid) static void clear_mems(struct pw_stream *stream) { struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this); - struct mem_id *mid; + struct mem *m; - pw_array_for_each(mid, &impl->mem_ids) - clear_memid(impl, mid); + pw_array_for_each(m, &impl->mem_ids) + clear_mem(impl, m); impl->mem_ids.size = 0; } static void clear_buffers(struct pw_stream *stream) { struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this); - struct buffer_id *bid; + struct buffer *b; pw_log_debug("stream %p: clear buffers", stream); - pw_array_for_each(bid, &impl->buffer_ids) { - spa_hook_list_call(&stream->listener_list, struct pw_stream_events, remove_buffer, bid->id); - if (bid->ptr != NULL) - if (munmap(bid->ptr, bid->map.size) < 0) + pw_array_for_each(b, &impl->buffer_ids) { + spa_hook_list_call(&stream->listener_list, struct pw_stream_events, remove_buffer, &b->buffer); + if (b->ptr != NULL) + if (munmap(b->ptr, b->map.size) < 0) pw_log_warn("failed to unmap buffer: %m"); - bid->ptr = NULL; - free(bid->buf); - bid->buf = NULL; - bid->used = false; + b->ptr = NULL; + free(b->buffer.buffer); + b->buffer.buffer = NULL; + b->queued = false; } impl->buffer_ids.size = 0; impl->in_order = true; - spa_list_init(&impl->free); + spa_list_init(&impl->queue); } static bool stream_set_state(struct pw_stream *stream, enum pw_stream_state state, char *error) @@ -282,11 +280,11 @@ struct pw_stream *pw_stream_new(struct pw_remote *remote, this->state = PW_STREAM_STATE_UNCONNECTED; pw_array_init(&impl->mem_ids, 64); - pw_array_ensure_size(&impl->mem_ids, sizeof(struct mem_id) * 64); + pw_array_ensure_size(&impl->mem_ids, sizeof(struct mem) * 64); pw_array_init(&impl->buffer_ids, 32); - pw_array_ensure_size(&impl->buffer_ids, sizeof(struct buffer_id) * 64); + pw_array_ensure_size(&impl->buffer_ids, sizeof(struct buffer) * 64); impl->pending_seq = SPA_ID_INVALID; - spa_list_init(&impl->free); + spa_list_init(&impl->queue); spa_list_append(&remote->stream_list, &this->link); @@ -374,7 +372,7 @@ set_init_params(struct pw_stream *stream, } } -static void set_params(struct pw_stream *stream, int n_params, struct spa_pod **params) +static void set_params(struct pw_stream *stream, int n_params, const struct spa_pod **params) { struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this); int i; @@ -549,18 +547,18 @@ static void on_timeout(void *data, uint64_t expirations) add_request_clock_update(stream); } -static struct buffer_id *find_buffer(struct pw_stream *stream, uint32_t id) +static struct buffer *find_buffer(struct pw_stream *stream, uint32_t id) { struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this); - if (impl->in_order && pw_array_check_index(&impl->buffer_ids, id, struct buffer_id)) { - return pw_array_get_unchecked(&impl->buffer_ids, id, struct buffer_id); + if (impl->in_order && pw_array_check_index(&impl->buffer_ids, id, struct buffer)) { + return pw_array_get_unchecked(&impl->buffer_ids, id, struct buffer); } else { - struct buffer_id *bid; + struct buffer *b; - pw_array_for_each(bid, &impl->buffer_ids) { - if (bid->id == id) - return bid; + pw_array_for_each(b, &impl->buffer_ids) { + if (b->buffer.buffer->id == id) + return b; } } return NULL; @@ -569,15 +567,12 @@ static struct buffer_id *find_buffer(struct pw_stream *stream, uint32_t id) static inline void reuse_buffer(struct pw_stream *stream, uint32_t id) { struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this); - struct buffer_id *bid; + struct buffer *b; - if ((bid = find_buffer(stream, id)) && bid->used) { + if ((b = find_buffer(stream, id)) && !b->queued) { pw_log_trace("stream %p: reuse buffer %u", stream, id); - bid->used = false; - spa_list_append(&impl->free, &bid->link); - impl->in_new_buffer = true; - spa_hook_list_call(&stream->listener_list, struct pw_stream_events, new_buffer, id); - impl->in_new_buffer = false; + spa_list_append(&impl->queue, &b->link); + b->queued = true; } } @@ -592,7 +587,7 @@ static void handle_rtnode_message(struct pw_stream *stream, struct pw_client_nod for (i = 0; i < impl->trans->area->n_input_ports; i++) { struct spa_io_buffers *input = &impl->trans->inputs[i]; - struct buffer_id *bid; + struct buffer *b; uint32_t buffer_id; buffer_id = input->buffer_id; @@ -600,20 +595,20 @@ static void handle_rtnode_message(struct pw_stream *stream, struct pw_client_nod pw_log_trace("stream %p: process input %d %d", stream, input->status, buffer_id); - if ((bid = find_buffer(stream, buffer_id)) == NULL) + if ((b = find_buffer(stream, buffer_id)) == NULL) continue; if (impl->client_reuse) input->buffer_id = SPA_ID_INVALID; if (input->status == SPA_STATUS_HAVE_BUFFER) { - bid->used = true; - impl->in_new_buffer = true; + spa_list_append(&impl->queue, &b->link); + b->queued = true; + impl->in_process = true; spa_hook_list_call(&stream->listener_list, struct pw_stream_events, - new_buffer, buffer_id); - impl->in_new_buffer = false; + process); + impl->in_process = false; } - input->status = SPA_STATUS_NEED_BUFFER; } send_need_input(stream); @@ -633,9 +628,9 @@ static void handle_rtnode_message(struct pw_stream *stream, struct pw_client_nod output->buffer_id = SPA_ID_INVALID; } pw_log_trace("stream %p: process output", stream); - impl->in_need_buffer = true; - spa_hook_list_call(&stream->listener_list, struct pw_stream_events, need_buffer); - impl->in_need_buffer = false; + impl->in_process = true; + spa_hook_list_call(&stream->listener_list, struct pw_stream_events, process); + impl->in_process = false; break; } case PW_CLIENT_NODE_MESSAGE_PORT_REUSE_BUFFER: @@ -695,12 +690,11 @@ static void handle_socket(struct pw_stream *stream, int rtreadfd, int rtwritefd) SPA_IO_ERR | SPA_IO_HUP, true, on_rtsocket_condition, stream); - if (impl->flags & PW_STREAM_FLAG_CLOCK_UPDATE) { - impl->timeout_source = pw_loop_add_timer(stream->remote->core->main_loop, on_timeout, stream); - interval.tv_sec = 0; - interval.tv_nsec = 100000000; - pw_loop_update_timer(stream->remote->core->main_loop, impl->timeout_source, NULL, &interval, false); - } + impl->timeout_source = pw_loop_add_timer(stream->remote->core->main_loop, on_timeout, stream); + interval.tv_sec = 0; + interval.tv_nsec = 100000000; + pw_loop_update_timer(stream->remote->core->main_loop, impl->timeout_source, NULL, &interval, false); + return; } @@ -751,10 +745,10 @@ static void client_node_command(void *data, uint32_t seq, const struct spa_comma send_need_input(stream); } else { - impl->in_need_buffer = true; + impl->in_process = true; spa_hook_list_call(&stream->listener_list, struct pw_stream_events, - need_buffer); - impl->in_need_buffer = false; + process); + impl->in_process = false; } stream_set_state(stream, PW_STREAM_STATE_STREAMING, NULL); } @@ -839,15 +833,15 @@ client_node_add_mem(void *data, { struct stream *impl = data; struct pw_stream *stream = &impl->this; - struct mem_id *m; + struct mem *m; m = find_mem(stream, mem_id); if (m) { pw_log_debug("update mem %u, fd %d, flags %d", mem_id, memfd, flags); - clear_memid(impl, m); + clear_mem(impl, m); } else { - m = pw_array_add(&impl->mem_ids, sizeof(struct mem_id)); + m = pw_array_add(&impl->mem_ids, sizeof(struct mem)); pw_log_debug("add mem %u, fd %d, flags %d", mem_id, memfd, flags); } @@ -868,7 +862,7 @@ client_node_port_use_buffers(void *data, struct pw_stream *stream = &impl->this; struct pw_core *core = stream->remote->core; struct pw_type *t = &core->type; - struct buffer_id *bid; + struct buffer *bid; uint32_t i, j, len; struct spa_buffer *b; int prot; @@ -881,29 +875,29 @@ client_node_port_use_buffers(void *data, for (i = 0; i < n_buffers; i++) { off_t offset; - struct mem_id *mid = find_mem(stream, buffers[i].mem_id); - if (mid == NULL) { + struct mem *m = find_mem(stream, buffers[i].mem_id); + if (m == NULL) { pw_log_warn("unknown memory id %u", buffers[i].mem_id); continue; } - len = pw_array_get_len(&impl->buffer_ids, struct buffer_id); - bid = pw_array_add(&impl->buffer_ids, sizeof(struct buffer_id)); + len = pw_array_get_len(&impl->buffer_ids, struct buffer); + bid = pw_array_add(&impl->buffer_ids, sizeof(struct buffer)); if (impl->direction == SPA_DIRECTION_OUTPUT) { - bid->used = false; - spa_list_append(&impl->free, &bid->link); + bid->queued = true; + spa_list_append(&impl->queue, &bid->link); } else { - bid->used = true; + bid->queued = false; } b = buffers[i].buffer; pw_map_range_init(&bid->map, buffers[i].offset, buffers[i].size, core->sc_pagesize); - bid->ptr = mmap(NULL, bid->map.size, prot, MAP_SHARED, mid->fd, bid->map.offset); + bid->ptr = mmap(NULL, bid->map.size, prot, MAP_SHARED, m->fd, bid->map.offset); if (bid->ptr == MAP_FAILED) { bid->ptr = NULL; - pw_log_warn("Failed to mmap memory %d %p: %s", bid->map.size, mid, + pw_log_warn("Failed to mmap memory %d %p: %s", bid->map.size, m, strerror(errno)); continue; } @@ -912,35 +906,34 @@ client_node_port_use_buffers(void *data, size_t size; size = sizeof(struct spa_buffer); - size += sizeof(struct mem_id *); + size += sizeof(struct mem *); for (j = 0; j < buffers[i].buffer->n_metas; j++) size += sizeof(struct spa_meta); for (j = 0; j < buffers[i].buffer->n_datas; j++) { size += sizeof(struct spa_data); - size += sizeof(struct mem_id *); + size += sizeof(struct mem *); } - b = bid->buf = malloc(size); + b = bid->buffer.buffer = malloc(size); memcpy(b, buffers[i].buffer, sizeof(struct spa_buffer)); b->metas = SPA_MEMBER(b, sizeof(struct spa_buffer), struct spa_meta); b->datas = SPA_MEMBER(b->metas, sizeof(struct spa_meta) * b->n_metas, struct spa_data); bid->mem = SPA_MEMBER(b->datas, sizeof(struct spa_data) * b->n_datas, - struct mem_id*); + struct mem*); bid->n_mem = 0; - mid->ref++; - bid->mem[bid->n_mem++] = mid; + m->ref++; + bid->mem[bid->n_mem++] = m; } - bid->id = b->id; - if (bid->id != len) { - pw_log_warn("unexpected id %u found, expected %u", bid->id, len); + if (b->id != len) { + pw_log_warn("unexpected id %u found, expected %u", b->id, len); impl->in_order = false; } - pw_log_debug("add buffer %d %d %u %u", mid->id, - bid->id, bid->map.offset, bid->map.size); + pw_log_debug("add buffer %d %d %u %u", m->id, + b->id, bid->map.offset, bid->map.size); offset = bid->map.start; for (j = 0; j < b->n_metas; j++) { @@ -959,22 +952,23 @@ client_node_port_use_buffers(void *data, struct spa_chunk); if (d->type == t->data.MemFd || d->type == t->data.DmaBuf) { - struct mem_id *bmid = find_mem(stream, SPA_PTR_TO_UINT32(d->data)); + struct mem *bm = find_mem(stream, SPA_PTR_TO_UINT32(d->data)); d->data = NULL; - d->fd = bmid->fd; - bmid->ref++; - bid->mem[bid->n_mem++] = bmid; - pw_log_debug(" data %d %u -> fd %d", j, bmid->id, bmid->fd); + d->fd = bm->fd; + bm->ref++; + bid->mem[bid->n_mem++] = bm; + pw_log_debug(" data %d %u -> fd %d", j, bm->id, bm->fd); } else if (d->type == t->data.MemPtr) { d->data = SPA_MEMBER(bid->ptr, bid->map.start + SPA_PTR_TO_INT(d->data), void); d->fd = -1; - pw_log_debug(" data %d %u -> mem %p", j, bid->id, d->data); + pw_log_debug(" data %d %u -> mem %p", j, b->id, d->data); } else { pw_log_warn("unknown buffer data type %d", d->type); } } - spa_hook_list_call(&stream->listener_list, struct pw_stream_events, add_buffer, bid->id); + spa_hook_list_call(&stream->listener_list, struct pw_stream_events, + add_buffer, &bid->buffer); } add_async_complete(stream, seq, 0); @@ -1029,7 +1023,7 @@ static void client_node_port_set_io(void *data, struct pw_stream *stream = &impl->this; struct pw_core *core = stream->remote->core; struct pw_type *t = &core->type; - struct mem_id *m; + struct mem *m; void *ptr; int res; @@ -1135,6 +1129,12 @@ pw_stream_connect(struct pw_stream *stream, return 0; } +struct pw_remote * +pw_stream_get_remote(struct pw_stream *stream) +{ + return stream->remote; +} + uint32_t pw_stream_get_node_id(struct pw_stream *stream) { @@ -1144,7 +1144,7 @@ pw_stream_get_node_id(struct pw_stream *stream) void pw_stream_finish_format(struct pw_stream *stream, int res, - struct spa_pod **params, + const struct spa_pod **params, uint32_t n_params) { struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this); @@ -1203,81 +1203,78 @@ int pw_stream_get_time(struct pw_stream *stream, struct pw_time *time) elapsed = (time->now - impl->last_monotonic) / 1000; time->ticks = impl->last_ticks + (elapsed * impl->last_rate) / SPA_USEC_PER_SEC; - time->rate = impl->last_rate; + time->rate.num = 1; + time->rate.denom = impl->last_rate; return 0; } -uint32_t pw_stream_get_empty_buffer(struct pw_stream *stream) +int pw_stream_set_control(struct pw_stream *stream, const char *name, float value) { - struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this); - struct buffer_id *bid; - - if (spa_list_is_empty(&impl->free)) - return SPA_ID_INVALID; - - bid = spa_list_first(&impl->free, struct buffer_id, link); - - return bid->id; + return -ENOTSUP; } -int pw_stream_recycle_buffer(struct pw_stream *stream, uint32_t id) +int pw_stream_get_control(struct pw_stream *stream, const char *name, float *value) +{ + return -ENOTSUP; +} + +struct pw_buffer *pw_stream_dequeue_buffer(struct pw_stream *stream) { struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this); - struct buffer_id *bid; + struct buffer *b; - if ((bid = find_buffer(stream, id)) == NULL || !bid->used) + if (spa_list_is_empty(&impl->queue)) + return NULL; + + b = spa_list_first(&impl->queue, struct buffer, link); + + b->queued = false; + spa_list_remove(&b->link); + + return &b->buffer; +} + +int pw_stream_queue_buffer(struct pw_stream *stream, struct pw_buffer *buffer) +{ + struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this); + struct buffer *b; + uint32_t id; + + if ((b = find_buffer(stream, buffer->buffer->id)) == NULL) return -EINVAL; - bid->used = false; - spa_list_append(&impl->free, &bid->link); + if (b->queued) + return -EINVAL; - if (impl->in_new_buffer) { - int i; + id = buffer->buffer->id; - for (i = 0; i < impl->trans->area->n_input_ports; i++) { - struct spa_io_buffers *input = &impl->trans->inputs[i]; - input->buffer_id = id; + if (impl->direction == SPA_DIRECTION_OUTPUT) { + if (impl->trans->outputs[0].buffer_id != SPA_ID_INVALID) { + pw_log_debug("can't send %u, pending buffer %u", id, + impl->trans->outputs[0].buffer_id); + return -EIO; } - } else { - send_reuse_buffer(stream, id); - } - - return 0; -} - -struct spa_buffer *pw_stream_peek_buffer(struct pw_stream *stream, uint32_t id) -{ - struct buffer_id *bid; - - if ((bid = find_buffer(stream, id))) - return bid->buf; - - return NULL; -} - -int pw_stream_send_buffer(struct pw_stream *stream, uint32_t id) -{ - struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this); - struct buffer_id *bid; - - if (impl->trans->outputs[0].buffer_id != SPA_ID_INVALID) { - pw_log_debug("can't send %u, pending buffer %u", id, - impl->trans->outputs[0].buffer_id); - return -EIO; - } - - if ((bid = find_buffer(stream, id)) && !bid->used) { - bid->used = true; - spa_list_remove(&bid->link); impl->trans->outputs[0].buffer_id = id; impl->trans->outputs[0].status = SPA_STATUS_HAVE_BUFFER; pw_log_trace("stream %p: send buffer %d", stream, id); - if (!impl->in_need_buffer) + if (!impl->in_process) send_have_output(stream); - } else { - pw_log_debug("stream %p: output %u was used", stream, id); } + else { + b->queued = true; + spa_list_append(&impl->queue, &b->link); + if (impl->in_process) { + int i; + for (i = 0; i < impl->trans->area->n_input_ports; i++) { + struct spa_io_buffers *input = &impl->trans->inputs[i]; + input->buffer_id = id; + } + } + else { + send_reuse_buffer(stream, id); + } + } return 0; } diff --git a/src/pipewire/stream.h b/src/pipewire/stream.h index 5d99dab60..4d9ccc60d 100644 --- a/src/pipewire/stream.h +++ b/src/pipewire/stream.h @@ -59,18 +59,6 @@ extern "C" { * The stream is initially unconnected. To connect the stream, use * \ref pw_stream_connect(). Pass the desired direction as an argument. * - * \subsection ssec_stream_mode Stream modes - * - * The stream mode specifies how the data will be exchanged with PipeWire. - * The following stream modes are available - * - * \li \ref PW_STREAM_MODE_BUFFER: data is exchanged with fixed size - * buffers. This is ideal for video frames or equal sized audio - * frames. - * \li \ref PW_STREAM_MODE_RINGBUFFER: data is exhanged with a fixed - * size ringbuffer. This is ideal for variable sized audio packets - * or compressed media. - * * \subsection ssec_stream_target Stream target * * To make the newly connected stream automatically connect to an existing @@ -107,7 +95,8 @@ extern "C" { * between client and server. * * With the add_buffer event, a stream will be notified of a new buffer - * that can be used for data transport. + * that can be used for data transport. You can attach user_data to these + * buffers. * * Afer the buffers are negotiated, the stream will transition to the * \ref PW_STREAM_STATE_PAUSED state. @@ -124,28 +113,23 @@ extern "C" { * * \subsection ssec_consume Consume data * - * The new_buffer event is emited for each new buffer can can be + * The process event is emited for each new buffer that can can be * consumed. * - * \ref pw_stream_peek_buffer() should be used to get the data and metadata - * of the buffer. + * \ref pw_stream_dequeue_buffer() should be used to get the data and + * metadata of the buffer. * - * When the buffer is no longer in use, call \ref pw_stream_recycle_buffer() + * When the buffer is no longer in use, call \ref pw_stream_queue_buffer() * to let PipeWire reuse the buffer. * * \subsection ssec_produce Produce data * - * The need_buffer event is emited when PipeWire needs a new buffer for this - * stream. + * \ref pw_stream_dequeue_buffer() gives an empty buffer that can be filled. * - * \ref pw_stream_get_empty_buffer() gives the id of an empty buffer. - * Use \ref pw_stream_peek_buffer() to get the data and metadata that should - * be filled. + * Filled buffers should be queued with \ref pw_stream_queue_buffer(). * - * To send the filled buffer, use \ref pw_stream_send_buffer(). - * - * The new_buffer event is emited when PipeWire no longer uses the buffer - * and it can be safely reused. + * The process event is emited when PipeWire has emptied a buffer that + * can now be refilled. * * \section sec_stream_disconnect Disconnect * @@ -179,6 +163,11 @@ enum pw_stream_state { PW_STREAM_STATE_STREAMING = 5 /**< streaming */ }; +struct pw_buffer { + struct spa_buffer *buffer; /* the spa buffer */ + void *user_data; /* user data attached to the buffer */ +}; + /** Events for a stream */ struct pw_stream_events { #define PW_VERSION_STREAM_EVENTS 0 @@ -191,17 +180,18 @@ struct pw_stream_events { /** when the format changed. The listener should call * pw_stream_finish_format() from within this callback or later to complete * the format negotiation and start the buffer negotiation. */ - void (*format_changed) (void *data, struct spa_pod *format); + void (*format_changed) (void *data, const struct spa_pod *format); /** when a new buffer was created for this stream */ - void (*add_buffer) (void *data, uint32_t id); + void (*add_buffer) (void *data, struct pw_buffer *buffer); /** when a buffer was destroyed for this stream */ - void (*remove_buffer) (void *data, uint32_t id); - /** when a buffer can be reused (for playback streams) or - * is filled (for capture streams */ - void (*new_buffer) (void *data, uint32_t id); - /** when a buffer is needed (for playback streams) */ - void (*need_buffer) (void *data); + void (*remove_buffer) (void *data, struct pw_buffer *buffer); + + /** when a buffer can be queued (for playback streams) or + * dequeued (for capture streams). This is normally called from the + * mainloop but can also be called directly from the realtime data + * thread if the user is prepared to deal with this. */ + void (*process) (void *data); }; /** Convert a stream state to a readable string \memberof pw_stream */ @@ -212,16 +202,14 @@ enum pw_stream_flags { PW_STREAM_FLAG_NONE = 0, /**< no flags */ PW_STREAM_FLAG_AUTOCONNECT = (1 << 0), /**< try to automatically connect * this stream */ - PW_STREAM_FLAG_CLOCK_UPDATE = (1 << 1), /**< request periodic clock updates for - * this stream */ - PW_STREAM_FLAG_INACTIVE = (1 << 2), /**< start the stream inactive */ -}; - -/** A time structure \memberof pw_stream */ -struct pw_time { - int64_t now; /**< the monotonic time */ - int64_t ticks; /**< the ticks at \a now */ - int32_t rate; /**< the rate of \a ticks */ + PW_STREAM_FLAG_INACTIVE = (1 << 1), /**< start the stream inactive */ + PW_STREAM_FLAG_MAP_BUFFERS = (1 << 2), /**< mmap the buffers */ + PW_STREAM_FLAG_DRIVER = (1 << 3), /**< be a driver */ + PW_STREAM_FLAG_RT_PROCESS = (1 << 4), /**< call process from the realtime + * thread */ + PW_STREAM_FLAG_NO_CONVERT = (1 << 5), /**< don't convert format */ + PW_STREAM_FLAG_EXCLUSIVE = (1 << 6), /**< require exclusive access to the + * device */ }; /** Create a new unconneced \ref pw_stream \memberof pw_stream @@ -231,6 +219,13 @@ pw_stream_new(struct pw_remote *remote, /**< a \ref pw_remote */ const char *name, /**< a stream name */ struct pw_properties *props /**< stream properties, ownership is taken */); +struct pw_stream * +pw_stream_new_simple(struct pw_loop *loop, /**< a \ref pw_loop to use */ + const char *name, /**< a stream name */ + struct pw_properties *props,/**< stream properties, ownership is taken */ + const struct pw_stream_events *events, /**< stream events */ + void *data /**< data passed to events */); + /** Destroy a stream \memberof pw_stream */ void pw_stream_destroy(struct pw_stream *stream); @@ -243,6 +238,8 @@ enum pw_stream_state pw_stream_get_state(struct pw_stream *stream, const char ** const char *pw_stream_get_name(struct pw_stream *stream); +struct pw_remote *pw_stream_get_remote(struct pw_stream *stream); + /** Indicates that the stream is live, boolean default false */ #define PW_STREAM_PROP_IS_LIVE "pipewire.latency.is-live" /** The minimum latency of the stream, int, default 0 */ @@ -255,9 +252,8 @@ const struct pw_properties *pw_stream_get_properties(struct pw_stream *stream); /** Connect a stream for input or output on \a port_path. \memberof pw_stream * \return 0 on success < 0 on error. * - * When \a mode is \ref PW_STREAM_MODE_BUFFER, you should connect to the new-buffer - * event and use pw_stream_peek_buffer() to get the latest metadata and - * data. */ + * You should connect to the process event and use pw_stream_dequeue_buffer() + * to get the latest metadata and data. */ int pw_stream_connect(struct pw_stream *stream, /**< a \ref pw_stream */ enum pw_direction direction, /**< the stream direction */ @@ -286,40 +282,45 @@ int pw_stream_disconnect(struct pw_stream *stream); void pw_stream_finish_format(struct pw_stream *stream, /**< a \ref pw_stream */ int res, /**< a result code */ - struct spa_pod **params, /**< an array of params. The params should + const struct spa_pod **params, /**< an array of params. The params should * ideally contain parameters for doing * buffer allocation. */ uint32_t n_params /**< number of elements in \a params */); + +/** Audio controls */ +#define PW_STREAM_CONTROL_VOLUME "volume" + +/** Video controls */ +#define PW_STREAM_CONTROL_CONTRAST "contrast" +#define PW_STREAM_CONTROL_BRIGHTNESS "brightness" +#define PW_STREAM_CONTROL_HUE "hue" +#define PW_STREAM_CONTROL_SATURATION "saturation" + +/** Set a control value */ +int pw_stream_set_control(struct pw_stream *stream, const char *name, float value); +/** Get a control value */ +int pw_stream_get_control(struct pw_stream *stream, const char *name, float *value); + /** Activate or deactivate the stream \memberof pw_stream */ int pw_stream_set_active(struct pw_stream *stream, bool active); +/** A time structure \memberof pw_stream */ +struct pw_time { + int64_t now; /**< the monotonic time */ + int64_t ticks; /**< the ticks at \a now */ + struct spa_fraction rate; /**< the rate of \a ticks */ +}; /** Query the time on the stream \memberof pw_stream */ int pw_stream_get_time(struct pw_stream *stream, struct pw_time *time); -/** Get the id of an empty buffer that can be filled \memberof pw_stream - * \return the id of an empty buffer or \ref SPA_ID_INVALID when no buffer is - * available. */ -uint32_t pw_stream_get_empty_buffer(struct pw_stream *stream); +/** Get a buffer that can be filled for playback streams or consumed + * for capture streams. */ +struct pw_buffer *pw_stream_dequeue_buffer(struct pw_stream *stream); -/** Recycle the buffer with \a id \memberof pw_stream - * \return 0 on success, < 0 when \a id is invalid or not a used buffer - * Let the PipeWire server know that it can reuse the buffer with \a id. */ -int pw_stream_recycle_buffer(struct pw_stream *stream, uint32_t id); +/** Submit a buffer for playback or recycle a buffer for capture. */ +int pw_stream_queue_buffer(struct pw_stream *stream, struct pw_buffer *buffer); -/** Get the buffer with \a id from \a stream \memberof pw_stream - * \return a \ref spa_buffer or NULL when there is no buffer - * - * This function should be called from the new-buffer event. */ -struct spa_buffer * -pw_stream_peek_buffer(struct pw_stream *stream, uint32_t id); - -/** Send a buffer with \a id to \a stream \memberof pw_stream - * \return 0 when \a id was handled, < 0 on error - * - * For provider or playback streams, this function should be called whenever - * there is a new buffer available. */ -int pw_stream_send_buffer(struct pw_stream *stream, uint32_t id); #ifdef __cplusplus }