stream: improve stream API

Simplify the stream API. make just 2 methods to queue and dequeue
buffers. Make just one callback when new buffers can be dequeued.
Add support for driver nodes such as the video-src.
Pass a pw_buffer structure to add/remove_buffer and make it possible
to attach metadata to it. This makes it a lot easier to implement
the gstreamer pipewire pool.
Call the stream process function from the main loop and use a lockfree
ringbuffer to pass buffers between the threads. Make it possible to
also call process from the RT thread.
unmap the buffer data when needed.
This commit is contained in:
Wim Taymans 2018-03-22 16:40:27 +01:00
parent 97547d726f
commit f9ceedb714
11 changed files with 451 additions and 508 deletions

View file

@ -89,18 +89,17 @@ static void fill_f32(struct data *d, void *dest, int avail)
}
}
static void on_need_buffer(void *userdata)
static void on_process(void *userdata)
{
struct data *data = userdata;
uint32_t id;
struct pw_buffer *b;
struct spa_buffer *buf;
uint8_t *p;
id = pw_stream_get_empty_buffer(data->stream);
if (id == SPA_ID_INVALID)
if ((b = pw_stream_dequeue_buffer(data->stream)) == NULL)
return;
buf = pw_stream_peek_buffer(data->stream, id);
buf = b->buffer;
if ((p = buf->datas[0].data) == NULL)
return;
@ -109,12 +108,12 @@ static void on_need_buffer(void *userdata)
buf->datas[0].chunk->size = buf->datas[0].maxsize;
pw_stream_send_buffer(data->stream, id);
pw_stream_queue_buffer(data->stream, b);
}
static const struct pw_stream_events stream_events = {
PW_VERSION_STREAM_EVENTS,
.need_buffer = on_need_buffer,
.process = on_process,
};
static void on_state_changed(void *_data, enum pw_remote_state old, enum pw_remote_state state, const char *error)
@ -159,7 +158,8 @@ static void on_state_changed(void *_data, enum pw_remote_state old, enum pw_remo
PW_DIRECTION_OUTPUT,
NULL,
PW_STREAM_FLAG_AUTOCONNECT |
PW_STREAM_FLAG_MAP_BUFFERS,
PW_STREAM_FLAG_MAP_BUFFERS |
PW_STREAM_FLAG_RT_PROCESS,
params, 1);
break;
}

View file

@ -87,25 +87,31 @@ static void handle_events(struct data *data)
}
}
static int
do_render(struct spa_loop *loop, bool async, uint32_t seq,
const void *_data, size_t size, void *user_data)
static void
on_process(void *_data)
{
struct data *data = user_data;
struct spa_buffer *buf = ((struct spa_buffer **) _data)[0];
struct data *data = _data;
struct pw_stream *stream = data->stream;
struct pw_buffer *b;
struct spa_buffer *buf;
void *sdata, *ddata;
int sstride, dstride, ostride;
uint32_t i;
uint8_t *src, *dst;
b = pw_stream_dequeue_buffer(stream);
buf = b->buffer;
pw_log_trace("new buffer %d", buf->id);
handle_events(data);
if ((sdata = buf->datas[0].data) == NULL)
return -EINVAL;
return;
if (SDL_LockTexture(data->texture, NULL, &ddata, &dstride) < 0) {
fprintf(stderr, "Couldn't lock texture: %s\n", SDL_GetError());
return -EIO;
return;
}
sstride = buf->datas[0].chunk->stride;
ostride = SPA_MIN(sstride, dstride);
@ -123,25 +129,8 @@ do_render(struct spa_loop *loop, bool async, uint32_t seq,
SDL_RenderCopy(data->renderer, data->texture, NULL, NULL);
SDL_RenderPresent(data->renderer);
return 0;
}
static void
on_stream_new_buffer(void *_data, uint32_t id)
{
struct data *data = _data;
struct pw_stream *stream = data->stream;
struct spa_buffer *buf;
buf = pw_stream_peek_buffer(stream, id);
pw_log_trace("new buffer %d", id);
pw_loop_invoke(pw_main_loop_get_loop(data->loop), do_render,
SPA_ID_INVALID, &buf, sizeof(struct spa_buffer *),
true, data);
pw_stream_recycle_buffer(stream, id);
pw_stream_queue_buffer(stream, b);
}
static void on_stream_state_changed(void *_data, enum pw_stream_state old,
@ -281,7 +270,7 @@ static const struct pw_stream_events stream_events = {
PW_VERSION_STREAM_EVENTS,
.state_changed = on_stream_state_changed,
.format_changed = on_stream_format_changed,
.new_buffer = on_stream_new_buffer,
.process = on_process,
};
static void on_state_changed(void *_data, enum pw_remote_state old, enum pw_remote_state state, const char *error)

View file

@ -71,7 +71,7 @@ struct data {
static void on_timeout(void *userdata, uint64_t expirations)
{
struct data *data = userdata;
uint32_t id;
struct pw_buffer *b;
struct spa_buffer *buf;
int i, j;
uint8_t *p, *map;
@ -79,13 +79,12 @@ static void on_timeout(void *userdata, uint64_t expirations)
pw_log_trace("timeout");
id = pw_stream_get_empty_buffer(data->stream);
if (id == SPA_ID_INVALID) {
b = pw_stream_dequeue_buffer(data->stream);
if (b == NULL) {
pw_log_warn("out of buffers");
return;
}
buf = pw_stream_peek_buffer(data->stream, id);
buf = b->buffer;
if (buf->datas[0].type == data->t->data.MemFd ||
buf->datas[0].type == data->t->data.DmaBuf) {
@ -129,7 +128,7 @@ static void on_timeout(void *userdata, uint64_t expirations)
buf->datas[0].chunk->size = buf->datas[0].maxsize;
pw_stream_send_buffer(data->stream, id);
pw_stream_queue_buffer(data->stream, b);
}
static void on_stream_state_changed(void *_data, enum pw_stream_state old, enum pw_stream_state state,
@ -240,7 +239,7 @@ static void on_state_changed(void *_data, enum pw_remote_state old, enum pw_remo
pw_stream_connect(data->stream,
PW_DIRECTION_OUTPUT,
NULL, PW_STREAM_FLAG_NONE,
NULL, PW_STREAM_FLAG_DRIVER,
params, 1);
break;
}

View file

@ -21,8 +21,13 @@
#include "config.h"
#endif
#include <unistd.h>
#include <gst/gst.h>
#include <gst/allocators/gstfdmemory.h>
#include <gst/allocators/gstdmabuf.h>
#include "gstpipewirepool.h"
GST_DEBUG_CATEGORY_STATIC (gst_pipewire_pool_debug_category);
@ -40,6 +45,8 @@ enum
static guint pool_signals[LAST_SIGNAL] = { 0 };
static GQuark pool_data_quark;
GstPipeWirePool *
gst_pipewire_pool_new (void)
{
@ -50,6 +57,74 @@ gst_pipewire_pool_new (void)
return pool;
}
static void
pool_data_destroy (gpointer user_data)
{
GstPipeWirePoolData *data = user_data;
gst_object_unref (data->pool);
g_slice_free (GstPipeWirePoolData, data);
}
void gst_pipewire_pool_wrap_buffer (GstPipeWirePool *pool, struct pw_buffer *b)
{
GstBuffer *buf;
uint32_t i;
GstPipeWirePoolData *data;
struct pw_type *t = pool->t;
GST_LOG_OBJECT (pool, "wrap buffer");
data = g_slice_new (GstPipeWirePoolData);
buf = gst_buffer_new ();
for (i = 0; i < b->buffer->n_datas; i++) {
struct spa_data *d = &b->buffer->datas[i];
GstMemory *gmem = NULL;
GST_LOG_OBJECT (pool, "wrap buffer %d %d", d->mapoffset, d->maxsize);
if (d->type == t->data.MemFd) {
gmem = gst_fd_allocator_alloc (pool->fd_allocator, dup (d->fd),
d->mapoffset + d->maxsize, GST_FD_MEMORY_FLAG_NONE);
gst_memory_resize (gmem, d->mapoffset, d->maxsize);
data->offset = d->mapoffset;
}
else if(d->type == t->data.DmaBuf) {
gmem = gst_dmabuf_allocator_alloc (pool->dmabuf_allocator, dup (d->fd),
d->mapoffset + d->maxsize);
gst_memory_resize (gmem, d->mapoffset, d->maxsize);
data->offset = d->mapoffset;
}
else if (d->type == t->data.MemPtr) {
gmem = gst_memory_new_wrapped (0, d->data, d->maxsize, 0,
d->maxsize, NULL, NULL);
data->offset = 0;
}
if (gmem)
gst_buffer_append_memory (buf, gmem);
}
data->pool = gst_object_ref (pool);
data->owner = NULL;
data->header = spa_buffer_find_meta (b->buffer, t->meta.Header);
data->flags = GST_BUFFER_FLAGS (buf);
data->b = b;
data->buf = buf;
gst_mini_object_set_qdata (GST_MINI_OBJECT_CAST (buf),
pool_data_quark,
data,
pool_data_destroy);
b->user_data = data;
}
GstPipeWirePoolData *gst_pipewire_pool_get_data (GstBuffer *buffer)
{
return gst_mini_object_get_qdata (GST_MINI_OBJECT_CAST (buffer), pool_data_quark);
}
#if 0
gboolean
gst_pipewire_pool_add_buffer (GstPipeWirePool *pool, GstBuffer *buffer)
{
@ -78,25 +153,31 @@ gst_pipewire_pool_remove_buffer (GstPipeWirePool *pool, GstBuffer *buffer)
return res;
}
#endif
static GstFlowReturn
acquire_buffer (GstBufferPool * pool, GstBuffer ** buffer,
GstBufferPoolAcquireParams * params)
{
GstPipeWirePool *p = GST_PIPEWIRE_POOL (pool);
GstPipeWirePoolData *data;
struct pw_buffer *b;
GST_OBJECT_LOCK (pool);
while (TRUE) {
if (G_UNLIKELY (GST_BUFFER_POOL_IS_FLUSHING (pool)))
goto flushing;
if (p->available.length > 0)
if ((b = pw_stream_dequeue_buffer(p->stream)))
break;
GST_WARNING ("queue empty");
g_cond_wait (&p->cond, GST_OBJECT_GET_LOCK (pool));
}
*buffer = g_queue_pop_head (&p->available);
data = b->user_data;
*buffer = data->buf;
GST_OBJECT_UNLOCK (pool);
GST_DEBUG ("acquire buffer %p", *buffer);
@ -123,13 +204,7 @@ flush_start (GstBufferPool * pool)
static void
release_buffer (GstBufferPool * pool, GstBuffer *buffer)
{
GstPipeWirePool *p = GST_PIPEWIRE_POOL (pool);
GST_DEBUG ("release buffer %p", buffer);
GST_OBJECT_LOCK (pool);
g_queue_push_tail (&p->available, buffer);
g_cond_signal (&p->cond);
GST_OBJECT_UNLOCK (pool);
}
static gboolean
@ -145,6 +220,8 @@ gst_pipewire_pool_finalize (GObject * object)
GstPipeWirePool *pool = GST_PIPEWIRE_POOL (object);
GST_DEBUG_OBJECT (pool, "finalize");
g_object_unref (pool->fd_allocator);
g_object_unref (pool->dmabuf_allocator);
G_OBJECT_CLASS (gst_pipewire_pool_parent_class)->finalize (object);
}
@ -168,11 +245,14 @@ gst_pipewire_pool_class_init (GstPipeWirePoolClass * klass)
GST_DEBUG_CATEGORY_INIT (gst_pipewire_pool_debug_category, "pipewirepool", 0,
"debug category for pipewirepool object");
pool_data_quark = g_quark_from_static_string ("GstPipeWirePoolDataQuark");
}
static void
gst_pipewire_pool_init (GstPipeWirePool * pool)
{
pool->fd_allocator = gst_fd_allocator_new ();
pool->dmabuf_allocator = gst_dmabuf_allocator_new ();
g_cond_init (&pool->cond);
g_queue_init (&pool->available);
}

View file

@ -39,14 +39,29 @@ G_BEGIN_DECLS
#define GST_PIPEWIRE_POOL_GET_CLASS(klass) \
(G_TYPE_INSTANCE_GET_CLASS ((klass), GST_TYPE_PIPEWIRE_POOL, GstPipeWirePoolClass))
typedef struct _GstPipeWirePoolData GstPipeWirePoolData;
typedef struct _GstPipeWirePool GstPipeWirePool;
typedef struct _GstPipeWirePoolClass GstPipeWirePoolClass;
struct _GstPipeWirePoolData {
GstPipeWirePool *pool;
void *owner;
struct spa_meta_header *header;
guint flags;
goffset offset;
struct pw_buffer *b;
GstBuffer *buf;
};
struct _GstPipeWirePool {
GstBufferPool parent;
struct pw_stream *stream;
GQueue available;
struct pw_type *t;
GstAllocator *fd_allocator;
GstAllocator *dmabuf_allocator;
GCond cond;
};
@ -58,8 +73,12 @@ GType gst_pipewire_pool_get_type (void);
GstPipeWirePool * gst_pipewire_pool_new (void);
gboolean gst_pipewire_pool_add_buffer (GstPipeWirePool *pool, GstBuffer *buffer);
gboolean gst_pipewire_pool_remove_buffer (GstPipeWirePool *pool, GstBuffer *buffer);
void gst_pipewire_pool_wrap_buffer (GstPipeWirePool *pool, struct pw_buffer *buffer);
GstPipeWirePoolData *gst_pipewire_pool_get_data (GstBuffer *buffer);
//gboolean gst_pipewire_pool_add_buffer (GstPipeWirePool *pool, GstBuffer *buffer);
//gboolean gst_pipewire_pool_remove_buffer (GstPipeWirePool *pool, GstBuffer *buffer);
G_END_DECLS

View file

@ -37,14 +37,9 @@
#include <stdlib.h>
#include <fcntl.h>
#include <sys/socket.h>
#include <unistd.h>
#include <gst/allocators/gstfdmemory.h>
#include "gstpipewireformat.h"
static GQuark process_mem_data_quark;
GST_DEBUG_CATEGORY_STATIC (pipewire_sink_debug);
#define GST_CAT_DEFAULT pipewire_sink_debug
@ -123,10 +118,8 @@ gst_pipewire_sink_finalize (GObject * object)
if (pwsink->properties)
gst_structure_free (pwsink->properties);
g_object_unref (pwsink->allocator);
g_free (pwsink->path);
g_free (pwsink->client_name);
g_hash_table_unref (pwsink->buf_ids);
G_OBJECT_CLASS (parent_class)->finalize (object);
}
@ -219,8 +212,6 @@ gst_pipewire_sink_class_init (GstPipeWireSinkClass * klass)
GST_DEBUG_CATEGORY_INIT (pipewire_sink_debug, "pipewiresink", 0,
"PipeWire Sink");
process_mem_data_quark = g_quark_from_static_string ("GstPipeWireSinkProcessMemQuark");
}
#define PROP_RANGE(min,max) 2,min,max
@ -273,7 +264,6 @@ pool_activated (GstPipeWirePool *pool, GstPipeWireSink *sink)
static void
gst_pipewire_sink_init (GstPipeWireSink * sink)
{
sink->allocator = gst_fd_allocator_new ();
sink->pool = gst_pipewire_pool_new ();
sink->client_name = pw_get_client_name();
sink->mode = DEFAULT_PROP_MODE;
@ -281,15 +271,13 @@ gst_pipewire_sink_init (GstPipeWireSink * sink)
g_signal_connect (sink->pool, "activated", G_CALLBACK (pool_activated), sink);
sink->buf_ids = g_hash_table_new_full (g_direct_hash, g_direct_equal, NULL,
(GDestroyNotify) gst_buffer_unref);
g_queue_init (&sink->queue);
sink->loop = pw_loop_new (NULL);
sink->main_loop = pw_thread_loop_new (sink->loop, "pipewire-sink-loop");
sink->core = pw_core_new (sink->loop, NULL);
sink->type = pw_core_get_type (sink->core);
sink->pool->t = sink->type;
GST_DEBUG ("loop %p %p", sink->loop, sink->main_loop);
}
@ -407,126 +395,35 @@ gst_pipewire_sink_get_property (GObject * object, guint prop_id,
}
}
typedef struct {
GstPipeWireSink *sink;
guint id;
struct spa_buffer *buf;
struct spa_meta_header *header;
guint flags;
goffset offset;
} ProcessMemData;
static void
process_mem_data_destroy (gpointer user_data)
{
ProcessMemData *data = user_data;
gst_object_unref (data->sink);
g_slice_free (ProcessMemData, data);
}
static void
on_add_buffer (void *_data,
uint32_t id)
on_add_buffer (void *_data, struct pw_buffer *b)
{
GstPipeWireSink *pwsink = _data;
struct spa_buffer *b;
GstBuffer *buf;
uint32_t i;
ProcessMemData data;
struct pw_type *t = pwsink->type;
GST_LOG_OBJECT (pwsink, "add buffer");
if (!(b = pw_stream_peek_buffer (pwsink->stream, id))) {
g_warning ("failed to peek buffer");
return;
}
buf = gst_buffer_new ();
data.sink = gst_object_ref (pwsink);
data.id = id;
data.buf = b;
data.header = spa_buffer_find_meta (b, t->meta.Header);
for (i = 0; i < b->n_datas; i++) {
struct spa_data *d = &b->datas[i];
GstMemory *gmem = NULL;
if (d->type == t->data.MemFd ||
d->type == t->data.DmaBuf) {
gmem = gst_fd_allocator_alloc (pwsink->allocator, dup (d->fd),
d->mapoffset + d->maxsize, GST_FD_MEMORY_FLAG_NONE);
gst_memory_resize (gmem, d->mapoffset, d->maxsize);
data.offset = d->mapoffset;
}
else if (d->type == t->data.MemPtr) {
gmem = gst_memory_new_wrapped (0, d->data, d->maxsize, 0,
d->maxsize, NULL, NULL);
data.offset = 0;
}
if (gmem)
gst_buffer_append_memory (buf, gmem);
}
data.flags = GST_BUFFER_FLAGS (buf);
gst_mini_object_set_qdata (GST_MINI_OBJECT_CAST (buf),
process_mem_data_quark,
g_slice_dup (ProcessMemData, &data),
process_mem_data_destroy);
gst_pipewire_pool_add_buffer (pwsink->pool, buf);
g_hash_table_insert (pwsink->buf_ids, GINT_TO_POINTER (id), buf);
gst_pipewire_pool_wrap_buffer (pwsink->pool, b);
pw_thread_loop_signal (pwsink->main_loop, FALSE);
}
static void
on_remove_buffer (void *data,
uint32_t id)
on_remove_buffer (void *_data, struct pw_buffer *b)
{
GstPipeWireSink *pwsink = data;
GstBuffer *buf;
GstPipeWireSink *pwsink = _data;
GstPipeWirePoolData *data = b->user_data;
GST_LOG_OBJECT (pwsink, "remove buffer");
buf = g_hash_table_lookup (pwsink->buf_ids, GINT_TO_POINTER (id));
if (buf) {
GST_MINI_OBJECT_CAST (buf)->dispose = NULL;
if (!gst_pipewire_pool_remove_buffer (pwsink->pool, buf))
gst_buffer_ref (buf);
if (g_queue_remove (&pwsink->queue, buf))
gst_buffer_unref (buf);
g_hash_table_remove (pwsink->buf_ids, GINT_TO_POINTER (id));
}
}
static void
on_new_buffer (void *data,
uint32_t id)
{
GstPipeWireSink *pwsink = data;
GstBuffer *buf;
GST_LOG_OBJECT (pwsink, "got new buffer %u", id);
if (pwsink->stream == NULL) {
GST_LOG_OBJECT (pwsink, "no stream");
return;
}
buf = g_hash_table_lookup (pwsink->buf_ids, GINT_TO_POINTER (id));
if (buf) {
gst_buffer_unref (buf);
pw_thread_loop_signal (pwsink->main_loop, FALSE);
}
if (g_queue_remove (&pwsink->queue, data->buf))
gst_buffer_unref (data->buf);
gst_buffer_unref (data->buf);
}
static void
do_send_buffer (GstPipeWireSink *pwsink)
{
GstBuffer *buffer;
ProcessMemData *data;
GstPipeWirePoolData *data;
gboolean res;
guint i;
struct spa_buffer *b;
buffer = g_queue_pop_head (&pwsink->queue);
if (buffer == NULL) {
@ -534,23 +431,24 @@ do_send_buffer (GstPipeWireSink *pwsink)
return;
}
data = gst_mini_object_get_qdata (GST_MINI_OBJECT_CAST (buffer),
process_mem_data_quark);
data = gst_pipewire_pool_get_data(buffer);
b = data->b->buffer;
if (data->header) {
data->header->seq = GST_BUFFER_OFFSET (buffer);
data->header->pts = GST_BUFFER_PTS (buffer);
data->header->dts_offset = GST_BUFFER_DTS (buffer);
}
for (i = 0; i < data->buf->n_datas; i++) {
struct spa_data *d = &data->buf->datas[i];
for (i = 0; i < b->n_datas; i++) {
struct spa_data *d = &b->datas[i];
GstMemory *mem = gst_buffer_peek_memory (buffer, i);
d->chunk->offset = mem->offset - data->offset;
d->chunk->size = mem->size;
}
if (!(res = pw_stream_send_buffer (pwsink->stream, data->id))) {
g_warning ("can't send buffer");
if ((res = pw_stream_queue_buffer (pwsink->stream, data->b)) < 0) {
g_warning ("can't send buffer %s", spa_strerror(res));
pw_thread_loop_signal (pwsink->main_loop, FALSE);
} else
pwsink->need_ready--;
@ -558,9 +456,17 @@ do_send_buffer (GstPipeWireSink *pwsink)
static void
on_need_buffer (void *data)
on_process (void *data)
{
GstPipeWireSink *pwsink = data;
if (pwsink->stream == NULL) {
GST_LOG_OBJECT (pwsink, "no stream");
return;
}
g_cond_signal (&pwsink->pool->cond);
pwsink->need_ready++;
GST_DEBUG ("need buffer %u", pwsink->need_ready);
do_send_buffer (pwsink);
@ -733,8 +639,7 @@ static const struct pw_stream_events stream_events = {
.format_changed = on_format_changed,
.add_buffer = on_add_buffer,
.remove_buffer = on_remove_buffer,
.new_buffer = on_new_buffer,
.need_buffer = on_need_buffer,
.process = on_process,
};
static gboolean

View file

@ -89,12 +89,10 @@ struct _GstPipeWireSink {
struct pw_stream *stream;
struct spa_hook stream_listener;
GstAllocator *allocator;
GstStructure *properties;
GstPipeWireSinkMode mode;
GstPipeWirePool *pool;
GHashTable *buf_ids;
GQueue queue;
guint need_ready;
};

View file

@ -211,13 +211,10 @@ gst_pipewire_src_finalize (GObject * object)
if (pwsrc->properties)
gst_structure_free (pwsrc->properties);
g_object_unref (pwsrc->fd_allocator);
g_object_unref (pwsrc->dmabuf_allocator);
if (pwsrc->clock)
gst_object_unref (pwsrc->clock);
g_free (pwsrc->path);
g_free (pwsrc->client_name);
g_hash_table_unref (pwsrc->buf_ids);
G_OBJECT_CLASS (parent_class)->finalize (object);
}
@ -322,127 +319,60 @@ gst_pipewire_src_init (GstPipeWireSrc * src)
g_queue_init (&src->queue);
src->fd_allocator = gst_fd_allocator_new ();
src->dmabuf_allocator = gst_dmabuf_allocator_new ();
src->client_name = pw_get_client_name ();
src->buf_ids = g_hash_table_new_full (g_direct_hash, g_direct_equal, NULL, (GDestroyNotify) gst_buffer_unref);
src->pool = gst_pipewire_pool_new ();
src->loop = pw_loop_new (NULL);
src->main_loop = pw_thread_loop_new (src->loop, "pipewire-main-loop");
src->core = pw_core_new (src->loop, NULL);
src->type = pw_core_get_type (src->core);
src->pool->t = src->type;
GST_DEBUG ("loop %p, mainloop %p", src->loop, src->main_loop);
}
typedef struct {
GstPipeWireSrc *src;
guint id;
struct spa_buffer *buf;
struct spa_meta_header *header;
guint flags;
goffset offset;
} ProcessMemData;
static void
process_mem_data_destroy (gpointer user_data)
{
ProcessMemData *data = user_data;
gst_object_unref (data->src);
g_slice_free (ProcessMemData, data);
}
static gboolean
buffer_recycle (GstMiniObject *obj)
{
ProcessMemData *data;
GstPipeWireSrc *src;
GstPipeWirePoolData *data;
gst_mini_object_ref (obj);
data = gst_mini_object_get_qdata (obj,
process_mem_data_quark);
data = gst_pipewire_pool_get_data (GST_BUFFER_CAST(obj));
GST_BUFFER_FLAGS (obj) = data->flags;
src = data->src;
src = data->owner;
GST_LOG_OBJECT (obj, "recycle buffer");
pw_thread_loop_lock (src->main_loop);
pw_stream_recycle_buffer (src->stream, data->id);
pw_stream_queue_buffer (src->stream, data->b);
pw_thread_loop_unlock (src->main_loop);
return FALSE;
}
static void
on_add_buffer (void *_data, guint id)
on_add_buffer (void *_data, struct pw_buffer *b)
{
GstPipeWireSrc *pwsrc = _data;
struct spa_buffer *b;
GstBuffer *buf;
uint32_t i;
ProcessMemData data;
struct pw_core *core = pwsrc->core;
struct pw_type *t = pw_core_get_type(core);
GstPipeWirePoolData *data;
GST_LOG_OBJECT (pwsrc, "add buffer");
if (!(b = pw_stream_peek_buffer (pwsrc->stream, id))) {
g_warning ("failed to peek buffer");
return;
}
buf = gst_buffer_new ();
GST_MINI_OBJECT_CAST (buf)->dispose = buffer_recycle;
data.src = gst_object_ref (pwsrc);
data.id = id;
data.buf = b;
data.header = spa_buffer_find_meta (b, t->meta.Header);
for (i = 0; i < b->n_datas; i++) {
struct spa_data *d = &b->datas[i];
GstMemory *gmem = NULL;
if (d->type == t->data.MemFd) {
gmem = gst_fd_allocator_alloc (pwsrc->fd_allocator, dup (d->fd),
d->mapoffset + d->maxsize, GST_FD_MEMORY_FLAG_NONE);
gst_memory_resize (gmem, d->mapoffset, d->maxsize);
data.offset = d->mapoffset;
}
else if(d->type == t->data.DmaBuf) {
gmem = gst_dmabuf_allocator_alloc (pwsrc->dmabuf_allocator, dup (d->fd),
d->mapoffset + d->maxsize);
gst_memory_resize (gmem, d->mapoffset, d->maxsize);
data.offset = d->mapoffset;
}
else if (d->type == t->data.MemPtr) {
gmem = gst_memory_new_wrapped (0, d->data, d->maxsize, 0,
d->maxsize, NULL, NULL);
data.offset = 0;
}
if (gmem)
gst_buffer_append_memory (buf, gmem);
}
data.flags = GST_BUFFER_FLAGS (buf);
gst_mini_object_set_qdata (GST_MINI_OBJECT_CAST (buf),
process_mem_data_quark,
g_slice_dup (ProcessMemData, &data),
process_mem_data_destroy);
g_hash_table_insert (pwsrc->buf_ids, GINT_TO_POINTER (id), buf);
gst_pipewire_pool_wrap_buffer (pwsrc->pool, b);
data = b->user_data;
data->owner = pwsrc;
GST_MINI_OBJECT_CAST (data->buf)->dispose = buffer_recycle;
}
static void
on_remove_buffer (void *data,
guint id)
on_remove_buffer (void *_data, struct pw_buffer *b)
{
GstPipeWireSrc *pwsrc = data;
GstBuffer *buf;
GstPipeWireSrc *pwsrc = _data;
GstPipeWirePoolData *data = b->user_data;
GstBuffer *buf = data->buf;
GList *walk;
GST_LOG_OBJECT (pwsrc, "remove buffer");
buf = g_hash_table_lookup (pwsrc->buf_ids, GINT_TO_POINTER (id));
if (buf) {
GList *walk;
GST_MINI_OBJECT_CAST (buf)->dispose = NULL;
@ -456,29 +386,27 @@ on_remove_buffer (void *data,
}
walk = next;
}
g_hash_table_remove (pwsrc->buf_ids, GINT_TO_POINTER (id));
}
}
static void
on_new_buffer (void *_data,
guint id)
on_process (void *_data)
{
GstPipeWireSrc *pwsrc = _data;
struct pw_buffer *b;
GstBuffer *buf;
ProcessMemData *data;
GstPipeWirePoolData *data;
struct spa_meta_header *h;
guint i;
buf = g_hash_table_lookup (pwsrc->buf_ids, GINT_TO_POINTER (id));
if (buf == NULL) {
g_warning ("unknown buffer %d", id);
b = pw_stream_dequeue_buffer (pwsrc->stream);
if (b == NULL)
return;
}
data = b->user_data;
buf = data->buf;
GST_LOG_OBJECT (pwsrc, "got new buffer %p", buf);
data = gst_mini_object_get_qdata (GST_MINI_OBJECT_CAST (buf),
process_mem_data_quark);
h = data->header;
if (h) {
GST_INFO ("pts %" G_GUINT64_FORMAT ", dts_offset %"G_GUINT64_FORMAT, h->pts, h->dts_offset);
@ -490,8 +418,8 @@ on_new_buffer (void *_data,
}
GST_BUFFER_OFFSET (buf) = h->seq;
}
for (i = 0; i < data->buf->n_datas; i++) {
struct spa_data *d = &data->buf->datas[i];
for (i = 0; i < b->buffer->n_datas; i++) {
struct spa_data *d = &b->buffer->datas[i];
GstMemory *mem = gst_buffer_peek_memory (buf, i);
mem->offset = SPA_MIN(d->chunk->offset, d->maxsize);
mem->size = SPA_MIN(d->chunk->size, d->maxsize - mem->offset);
@ -1041,7 +969,7 @@ static const struct pw_stream_events stream_events = {
.format_changed = on_format_changed,
.add_buffer = on_add_buffer,
.remove_buffer = on_remove_buffer,
.new_buffer = on_new_buffer,
.process = on_process,
};
static gboolean

View file

@ -24,6 +24,7 @@
#include <gst/base/gstpushsrc.h>
#include <pipewire/pipewire.h>
#include <gst/gstpipewirepool.h>
G_BEGIN_DECLS
@ -76,11 +77,9 @@ struct _GstPipeWireSrc {
struct pw_stream *stream;
struct spa_hook stream_listener;
GstAllocator *fd_allocator;
GstAllocator *dmabuf_allocator;
GstStructure *properties;
GHashTable *buf_ids;
GstPipeWirePool *pool;
GQueue queue;
GstClock *clock;
};

View file

@ -28,6 +28,7 @@
#include <spa/param/audio/format-utils.h>
#include <spa/param/props.h>
#include <spa/node/io.h>
#include <spa/utils/ringbuffer.h>
#include <spa/lib/debug.h>
#include <spa/lib/pod.h>
@ -50,12 +51,15 @@ static inline void init_type(struct type *type, struct spa_type_map *map)
}
struct buffer {
struct spa_list link;
struct pw_buffer this;
uint32_t id;
#define BUFFER_FLAG_READY (1 << 0)
#define BUFFER_FLAG_MAPPED (1 << 1)
#define BUFFER_FLAG_MAPPED (1 << 0)
uint32_t flags;
struct spa_buffer *buffer;
};
struct queue {
uint32_t ids[MAX_BUFFERS];
struct spa_ringbuffer ring;
};
struct stream {
@ -81,11 +85,9 @@ struct stream {
struct buffer buffers[MAX_BUFFERS];
int n_buffers;
struct spa_list free;
struct spa_list ready;
bool in_need_buffer;
bool in_new_buffer;
struct queue dequeued;
struct queue queued;
uint32_t n_init_params;
struct spa_pod **init_params;
@ -104,34 +106,27 @@ struct stream {
int64_t last_monotonic;
};
static inline void queue_free(struct stream *stream, struct buffer *buffer)
static inline void push_queue(struct stream *stream, struct queue *queue, struct buffer *buffer)
{
spa_list_append(&stream->free, &buffer->link);
uint32_t index;
spa_ringbuffer_get_write_index(&queue->ring, &index);
queue->ids[index & (MAX_BUFFERS-1)] = buffer->id;
spa_ringbuffer_write_update(&queue->ring, index + 1);
}
static inline struct buffer *dequeue_free(struct stream *stream)
static inline struct buffer *pop_queue(struct stream *stream, struct queue *queue)
{
struct buffer *b = NULL;
if (!spa_list_is_empty(&stream->free)) {
b = spa_list_first(&stream->free, struct buffer, link);
spa_list_remove(&b->link);
}
return b;
}
int32_t avail;
uint32_t index, id;
static inline void queue_ready(struct stream *stream, struct buffer *buffer)
{
spa_list_append(&stream->ready, &buffer->link);
}
if ((avail = spa_ringbuffer_get_read_index(&queue->ring, &index)) <= 0)
return NULL;
static inline struct buffer *dequeue_ready(struct stream *stream)
{
struct buffer *b = NULL;
if (!spa_list_is_empty(&stream->ready)) {
b = spa_list_first(&stream->ready, struct buffer, link);
spa_list_remove(&b->link);
}
return b;
id = queue->ids[index & (MAX_BUFFERS-1)];
spa_ringbuffer_read_update(&queue->ring, index + 1);
return &stream->buffers[id];
}
static bool stream_set_state(struct pw_stream *stream, enum pw_stream_state state, char *error)
@ -162,6 +157,27 @@ static struct buffer *find_buffer(struct pw_stream *stream, uint32_t id)
return NULL;
}
static int
do_call_process(struct spa_loop *loop,
bool async, uint32_t seq, const void *data, size_t size, void *user_data)
{
struct stream *impl = user_data;
struct pw_stream *stream = &impl->this;
spa_hook_list_call(&stream->listener_list, struct pw_stream_events, process);
return 0;
}
static void call_process(struct stream *impl)
{
if (SPA_FLAG_CHECK(impl->flags, PW_STREAM_FLAG_RT_PROCESS)) {
do_call_process(NULL, false, 1, NULL, 0, impl);
}
else {
pw_loop_invoke(impl->core->main_loop,
do_call_process, 1, NULL, 0, false, impl);
}
}
static int impl_send_command(struct spa_node *node, const struct spa_command *command)
{
struct stream *impl = SPA_CONTAINER_OF(node, struct stream, impl_node);
@ -181,10 +197,7 @@ static int impl_send_command(struct spa_node *node, const struct spa_command *co
impl->io->status = SPA_STATUS_NEED_BUFFER;
}
else {
impl->in_need_buffer = true;
spa_hook_list_call(&stream->listener_list, struct pw_stream_events,
need_buffer);
impl->in_need_buffer = false;
call_process(impl);
}
stream_set_state(stream, PW_STREAM_STATE_STREAMING, NULL);
}
@ -375,45 +388,111 @@ static int impl_port_set_param(struct spa_node *node,
return -ENOENT;
}
static int map_data(struct stream *impl, struct spa_data *data, int prot)
{
void *ptr;
struct pw_map_range range;
pw_map_range_init(&range, data->mapoffset, data->maxsize, impl->core->sc_pagesize);
ptr = mmap(NULL, range.size, prot, MAP_SHARED, data->fd, range.offset);
if (ptr == MAP_FAILED) {
pw_log_error("stream %p: failed to mmap buffer mem: %m", impl);
return -errno;
}
data->data = SPA_MEMBER(ptr, range.start, void);
pw_log_debug("stream %p: fd %d mapped %d %d %p", impl, data->fd,
range.offset, range.size, data->data);
return 0;
}
static int unmap_data(struct stream *impl, struct spa_data *data)
{
struct pw_map_range range;
pw_map_range_init(&range, data->mapoffset, data->maxsize, impl->core->sc_pagesize);
if (munmap(SPA_MEMBER(data->data, -range.start, void), range.size) < 0)
pw_log_warn("failed to unmap: %m");
pw_log_debug("stream %p: fd %d unmapped", impl, data->fd);
return 0;
}
static void clear_buffers(struct pw_stream *stream)
{
struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this);
int i, j;
pw_log_debug("stream %p: clear buffers", stream);
for (i = 0; i < impl->n_buffers; i++) {
struct buffer *b = &impl->buffers[i];
spa_hook_list_call(&stream->listener_list, struct pw_stream_events,
remove_buffer, &b->this);
if (SPA_FLAG_CHECK(b->flags, BUFFER_FLAG_MAPPED)) {
for (j = 0; j < b->this.buffer->n_datas; j++) {
struct spa_data *d = &b->this.buffer->datas[j];
pw_log_debug("stream %p: clear buffer %d mem",
stream, b->id);
unmap_data(impl, d);
}
}
}
impl->n_buffers = 0;
spa_ringbuffer_init(&impl->dequeued.ring);
spa_ringbuffer_init(&impl->queued.ring);
}
static int impl_port_use_buffers(struct spa_node *node, enum spa_direction direction, uint32_t port_id,
struct spa_buffer **buffers, uint32_t n_buffers)
{
struct stream *d = SPA_CONTAINER_OF(node, struct stream, impl_node);
struct pw_stream *stream = &d->this;
struct pw_type *t = d->t;
int i, prot;
struct stream *impl = SPA_CONTAINER_OF(node, struct stream, impl_node);
struct pw_stream *stream = &impl->this;
struct pw_type *t = impl->t;
int i, j, prot, res;
prot = PROT_READ | (direction == SPA_DIRECTION_OUTPUT ? PROT_WRITE : 0);
clear_buffers(stream);
for (i = 0; i < n_buffers; i++) {
struct buffer *b = &d->buffers[i];
struct spa_data *datas = buffers[i]->datas;
struct buffer *b = &impl->buffers[i];
int size = 0;
b->flags = 0;
b->id = buffers[i]->id;
if (datas[0].type == t->data.MemFd ||
datas[0].type == t->data.DmaBuf) {
void *ptr;
ptr = mmap(NULL, datas[0].maxsize + datas[0].mapoffset, prot,
MAP_SHARED, datas[0].fd, 0);
if (ptr == MAP_FAILED) {
pw_log_error("failed to mmap buffer mem");
return -errno;
if (SPA_FLAG_CHECK(impl->flags, PW_STREAM_FLAG_MAP_BUFFERS)) {
for (j = 0; j < buffers[i]->n_datas; j++) {
struct spa_data *d = &buffers[i]->datas[j];
if (d->type == t->data.MemFd ||
d->type == t->data.DmaBuf) {
if ((res = map_data(impl, d, prot)) < 0)
return res;
}
datas[0].data = SPA_MEMBER(ptr, datas[0].mapoffset, void);
SPA_FLAG_SET(b->flags, BUFFER_FLAG_MAPPED);
}
else if (datas[0].data == NULL) {
else if (d->data == NULL) {
pw_log_error("invalid buffer mem");
return -EINVAL;
}
b->buffer = buffers[i];
pw_log_info("got buffer %d size %d", i, datas[0].maxsize);
queue_free(d, b);
size += d->maxsize;
}
d->n_buffers = n_buffers;
SPA_FLAG_SET(b->flags, BUFFER_FLAG_MAPPED);
}
b->this.buffer = buffers[i];
pw_log_info("got buffer %d %d datas, total size %d", i,
buffers[i]->n_datas, size);
if (impl->direction == SPA_DIRECTION_OUTPUT)
push_queue(impl, &impl->dequeued, b);
spa_hook_list_call(&stream->listener_list, struct pw_stream_events,
add_buffer, &b->this);
}
impl->n_buffers = n_buffers;
if (n_buffers > 0)
stream_set_state(stream, PW_STREAM_STATE_PAUSED, NULL);
@ -423,71 +502,63 @@ static int impl_port_use_buffers(struct spa_node *node, enum spa_direction direc
return 0;
}
static inline void reuse_buffer(struct stream *d, uint32_t id)
{
pw_log_trace("export-source %p: recycle buffer %d", d, id);
if (id < d->n_buffers)
queue_free(d, &d->buffers[id]);
}
static int impl_port_reuse_buffer(struct spa_node *node, uint32_t port_id, uint32_t buffer_id)
{
struct stream *d = SPA_CONTAINER_OF(node, struct stream, impl_node);
reuse_buffer(d, buffer_id);
pw_log_trace("export-source %p: recycle buffer %d", d, buffer_id);
if (buffer_id < d->n_buffers)
push_queue(d, &d->queued, &d->buffers[buffer_id]);
return 0;
}
static int impl_node_process(struct spa_node *node)
static int impl_node_process_input(struct spa_node *node)
{
struct stream *impl = SPA_CONTAINER_OF(node, struct stream, impl_node);
struct pw_stream *stream = &impl->this;
struct spa_io_buffers *io = impl->io;
struct buffer *b;
uint32_t buffer_id;
int res;
if (impl->direction == SPA_DIRECTION_INPUT) {
buffer_id = io->buffer_id;
pw_log_trace("stream %p: process input %d %d", stream, io->status, io->buffer_id);
pw_log_trace("stream %p: process input %d %d", stream, io->status,
buffer_id);
if (io->status != SPA_STATUS_HAVE_BUFFER)
goto done;
if ((b = find_buffer(stream, buffer_id)) == NULL)
return SPA_STATUS_NEED_BUFFER;
if ((b = find_buffer(stream, io->buffer_id)) == NULL)
goto done;
queue_ready(impl, b);
push_queue(impl, &impl->dequeued, b);
call_process(impl);
if (impl->client_reuse)
io->buffer_id = SPA_ID_INVALID;
if (io->status == SPA_STATUS_HAVE_BUFFER) {
impl->in_new_buffer = true;
spa_hook_list_call(&stream->listener_list, struct pw_stream_events,
new_buffer, buffer_id);
impl->in_new_buffer = false;
}
done:
io->status = SPA_STATUS_NEED_BUFFER;
res = SPA_STATUS_NEED_BUFFER;
} else {
reuse_buffer(impl, io->buffer_id);
io->buffer_id = SPA_ID_INVALID;
return SPA_STATUS_HAVE_BUFFER;
}
pw_log_trace("stream %p: process output", stream);
static int impl_node_process_output(struct spa_node *node)
{
struct stream *impl = SPA_CONTAINER_OF(node, struct stream, impl_node);
struct pw_stream *stream = &impl->this;
struct spa_io_buffers *io = impl->io;
struct buffer *b;
if ((b = dequeue_ready(impl)) == NULL) {
impl->in_need_buffer = true;
spa_hook_list_call(&stream->listener_list, struct pw_stream_events,
need_buffer);
impl->in_need_buffer = false;
b = dequeue_ready(impl);
}
if (b != NULL) {
pw_log_trace("stream %p: process out %d %d", stream, io->status, io->buffer_id);
if ((b = find_buffer(stream, io->buffer_id)) != NULL)
push_queue(impl, &impl->dequeued, b);
if ((b = pop_queue(impl, &impl->queued)) != NULL) {
io->buffer_id = b->id;
io->status = SPA_STATUS_HAVE_BUFFER;
} else {
io->buffer_id = SPA_ID_INVALID;
io->status = SPA_STATUS_NEED_BUFFER;
}
res = SPA_STATUS_HAVE_BUFFER;
}
return res;
call_process(impl);
return SPA_STATUS_HAVE_BUFFER;
}
static const struct spa_node impl_node = {
@ -502,7 +573,6 @@ static const struct spa_node impl_node = {
.port_set_param = impl_port_set_param,
.port_use_buffers = impl_port_use_buffers,
.port_reuse_buffer = impl_port_reuse_buffer,
.process = impl_node_process,
};
#if 0
@ -565,8 +635,8 @@ struct pw_stream * pw_stream_new(struct pw_remote *remote, const char *name,
str = pw_properties_get(props, "pipewire.client.reuse");
impl->client_reuse = str && pw_properties_parse_bool(str);
spa_list_init(&impl->free);
spa_list_init(&impl->ready);
spa_ringbuffer_init(&impl->dequeued.ring);
spa_ringbuffer_init(&impl->queued.ring);
spa_hook_list_init(&this->listener_list);
@ -760,6 +830,12 @@ pw_stream_connect(struct pw_stream *stream,
impl->node = pw_node_new(impl->core, "export-source",
pw_properties_copy(stream->properties), 0);
impl->impl_node = impl_node;
if (impl->direction == SPA_DIRECTION_INPUT)
impl->impl_node.process = impl_node_process_input;
else
impl->impl_node.process = impl_node_process_output;
pw_node_set_implementation(impl->node, &impl->impl_node);
pw_node_register(impl->node, NULL, NULL, NULL);
@ -825,41 +901,18 @@ int pw_stream_get_time(struct pw_stream *stream, struct pw_time *time)
return 0;
}
uint32_t pw_stream_get_empty_buffer(struct pw_stream *stream)
{
struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this);
struct buffer *b;
if ((b = dequeue_free(impl)) == NULL)
return SPA_ID_INVALID;
return b->id;
}
int pw_stream_recycle_buffer(struct pw_stream *stream, uint32_t id)
struct pw_buffer *pw_stream_dequeue_buffer(struct pw_stream *stream)
{
struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this);
struct buffer *b;
if ((b = find_buffer(stream, id)) == NULL)
return -EINVAL;
queue_free(impl, b);
if (impl->in_new_buffer) {
struct spa_io_buffers *io = impl->io;
io->buffer_id = id;
}
return 0;
}
struct spa_buffer *
pw_stream_peek_buffer(struct pw_stream *stream, uint32_t id)
{
struct buffer *b;
if ((b = find_buffer(stream, id)))
return b->buffer;
if ((b = pop_queue(impl, &impl->dequeued)) == NULL) {
pw_log_trace("stream %p: no more buffers", stream);
return NULL;
}
pw_log_trace("stream %p: dequeue buffer %d", stream, b->id);
return &b->this;
}
static int
@ -871,21 +924,20 @@ do_process(struct spa_loop *loop,
return 0;
}
int pw_stream_send_buffer(struct pw_stream *stream, uint32_t id)
int pw_stream_queue_buffer(struct pw_stream *stream, struct pw_buffer *buffer)
{
struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this);
struct buffer *b;
struct spa_io_buffers *io = impl->io;
if ((b = find_buffer(stream, id))) {
queue_ready(impl, b);
pw_log_trace("stream %p: send buffer %d %p", stream, id, io);
if ((b = find_buffer(stream, buffer->buffer->id)) == NULL)
return -EINVAL;
if (!impl->in_need_buffer)
pw_log_trace("stream %p: queue buffer %d", stream, b->id);
push_queue(impl, &impl->queued, b);
if (SPA_FLAG_CHECK(impl->flags, PW_STREAM_FLAG_DRIVER)) {
pw_loop_invoke(impl->core->data_loop,
do_process, 1, NULL, 0, false, impl);
} else {
pw_log_debug("stream %p: output %u was used", stream, id);
}
return 0;
}

View file

@ -59,18 +59,6 @@ extern "C" {
* The stream is initially unconnected. To connect the stream, use
* \ref pw_stream_connect(). Pass the desired direction as an argument.
*
* \subsection ssec_stream_mode Stream modes
*
* The stream mode specifies how the data will be exchanged with PipeWire.
* The following stream modes are available
*
* \li \ref PW_STREAM_MODE_BUFFER: data is exchanged with fixed size
* buffers. This is ideal for video frames or equal sized audio
* frames.
* \li \ref PW_STREAM_MODE_RINGBUFFER: data is exhanged with a fixed
* size ringbuffer. This is ideal for variable sized audio packets
* or compressed media.
*
* \subsection ssec_stream_target Stream target
*
* To make the newly connected stream automatically connect to an existing
@ -107,7 +95,8 @@ extern "C" {
* between client and server.
*
* With the add_buffer event, a stream will be notified of a new buffer
* that can be used for data transport.
* that can be used for data transport. You can attach user_data to these
* buffers.
*
* Afer the buffers are negotiated, the stream will transition to the
* \ref PW_STREAM_STATE_PAUSED state.
@ -124,28 +113,23 @@ extern "C" {
*
* \subsection ssec_consume Consume data
*
* The new_buffer event is emited for each new buffer can can be
* The process event is emited for each new buffer that can can be
* consumed.
*
* \ref pw_stream_peek_buffer() should be used to get the data and metadata
* of the buffer.
* \ref pw_stream_dequeue_buffer() should be used to get the data and
* metadata of the buffer.
*
* When the buffer is no longer in use, call \ref pw_stream_recycle_buffer()
* When the buffer is no longer in use, call \ref pw_stream_queue_buffer()
* to let PipeWire reuse the buffer.
*
* \subsection ssec_produce Produce data
*
* The need_buffer event is emited when PipeWire needs a new buffer for this
* stream.
* \ref pw_stream_dequeue_buffer() gives an empty buffer that can be filled.
*
* \ref pw_stream_get_empty_buffer() gives the id of an empty buffer.
* Use \ref pw_stream_peek_buffer() to get the data and metadata that should
* be filled.
* Filled buffers should be queued with \ref pw_stream_queue_buffer().
*
* To send the filled buffer, use \ref pw_stream_send_buffer().
*
* The new_buffer event is emited when PipeWire no longer uses the buffer
* and it can be safely reused.
* The process event is emited when PipeWire has emptied a buffer that
* can now be refilled.
*
* \section sec_stream_disconnect Disconnect
*
@ -179,6 +163,11 @@ enum pw_stream_state {
PW_STREAM_STATE_STREAMING = 5 /**< streaming */
};
struct pw_buffer {
struct spa_buffer *buffer; /* the spa buffer */
void *user_data; /* user data attached to the buffer */
};
/** Events for a stream */
struct pw_stream_events {
#define PW_VERSION_STREAM_EVENTS 0
@ -194,14 +183,15 @@ struct pw_stream_events {
void (*format_changed) (void *data, struct spa_pod *format);
/** when a new buffer was created for this stream */
void (*add_buffer) (void *data, uint32_t id);
void (*add_buffer) (void *data, struct pw_buffer *buffer);
/** when a buffer was destroyed for this stream */
void (*remove_buffer) (void *data, uint32_t id);
/** when a buffer can be reused (for playback streams) or
* is filled (for capture streams */
void (*new_buffer) (void *data, uint32_t id);
/** when a buffer is needed (for playback streams) */
void (*need_buffer) (void *data);
void (*remove_buffer) (void *data, struct pw_buffer *buffer);
/** when a buffer can be queued (for playback streams) or
* dequeued (for capture streams). This is normally called from the
* mainloop but can also be called directly from the realtime data
* thread if the user is prepared to deal with this. */
void (*process) (void *data);
};
/** Convert a stream state to a readable string \memberof pw_stream */
@ -216,6 +206,8 @@ enum pw_stream_flags {
* this stream */
PW_STREAM_FLAG_INACTIVE = (1 << 2), /**< start the stream inactive */
PW_STREAM_FLAG_MAP_BUFFERS = (1 << 3), /**< mmap the buffers */
PW_STREAM_FLAG_DRIVER = (1 << 4), /**< be a driver */
PW_STREAM_FLAG_RT_PROCESS = (1 << 5), /**< call process from the realtime thread */
};
/** A time structure \memberof pw_stream */
@ -256,9 +248,8 @@ const struct pw_properties *pw_stream_get_properties(struct pw_stream *stream);
/** Connect a stream for input or output on \a port_path. \memberof pw_stream
* \return 0 on success < 0 on error.
*
* When \a mode is \ref PW_STREAM_MODE_BUFFER, you should connect to the new-buffer
* event and use pw_stream_peek_buffer() to get the latest metadata and
* data. */
* You should connect to the process event and use pw_stream_dequeue_buffer()
* to get the latest metadata and data. */
int
pw_stream_connect(struct pw_stream *stream, /**< a \ref pw_stream */
enum pw_direction direction, /**< the stream direction */
@ -298,29 +289,12 @@ int pw_stream_set_active(struct pw_stream *stream, bool active);
/** Query the time on the stream \memberof pw_stream */
int pw_stream_get_time(struct pw_stream *stream, struct pw_time *time);
/** Get the id of an empty buffer that can be filled \memberof pw_stream
* \return the id of an empty buffer or \ref SPA_ID_INVALID when no buffer is
* available. */
uint32_t pw_stream_get_empty_buffer(struct pw_stream *stream);
/** Get a buffer that can be filled for playback streams or consumed
* for capture streams. */
struct pw_buffer *pw_stream_dequeue_buffer(struct pw_stream *stream);
/** Recycle the buffer with \a id \memberof pw_stream
* \return 0 on success, < 0 when \a id is invalid or not a used buffer
* Let the PipeWire server know that it can reuse the buffer with \a id. */
int pw_stream_recycle_buffer(struct pw_stream *stream, uint32_t id);
/** Get the buffer with \a id from \a stream \memberof pw_stream
* \return a \ref spa_buffer or NULL when there is no buffer
*
* This function should be called from the new-buffer event. */
struct spa_buffer *
pw_stream_peek_buffer(struct pw_stream *stream, uint32_t id);
/** Send a buffer with \a id to \a stream \memberof pw_stream
* \return 0 when \a id was handled, < 0 on error
*
* For provider or playback streams, this function should be called whenever
* there is a new buffer available. */
int pw_stream_send_buffer(struct pw_stream *stream, uint32_t id);
/** Submit a buffer for playback or recycle a buffer for capture. */
int pw_stream_queue_buffer(struct pw_stream *stream, struct pw_buffer *buffer);
#ifdef __cplusplus
}