stream: API break

Add pw_buffer
Add queue/dequeue methods, remove old methods
Add get and set for properties
Update gst elements and examples

Update the API to work branch which is more future proof
This commit is contained in:
Wim Taymans 2018-07-08 14:47:43 +02:00
parent 4574adcd2e
commit 67e567b9c7
11 changed files with 454 additions and 527 deletions

View file

@ -139,19 +139,19 @@ do_render(struct spa_loop *loop, bool async, uint32_t seq,
}
static void
on_stream_new_buffer(void *_data, uint32_t id)
on_stream_process(void *_data)
{
struct data *data = _data;
struct pw_stream *stream = data->stream;
struct spa_buffer *buf;
struct pw_buffer *buf;
buf = pw_stream_peek_buffer(stream, id);
buf = pw_stream_dequeue_buffer(stream);
pw_loop_invoke(pw_main_loop_get_loop(data->loop), do_render,
SPA_ID_INVALID, &buf, sizeof(struct spa_buffer *),
SPA_ID_INVALID, &buf->buffer, sizeof(struct spa_buffer *),
true, data);
pw_stream_recycle_buffer(stream, id);
pw_stream_queue_buffer(stream, buf);
}
static void on_stream_state_changed(void *_data, enum pw_stream_state old,
@ -239,14 +239,14 @@ static Uint32 id_to_sdl_format(struct data *data, uint32_t id)
}
static void
on_stream_format_changed(void *_data, struct spa_pod *format)
on_stream_format_changed(void *_data, const struct spa_pod *format)
{
struct data *data = _data;
struct pw_stream *stream = data->stream;
struct pw_type *t = data->t;
uint8_t params_buffer[1024];
struct spa_pod_builder b = SPA_POD_BUILDER_INIT(params_buffer, sizeof(params_buffer));
struct spa_pod *params[2];
const struct spa_pod *params[2];
Uint32 sdl_format;
void *d;
@ -291,7 +291,7 @@ static const struct pw_stream_events stream_events = {
PW_VERSION_STREAM_EVENTS,
.state_changed = on_stream_state_changed,
.format_changed = on_stream_format_changed,
.new_buffer = on_stream_new_buffer,
.process = on_stream_process,
};
static void on_state_changed(void *_data, enum pw_remote_state old, enum pw_remote_state state, const char *error)

View file

@ -71,35 +71,35 @@ struct data {
static void on_timeout(void *userdata, uint64_t expirations)
{
struct data *data = userdata;
uint32_t id;
struct spa_buffer *buf;
int i, j;
uint8_t *p, *map;
struct spa_meta_header *h;
struct pw_buffer *buf;
struct spa_buffer *b;
id = pw_stream_get_empty_buffer(data->stream);
if (id == SPA_ID_INVALID)
buf = pw_stream_dequeue_buffer(data->stream);
if (buf == NULL)
return;
buf = pw_stream_peek_buffer(data->stream, id);
b = buf->buffer;
if (buf->datas[0].type == data->t->data.MemFd ||
buf->datas[0].type == data->t->data.DmaBuf) {
if (b->datas[0].type == data->t->data.MemFd ||
b->datas[0].type == data->t->data.DmaBuf) {
map =
mmap(NULL, buf->datas[0].maxsize + buf->datas[0].mapoffset,
PROT_READ | PROT_WRITE, MAP_SHARED, buf->datas[0].fd, 0);
mmap(NULL, b->datas[0].maxsize + b->datas[0].mapoffset,
PROT_READ | PROT_WRITE, MAP_SHARED, b->datas[0].fd, 0);
if (map == MAP_FAILED) {
printf("failed to mmap: %s\n", strerror(errno));
return;
}
p = SPA_MEMBER(map, buf->datas[0].mapoffset, uint8_t);
} else if (buf->datas[0].type == data->t->data.MemPtr) {
p = SPA_MEMBER(map, b->datas[0].mapoffset, uint8_t);
} else if (b->datas[0].type == data->t->data.MemPtr) {
map = NULL;
p = buf->datas[0].data;
p = b->datas[0].data;
} else
return;
if ((h = spa_buffer_find_meta(buf, data->t->meta.Header))) {
if ((h = spa_buffer_find_meta(b, data->t->meta.Header))) {
#if 0
struct timespec now;
clock_gettime(CLOCK_MONOTONIC, &now);
@ -116,16 +116,16 @@ static void on_timeout(void *userdata, uint64_t expirations)
for (j = 0; j < data->format.size.width * BPP; j++) {
p[j] = data->counter + j * i;
}
p += buf->datas[0].chunk->stride;
p += b->datas[0].chunk->stride;
data->counter += 13;
}
if (map)
munmap(map, buf->datas[0].maxsize + buf->datas[0].mapoffset);
munmap(map, b->datas[0].maxsize + b->datas[0].mapoffset);
buf->datas[0].chunk->size = buf->datas[0].maxsize;
b->datas[0].chunk->size = b->datas[0].maxsize;
pw_stream_send_buffer(data->stream, id);
pw_stream_queue_buffer(data->stream, buf);
}
static void on_stream_state_changed(void *_data, enum pw_stream_state old, enum pw_stream_state state,
@ -157,14 +157,14 @@ static void on_stream_state_changed(void *_data, enum pw_stream_state old, enum
}
static void
on_stream_format_changed(void *_data, struct spa_pod *format)
on_stream_format_changed(void *_data, const struct spa_pod *format)
{
struct data *data = _data;
struct pw_stream *stream = data->stream;
struct pw_type *t = data->t;
uint8_t params_buffer[1024];
struct spa_pod_builder b = SPA_POD_BUILDER_INIT(params_buffer, sizeof(params_buffer));
struct spa_pod *params[2];
const struct spa_pod *params[2];
if (format == NULL) {
pw_stream_finish_format(stream, 0, NULL, 0);

View file

@ -50,12 +50,12 @@ gst_pipewire_clock_get_internal_time (GstClock * clock)
pw_stream_get_time (pclock->stream, &t);
if (t.rate)
result = gst_util_uint64_scale_int (t.ticks, GST_SECOND, t.rate);
if (t.rate.denom)
result = gst_util_uint64_scale_int (t.ticks, GST_SECOND * t.rate.num, t.rate.denom);
else
result = GST_CLOCK_TIME_NONE;
GST_DEBUG ("%"PRId64", %d %"PRId64, t.ticks, t.rate, result);
GST_DEBUG ("%"PRId64", %d %"PRId64, t.ticks, t.rate.denom, result);
return result;
}

View file

@ -21,8 +21,13 @@
#include "config.h"
#endif
#include <unistd.h>
#include <gst/gst.h>
#include <gst/allocators/gstfdmemory.h>
#include <gst/allocators/gstdmabuf.h>
#include "gstpipewirepool.h"
GST_DEBUG_CATEGORY_STATIC (gst_pipewire_pool_debug_category);
@ -40,6 +45,8 @@ enum
static guint pool_signals[LAST_SIGNAL] = { 0 };
static GQuark pool_data_quark;
GstPipeWirePool *
gst_pipewire_pool_new (void)
{
@ -50,6 +57,74 @@ gst_pipewire_pool_new (void)
return pool;
}
static void
pool_data_destroy (gpointer user_data)
{
GstPipeWirePoolData *data = user_data;
gst_object_unref (data->pool);
g_slice_free (GstPipeWirePoolData, data);
}
void gst_pipewire_pool_wrap_buffer (GstPipeWirePool *pool, struct pw_buffer *b)
{
GstBuffer *buf;
uint32_t i;
GstPipeWirePoolData *data;
struct pw_type *t = pool->t;
GST_LOG_OBJECT (pool, "wrap buffer");
data = g_slice_new (GstPipeWirePoolData);
buf = gst_buffer_new ();
for (i = 0; i < b->buffer->n_datas; i++) {
struct spa_data *d = &b->buffer->datas[i];
GstMemory *gmem = NULL;
GST_LOG_OBJECT (pool, "wrap buffer %d %d", d->mapoffset, d->maxsize);
if (d->type == t->data.MemFd) {
gmem = gst_fd_allocator_alloc (pool->fd_allocator, dup (d->fd),
d->mapoffset + d->maxsize, GST_FD_MEMORY_FLAG_NONE);
gst_memory_resize (gmem, d->mapoffset, d->maxsize);
data->offset = d->mapoffset;
}
else if(d->type == t->data.DmaBuf) {
gmem = gst_dmabuf_allocator_alloc (pool->dmabuf_allocator, dup (d->fd),
d->mapoffset + d->maxsize);
gst_memory_resize (gmem, d->mapoffset, d->maxsize);
data->offset = d->mapoffset;
}
else if (d->type == t->data.MemPtr) {
gmem = gst_memory_new_wrapped (0, d->data, d->maxsize, 0,
d->maxsize, NULL, NULL);
data->offset = 0;
}
if (gmem)
gst_buffer_append_memory (buf, gmem);
}
data->pool = gst_object_ref (pool);
data->owner = NULL;
data->header = spa_buffer_find_meta (b->buffer, t->meta.Header);
data->flags = GST_BUFFER_FLAGS (buf);
data->b = b;
data->buf = buf;
gst_mini_object_set_qdata (GST_MINI_OBJECT_CAST (buf),
pool_data_quark,
data,
pool_data_destroy);
b->user_data = data;
}
GstPipeWirePoolData *gst_pipewire_pool_get_data (GstBuffer *buffer)
{
return gst_mini_object_get_qdata (GST_MINI_OBJECT_CAST (buffer), pool_data_quark);
}
#if 0
gboolean
gst_pipewire_pool_add_buffer (GstPipeWirePool *pool, GstBuffer *buffer)
{
@ -78,25 +153,31 @@ gst_pipewire_pool_remove_buffer (GstPipeWirePool *pool, GstBuffer *buffer)
return res;
}
#endif
static GstFlowReturn
acquire_buffer (GstBufferPool * pool, GstBuffer ** buffer,
GstBufferPoolAcquireParams * params)
{
GstPipeWirePool *p = GST_PIPEWIRE_POOL (pool);
GstPipeWirePoolData *data;
struct pw_buffer *b;
GST_OBJECT_LOCK (pool);
while (TRUE) {
if (G_UNLIKELY (GST_BUFFER_POOL_IS_FLUSHING (pool)))
goto flushing;
if (p->available.length > 0)
if ((b = pw_stream_dequeue_buffer(p->stream)))
break;
GST_WARNING ("queue empty");
g_cond_wait (&p->cond, GST_OBJECT_GET_LOCK (pool));
}
*buffer = g_queue_pop_head (&p->available);
data = b->user_data;
*buffer = data->buf;
GST_OBJECT_UNLOCK (pool);
GST_DEBUG ("acquire buffer %p", *buffer);
@ -123,13 +204,7 @@ flush_start (GstBufferPool * pool)
static void
release_buffer (GstBufferPool * pool, GstBuffer *buffer)
{
GstPipeWirePool *p = GST_PIPEWIRE_POOL (pool);
GST_DEBUG ("release buffer %p", buffer);
GST_OBJECT_LOCK (pool);
g_queue_push_tail (&p->available, buffer);
g_cond_signal (&p->cond);
GST_OBJECT_UNLOCK (pool);
}
static gboolean
@ -145,6 +220,8 @@ gst_pipewire_pool_finalize (GObject * object)
GstPipeWirePool *pool = GST_PIPEWIRE_POOL (object);
GST_DEBUG_OBJECT (pool, "finalize");
g_object_unref (pool->fd_allocator);
g_object_unref (pool->dmabuf_allocator);
G_OBJECT_CLASS (gst_pipewire_pool_parent_class)->finalize (object);
}
@ -168,11 +245,14 @@ gst_pipewire_pool_class_init (GstPipeWirePoolClass * klass)
GST_DEBUG_CATEGORY_INIT (gst_pipewire_pool_debug_category, "pipewirepool", 0,
"debug category for pipewirepool object");
pool_data_quark = g_quark_from_static_string ("GstPipeWirePoolDataQuark");
}
static void
gst_pipewire_pool_init (GstPipeWirePool * pool)
{
pool->fd_allocator = gst_fd_allocator_new ();
pool->dmabuf_allocator = gst_dmabuf_allocator_new ();
g_cond_init (&pool->cond);
g_queue_init (&pool->available);
}

View file

@ -39,14 +39,29 @@ G_BEGIN_DECLS
#define GST_PIPEWIRE_POOL_GET_CLASS(klass) \
(G_TYPE_INSTANCE_GET_CLASS ((klass), GST_TYPE_PIPEWIRE_POOL, GstPipeWirePoolClass))
typedef struct _GstPipeWirePoolData GstPipeWirePoolData;
typedef struct _GstPipeWirePool GstPipeWirePool;
typedef struct _GstPipeWirePoolClass GstPipeWirePoolClass;
struct _GstPipeWirePoolData {
GstPipeWirePool *pool;
void *owner;
struct spa_meta_header *header;
guint flags;
goffset offset;
struct pw_buffer *b;
GstBuffer *buf;
};
struct _GstPipeWirePool {
GstBufferPool parent;
struct pw_stream *stream;
GQueue available;
struct pw_type *t;
GstAllocator *fd_allocator;
GstAllocator *dmabuf_allocator;
GCond cond;
};
@ -58,8 +73,12 @@ GType gst_pipewire_pool_get_type (void);
GstPipeWirePool * gst_pipewire_pool_new (void);
gboolean gst_pipewire_pool_add_buffer (GstPipeWirePool *pool, GstBuffer *buffer);
gboolean gst_pipewire_pool_remove_buffer (GstPipeWirePool *pool, GstBuffer *buffer);
void gst_pipewire_pool_wrap_buffer (GstPipeWirePool *pool, struct pw_buffer *buffer);
GstPipeWirePoolData *gst_pipewire_pool_get_data (GstBuffer *buffer);
//gboolean gst_pipewire_pool_add_buffer (GstPipeWirePool *pool, GstBuffer *buffer);
//gboolean gst_pipewire_pool_remove_buffer (GstPipeWirePool *pool, GstBuffer *buffer);
G_END_DECLS

View file

@ -37,14 +37,9 @@
#include <stdlib.h>
#include <fcntl.h>
#include <sys/socket.h>
#include <unistd.h>
#include <gst/allocators/gstfdmemory.h>
#include "gstpipewireformat.h"
static GQuark process_mem_data_quark;
GST_DEBUG_CATEGORY_STATIC (pipewire_sink_debug);
#define GST_CAT_DEFAULT pipewire_sink_debug
@ -123,10 +118,8 @@ gst_pipewire_sink_finalize (GObject * object)
if (pwsink->properties)
gst_structure_free (pwsink->properties);
g_object_unref (pwsink->allocator);
g_free (pwsink->path);
g_free (pwsink->client_name);
g_hash_table_unref (pwsink->buf_ids);
G_OBJECT_CLASS (parent_class)->finalize (object);
}
@ -219,8 +212,6 @@ gst_pipewire_sink_class_init (GstPipeWireSinkClass * klass)
GST_DEBUG_CATEGORY_INIT (pipewire_sink_debug, "pipewiresink", 0,
"PipeWire Sink");
process_mem_data_quark = g_quark_from_static_string ("GstPipeWireSinkProcessMemQuark");
}
#define PROP_RANGE(min,max) 2,min,max
@ -234,7 +225,7 @@ pool_activated (GstPipeWirePool *pool, GstPipeWireSink *sink)
guint size;
guint min_buffers;
guint max_buffers;
struct spa_pod *port_params[2];
const struct spa_pod *port_params[2];
struct spa_pod_builder b = { NULL };
uint8_t buffer[1024];
@ -273,7 +264,6 @@ pool_activated (GstPipeWirePool *pool, GstPipeWireSink *sink)
static void
gst_pipewire_sink_init (GstPipeWireSink * sink)
{
sink->allocator = gst_fd_allocator_new ();
sink->pool = gst_pipewire_pool_new ();
sink->client_name = pw_get_client_name();
sink->mode = DEFAULT_PROP_MODE;
@ -281,15 +271,13 @@ gst_pipewire_sink_init (GstPipeWireSink * sink)
g_signal_connect (sink->pool, "activated", G_CALLBACK (pool_activated), sink);
sink->buf_ids = g_hash_table_new_full (g_direct_hash, g_direct_equal, NULL,
(GDestroyNotify) gst_buffer_unref);
g_queue_init (&sink->queue);
sink->loop = pw_loop_new (NULL);
sink->main_loop = pw_thread_loop_new (sink->loop, "pipewire-sink-loop");
sink->core = pw_core_new (sink->loop, NULL);
sink->type = pw_core_get_type (sink->core);
sink->pool->t = sink->type;
GST_DEBUG ("loop %p %p", sink->loop, sink->main_loop);
}
@ -407,126 +395,35 @@ gst_pipewire_sink_get_property (GObject * object, guint prop_id,
}
}
typedef struct {
GstPipeWireSink *sink;
guint id;
struct spa_buffer *buf;
struct spa_meta_header *header;
guint flags;
goffset offset;
} ProcessMemData;
static void
process_mem_data_destroy (gpointer user_data)
{
ProcessMemData *data = user_data;
gst_object_unref (data->sink);
g_slice_free (ProcessMemData, data);
}
static void
on_add_buffer (void *_data,
uint32_t id)
on_add_buffer (void *_data, struct pw_buffer *b)
{
GstPipeWireSink *pwsink = _data;
struct spa_buffer *b;
GstBuffer *buf;
uint32_t i;
ProcessMemData data;
struct pw_type *t = pwsink->type;
GST_LOG_OBJECT (pwsink, "add buffer");
if (!(b = pw_stream_peek_buffer (pwsink->stream, id))) {
g_warning ("failed to peek buffer");
return;
}
buf = gst_buffer_new ();
data.sink = gst_object_ref (pwsink);
data.id = id;
data.buf = b;
data.header = spa_buffer_find_meta (b, t->meta.Header);
for (i = 0; i < b->n_datas; i++) {
struct spa_data *d = &b->datas[i];
GstMemory *gmem = NULL;
if (d->type == t->data.MemFd ||
d->type == t->data.DmaBuf) {
gmem = gst_fd_allocator_alloc (pwsink->allocator, dup (d->fd),
d->mapoffset + d->maxsize, GST_FD_MEMORY_FLAG_NONE);
gst_memory_resize (gmem, d->mapoffset, d->maxsize);
data.offset = d->mapoffset;
}
else if (d->type == t->data.MemPtr) {
gmem = gst_memory_new_wrapped (0, d->data, d->maxsize, 0,
d->maxsize, NULL, NULL);
data.offset = 0;
}
if (gmem)
gst_buffer_append_memory (buf, gmem);
}
data.flags = GST_BUFFER_FLAGS (buf);
gst_mini_object_set_qdata (GST_MINI_OBJECT_CAST (buf),
process_mem_data_quark,
g_slice_dup (ProcessMemData, &data),
process_mem_data_destroy);
gst_pipewire_pool_add_buffer (pwsink->pool, buf);
g_hash_table_insert (pwsink->buf_ids, GINT_TO_POINTER (id), buf);
gst_pipewire_pool_wrap_buffer (pwsink->pool, b);
pw_thread_loop_signal (pwsink->main_loop, FALSE);
}
static void
on_remove_buffer (void *data,
uint32_t id)
on_remove_buffer (void *_data, struct pw_buffer *b)
{
GstPipeWireSink *pwsink = data;
GstBuffer *buf;
GstPipeWireSink *pwsink = _data;
GstPipeWirePoolData *data = b->user_data;
GST_LOG_OBJECT (pwsink, "remove buffer");
buf = g_hash_table_lookup (pwsink->buf_ids, GINT_TO_POINTER (id));
if (buf) {
GST_MINI_OBJECT_CAST (buf)->dispose = NULL;
if (!gst_pipewire_pool_remove_buffer (pwsink->pool, buf))
gst_buffer_ref (buf);
if (g_queue_remove (&pwsink->queue, buf))
gst_buffer_unref (buf);
g_hash_table_remove (pwsink->buf_ids, GINT_TO_POINTER (id));
}
}
static void
on_new_buffer (void *data,
uint32_t id)
{
GstPipeWireSink *pwsink = data;
GstBuffer *buf;
GST_LOG_OBJECT (pwsink, "got new buffer %u", id);
if (pwsink->stream == NULL) {
GST_LOG_OBJECT (pwsink, "no stream");
return;
}
buf = g_hash_table_lookup (pwsink->buf_ids, GINT_TO_POINTER (id));
if (buf) {
gst_buffer_unref (buf);
pw_thread_loop_signal (pwsink->main_loop, FALSE);
}
if (g_queue_remove (&pwsink->queue, data->buf))
gst_buffer_unref (data->buf);
gst_buffer_unref (data->buf);
}
static void
do_send_buffer (GstPipeWireSink *pwsink)
{
GstBuffer *buffer;
ProcessMemData *data;
GstPipeWirePoolData *data;
gboolean res;
guint i;
struct spa_buffer *b;
buffer = g_queue_pop_head (&pwsink->queue);
if (buffer == NULL) {
@ -534,23 +431,24 @@ do_send_buffer (GstPipeWireSink *pwsink)
return;
}
data = gst_mini_object_get_qdata (GST_MINI_OBJECT_CAST (buffer),
process_mem_data_quark);
data = gst_pipewire_pool_get_data(buffer);
b = data->b->buffer;
if (data->header) {
data->header->seq = GST_BUFFER_OFFSET (buffer);
data->header->pts = GST_BUFFER_PTS (buffer);
data->header->dts_offset = GST_BUFFER_DTS (buffer);
}
for (i = 0; i < data->buf->n_datas; i++) {
struct spa_data *d = &data->buf->datas[i];
for (i = 0; i < b->n_datas; i++) {
struct spa_data *d = &b->datas[i];
GstMemory *mem = gst_buffer_peek_memory (buffer, i);
d->chunk->offset = mem->offset - data->offset;
d->chunk->size = mem->size;
}
if (!(res = pw_stream_send_buffer (pwsink->stream, data->id))) {
g_warning ("can't send buffer");
if ((res = pw_stream_queue_buffer (pwsink->stream, data->b)) < 0) {
g_warning ("can't send buffer %s", spa_strerror(res));
pw_thread_loop_signal (pwsink->main_loop, FALSE);
} else
pwsink->need_ready--;
@ -558,9 +456,17 @@ do_send_buffer (GstPipeWireSink *pwsink)
static void
on_need_buffer (void *data)
on_process (void *data)
{
GstPipeWireSink *pwsink = data;
if (pwsink->stream == NULL) {
GST_LOG_OBJECT (pwsink, "no stream");
return;
}
g_cond_signal (&pwsink->pool->cond);
pwsink->need_ready++;
GST_DEBUG ("need buffer %u", pwsink->need_ready);
do_send_buffer (pwsink);
@ -590,7 +496,7 @@ on_state_changed (void *data, enum pw_stream_state old, enum pw_stream_state sta
}
static void
on_format_changed (void *data, struct spa_pod *format)
on_format_changed (void *data, const struct spa_pod *format)
{
GstPipeWireSink *pwsink = data;
@ -733,8 +639,7 @@ static const struct pw_stream_events stream_events = {
.format_changed = on_format_changed,
.add_buffer = on_add_buffer,
.remove_buffer = on_remove_buffer,
.new_buffer = on_new_buffer,
.need_buffer = on_need_buffer,
.process = on_process,
};
static gboolean

View file

@ -89,12 +89,10 @@ struct _GstPipeWireSink {
struct pw_stream *stream;
struct spa_hook stream_listener;
GstAllocator *allocator;
GstStructure *properties;
GstPipeWireSinkMode mode;
GstPipeWirePool *pool;
GHashTable *buf_ids;
GQueue queue;
guint need_ready;
};

View file

@ -211,13 +211,10 @@ gst_pipewire_src_finalize (GObject * object)
if (pwsrc->properties)
gst_structure_free (pwsrc->properties);
g_object_unref (pwsrc->fd_allocator);
g_object_unref (pwsrc->dmabuf_allocator);
if (pwsrc->clock)
gst_object_unref (pwsrc->clock);
g_free (pwsrc->path);
g_free (pwsrc->client_name);
g_hash_table_unref (pwsrc->buf_ids);
G_OBJECT_CLASS (parent_class)->finalize (object);
}
@ -322,127 +319,60 @@ gst_pipewire_src_init (GstPipeWireSrc * src)
g_queue_init (&src->queue);
src->fd_allocator = gst_fd_allocator_new ();
src->dmabuf_allocator = gst_dmabuf_allocator_new ();
src->client_name = pw_get_client_name ();
src->buf_ids = g_hash_table_new_full (g_direct_hash, g_direct_equal, NULL, (GDestroyNotify) gst_buffer_unref);
src->pool = gst_pipewire_pool_new ();
src->loop = pw_loop_new (NULL);
src->main_loop = pw_thread_loop_new (src->loop, "pipewire-main-loop");
src->core = pw_core_new (src->loop, NULL);
src->type = pw_core_get_type (src->core);
src->pool->t = src->type;
GST_DEBUG ("loop %p, mainloop %p", src->loop, src->main_loop);
}
typedef struct {
GstPipeWireSrc *src;
guint id;
struct spa_buffer *buf;
struct spa_meta_header *header;
guint flags;
goffset offset;
} ProcessMemData;
static void
process_mem_data_destroy (gpointer user_data)
{
ProcessMemData *data = user_data;
gst_object_unref (data->src);
g_slice_free (ProcessMemData, data);
}
static gboolean
buffer_recycle (GstMiniObject *obj)
{
ProcessMemData *data;
GstPipeWireSrc *src;
GstPipeWirePoolData *data;
gst_mini_object_ref (obj);
data = gst_mini_object_get_qdata (obj,
process_mem_data_quark);
data = gst_pipewire_pool_get_data (GST_BUFFER_CAST(obj));
GST_BUFFER_FLAGS (obj) = data->flags;
src = data->src;
src = data->owner;
GST_LOG_OBJECT (obj, "recycle buffer");
pw_thread_loop_lock (src->main_loop);
pw_stream_recycle_buffer (src->stream, data->id);
pw_stream_queue_buffer (src->stream, data->b);
pw_thread_loop_unlock (src->main_loop);
return FALSE;
}
static void
on_add_buffer (void *_data, guint id)
on_add_buffer (void *_data, struct pw_buffer *b)
{
GstPipeWireSrc *pwsrc = _data;
struct spa_buffer *b;
GstBuffer *buf;
uint32_t i;
ProcessMemData data;
struct pw_core *core = pwsrc->core;
struct pw_type *t = pw_core_get_type(core);
GstPipeWirePoolData *data;
GST_LOG_OBJECT (pwsrc, "add buffer");
if (!(b = pw_stream_peek_buffer (pwsrc->stream, id))) {
g_warning ("failed to peek buffer");
return;
}
buf = gst_buffer_new ();
GST_MINI_OBJECT_CAST (buf)->dispose = buffer_recycle;
data.src = gst_object_ref (pwsrc);
data.id = id;
data.buf = b;
data.header = spa_buffer_find_meta (b, t->meta.Header);
for (i = 0; i < b->n_datas; i++) {
struct spa_data *d = &b->datas[i];
GstMemory *gmem = NULL;
if (d->type == t->data.MemFd) {
gmem = gst_fd_allocator_alloc (pwsrc->fd_allocator, dup (d->fd),
d->mapoffset + d->maxsize, GST_FD_MEMORY_FLAG_NONE);
gst_memory_resize (gmem, d->mapoffset, d->maxsize);
data.offset = d->mapoffset;
}
else if(d->type == t->data.DmaBuf) {
gmem = gst_dmabuf_allocator_alloc (pwsrc->dmabuf_allocator, dup (d->fd),
d->mapoffset + d->maxsize);
gst_memory_resize (gmem, d->mapoffset, d->maxsize);
data.offset = d->mapoffset;
}
else if (d->type == t->data.MemPtr) {
gmem = gst_memory_new_wrapped (0, d->data, d->maxsize, 0,
d->maxsize, NULL, NULL);
data.offset = 0;
}
if (gmem)
gst_buffer_append_memory (buf, gmem);
}
data.flags = GST_BUFFER_FLAGS (buf);
gst_mini_object_set_qdata (GST_MINI_OBJECT_CAST (buf),
process_mem_data_quark,
g_slice_dup (ProcessMemData, &data),
process_mem_data_destroy);
g_hash_table_insert (pwsrc->buf_ids, GINT_TO_POINTER (id), buf);
gst_pipewire_pool_wrap_buffer (pwsrc->pool, b);
data = b->user_data;
data->owner = pwsrc;
GST_MINI_OBJECT_CAST (data->buf)->dispose = buffer_recycle;
}
static void
on_remove_buffer (void *data,
guint id)
on_remove_buffer (void *_data, struct pw_buffer *b)
{
GstPipeWireSrc *pwsrc = data;
GstBuffer *buf;
GstPipeWireSrc *pwsrc = _data;
GstPipeWirePoolData *data = b->user_data;
GstBuffer *buf = data->buf;
GList *walk;
GST_LOG_OBJECT (pwsrc, "remove buffer");
buf = g_hash_table_lookup (pwsrc->buf_ids, GINT_TO_POINTER (id));
if (buf) {
GList *walk;
GST_MINI_OBJECT_CAST (buf)->dispose = NULL;
@ -456,29 +386,27 @@ on_remove_buffer (void *data,
}
walk = next;
}
g_hash_table_remove (pwsrc->buf_ids, GINT_TO_POINTER (id));
}
}
static void
on_new_buffer (void *_data,
guint id)
on_process (void *_data)
{
GstPipeWireSrc *pwsrc = _data;
struct pw_buffer *b;
GstBuffer *buf;
ProcessMemData *data;
GstPipeWirePoolData *data;
struct spa_meta_header *h;
guint i;
buf = g_hash_table_lookup (pwsrc->buf_ids, GINT_TO_POINTER (id));
if (buf == NULL) {
g_warning ("unknown buffer %d", id);
b = pw_stream_dequeue_buffer (pwsrc->stream);
if (b == NULL)
return;
}
data = b->user_data;
buf = data->buf;
GST_LOG_OBJECT (pwsrc, "got new buffer %p", buf);
data = gst_mini_object_get_qdata (GST_MINI_OBJECT_CAST (buf),
process_mem_data_quark);
h = data->header;
if (h) {
GST_INFO ("pts %" G_GUINT64_FORMAT ", dts_offset %"G_GUINT64_FORMAT, h->pts, h->dts_offset);
@ -490,8 +418,8 @@ on_new_buffer (void *_data,
}
GST_BUFFER_OFFSET (buf) = h->seq;
}
for (i = 0; i < data->buf->n_datas; i++) {
struct spa_data *d = &data->buf->datas[i];
for (i = 0; i < b->buffer->n_datas; i++) {
struct spa_data *d = &b->buffer->datas[i];
GstMemory *mem = gst_buffer_peek_memory (buf, i);
mem->offset = SPA_MIN(d->chunk->offset, d->maxsize);
mem->size = SPA_MIN(d->chunk->size, d->maxsize - mem->offset);
@ -690,7 +618,7 @@ gst_pipewire_src_negotiate (GstBaseSrc * basesrc)
pw_stream_connect (pwsrc->stream,
PW_DIRECTION_INPUT,
pwsrc->path,
PW_STREAM_FLAG_AUTOCONNECT | PW_STREAM_FLAG_CLOCK_UPDATE,
PW_STREAM_FLAG_AUTOCONNECT,
(const struct spa_pod **)possible->pdata,
possible->len);
g_ptr_array_free (possible, TRUE);
@ -755,7 +683,7 @@ connect_error:
static void
on_format_changed (void *data,
struct spa_pod *format)
const struct spa_pod *format)
{
GstPipeWireSrc *pwsrc = data;
GstCaps *caps;
@ -775,7 +703,7 @@ on_format_changed (void *data,
gst_caps_unref (caps);
if (res) {
struct spa_pod *params[2];
const struct spa_pod *params[2];
struct spa_pod_builder b = { NULL };
uint8_t buffer[512];
@ -1041,7 +969,7 @@ static const struct pw_stream_events stream_events = {
.format_changed = on_format_changed,
.add_buffer = on_add_buffer,
.remove_buffer = on_remove_buffer,
.new_buffer = on_new_buffer,
.process = on_process,
};
static gboolean

View file

@ -24,6 +24,7 @@
#include <gst/base/gstpushsrc.h>
#include <pipewire/pipewire.h>
#include <gst/gstpipewirepool.h>
G_BEGIN_DECLS
@ -76,11 +77,9 @@ struct _GstPipeWireSrc {
struct pw_stream *stream;
struct spa_hook stream_listener;
GstAllocator *fd_allocator;
GstAllocator *dmabuf_allocator;
GstStructure *properties;
GHashTable *buf_ids;
GstPipeWirePool *pool;
GQueue queue;
GstClock *clock;
};

View file

@ -42,7 +42,7 @@
#define MAX_INPUTS 64
#define MAX_OUTPUTS 64
struct mem_id {
struct mem {
uint32_t id;
int fd;
uint32_t flags;
@ -51,15 +51,14 @@ struct mem_id {
void *ptr;
};
struct buffer_id {
struct buffer {
struct pw_buffer buffer;
struct spa_list link;
uint32_t id;
bool used;
struct spa_buffer *buf;
bool queued;
void *ptr;
struct pw_map_range map;
uint32_t n_mem;
struct mem_id **mem;
struct mem **mem;
};
struct stream {
@ -101,9 +100,8 @@ struct stream {
bool client_reuse;
struct spa_list free;
bool in_need_buffer;
bool in_new_buffer;
struct spa_list queue;
bool in_process;
int64_t last_ticks;
int32_t last_rate;
@ -111,19 +109,19 @@ struct stream {
};
/** \endcond */
static struct mem_id *find_mem(struct pw_stream *stream, uint32_t id)
static struct mem *find_mem(struct pw_stream *stream, uint32_t id)
{
struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this);
struct mem_id *mid;
struct mem *m;
pw_array_for_each(mid, &impl->mem_ids) {
if (mid->id == id)
return mid;
pw_array_for_each(m, &impl->mem_ids) {
if (m->id == id)
return m;
}
return NULL;
}
static void *mem_map(struct pw_stream *stream, struct mem_id *m, uint32_t offset, uint32_t size)
static void *mem_map(struct pw_stream *stream, struct mem *m, uint32_t offset, uint32_t size)
{
if (m->ptr == NULL) {
pw_map_range_init(&m->map, offset, size, stream->remote->core->sc_pagesize);
@ -140,7 +138,7 @@ static void *mem_map(struct pw_stream *stream, struct mem_id *m, uint32_t offset
return SPA_MEMBER(m->ptr, m->map.start, void);
}
static void mem_unmap(struct stream *impl, struct mem_id *m)
static void mem_unmap(struct stream *impl, struct mem *m)
{
if (m->ptr != NULL) {
if (munmap(m->ptr, m->map.size) < 0)
@ -149,15 +147,15 @@ static void mem_unmap(struct stream *impl, struct mem_id *m)
}
}
static void clear_memid(struct stream *impl, struct mem_id *mid)
static void clear_mem(struct stream *impl, struct mem *m)
{
if (mid->fd != -1) {
if (m->fd != -1) {
bool has_ref = false;
struct mem_id *m2;
struct mem *m2;
int fd;
fd = mid->fd;
mid->fd = -1;
fd = m->fd;
m->fd = -1;
pw_array_for_each(m2, &impl->mem_ids) {
if (m2->fd == fd) {
@ -166,7 +164,7 @@ static void clear_memid(struct stream *impl, struct mem_id *mid)
}
}
if (!has_ref) {
mem_unmap(impl, mid);
mem_unmap(impl, m);
close(fd);
}
}
@ -175,33 +173,33 @@ static void clear_memid(struct stream *impl, struct mem_id *mid)
static void clear_mems(struct pw_stream *stream)
{
struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this);
struct mem_id *mid;
struct mem *m;
pw_array_for_each(mid, &impl->mem_ids)
clear_memid(impl, mid);
pw_array_for_each(m, &impl->mem_ids)
clear_mem(impl, m);
impl->mem_ids.size = 0;
}
static void clear_buffers(struct pw_stream *stream)
{
struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this);
struct buffer_id *bid;
struct buffer *b;
pw_log_debug("stream %p: clear buffers", stream);
pw_array_for_each(bid, &impl->buffer_ids) {
spa_hook_list_call(&stream->listener_list, struct pw_stream_events, remove_buffer, bid->id);
if (bid->ptr != NULL)
if (munmap(bid->ptr, bid->map.size) < 0)
pw_array_for_each(b, &impl->buffer_ids) {
spa_hook_list_call(&stream->listener_list, struct pw_stream_events, remove_buffer, &b->buffer);
if (b->ptr != NULL)
if (munmap(b->ptr, b->map.size) < 0)
pw_log_warn("failed to unmap buffer: %m");
bid->ptr = NULL;
free(bid->buf);
bid->buf = NULL;
bid->used = false;
b->ptr = NULL;
free(b->buffer.buffer);
b->buffer.buffer = NULL;
b->queued = false;
}
impl->buffer_ids.size = 0;
impl->in_order = true;
spa_list_init(&impl->free);
spa_list_init(&impl->queue);
}
static bool stream_set_state(struct pw_stream *stream, enum pw_stream_state state, char *error)
@ -282,11 +280,11 @@ struct pw_stream *pw_stream_new(struct pw_remote *remote,
this->state = PW_STREAM_STATE_UNCONNECTED;
pw_array_init(&impl->mem_ids, 64);
pw_array_ensure_size(&impl->mem_ids, sizeof(struct mem_id) * 64);
pw_array_ensure_size(&impl->mem_ids, sizeof(struct mem) * 64);
pw_array_init(&impl->buffer_ids, 32);
pw_array_ensure_size(&impl->buffer_ids, sizeof(struct buffer_id) * 64);
pw_array_ensure_size(&impl->buffer_ids, sizeof(struct buffer) * 64);
impl->pending_seq = SPA_ID_INVALID;
spa_list_init(&impl->free);
spa_list_init(&impl->queue);
spa_list_append(&remote->stream_list, &this->link);
@ -374,7 +372,7 @@ set_init_params(struct pw_stream *stream,
}
}
static void set_params(struct pw_stream *stream, int n_params, struct spa_pod **params)
static void set_params(struct pw_stream *stream, int n_params, const struct spa_pod **params)
{
struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this);
int i;
@ -549,18 +547,18 @@ static void on_timeout(void *data, uint64_t expirations)
add_request_clock_update(stream);
}
static struct buffer_id *find_buffer(struct pw_stream *stream, uint32_t id)
static struct buffer *find_buffer(struct pw_stream *stream, uint32_t id)
{
struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this);
if (impl->in_order && pw_array_check_index(&impl->buffer_ids, id, struct buffer_id)) {
return pw_array_get_unchecked(&impl->buffer_ids, id, struct buffer_id);
if (impl->in_order && pw_array_check_index(&impl->buffer_ids, id, struct buffer)) {
return pw_array_get_unchecked(&impl->buffer_ids, id, struct buffer);
} else {
struct buffer_id *bid;
struct buffer *b;
pw_array_for_each(bid, &impl->buffer_ids) {
if (bid->id == id)
return bid;
pw_array_for_each(b, &impl->buffer_ids) {
if (b->buffer.buffer->id == id)
return b;
}
}
return NULL;
@ -569,15 +567,12 @@ static struct buffer_id *find_buffer(struct pw_stream *stream, uint32_t id)
static inline void reuse_buffer(struct pw_stream *stream, uint32_t id)
{
struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this);
struct buffer_id *bid;
struct buffer *b;
if ((bid = find_buffer(stream, id)) && bid->used) {
if ((b = find_buffer(stream, id)) && !b->queued) {
pw_log_trace("stream %p: reuse buffer %u", stream, id);
bid->used = false;
spa_list_append(&impl->free, &bid->link);
impl->in_new_buffer = true;
spa_hook_list_call(&stream->listener_list, struct pw_stream_events, new_buffer, id);
impl->in_new_buffer = false;
spa_list_append(&impl->queue, &b->link);
b->queued = true;
}
}
@ -592,7 +587,7 @@ static void handle_rtnode_message(struct pw_stream *stream, struct pw_client_nod
for (i = 0; i < impl->trans->area->n_input_ports; i++) {
struct spa_io_buffers *input = &impl->trans->inputs[i];
struct buffer_id *bid;
struct buffer *b;
uint32_t buffer_id;
buffer_id = input->buffer_id;
@ -600,20 +595,20 @@ static void handle_rtnode_message(struct pw_stream *stream, struct pw_client_nod
pw_log_trace("stream %p: process input %d %d", stream, input->status,
buffer_id);
if ((bid = find_buffer(stream, buffer_id)) == NULL)
if ((b = find_buffer(stream, buffer_id)) == NULL)
continue;
if (impl->client_reuse)
input->buffer_id = SPA_ID_INVALID;
if (input->status == SPA_STATUS_HAVE_BUFFER) {
bid->used = true;
impl->in_new_buffer = true;
spa_list_append(&impl->queue, &b->link);
b->queued = true;
impl->in_process = true;
spa_hook_list_call(&stream->listener_list, struct pw_stream_events,
new_buffer, buffer_id);
impl->in_new_buffer = false;
process);
impl->in_process = false;
}
input->status = SPA_STATUS_NEED_BUFFER;
}
send_need_input(stream);
@ -633,9 +628,9 @@ static void handle_rtnode_message(struct pw_stream *stream, struct pw_client_nod
output->buffer_id = SPA_ID_INVALID;
}
pw_log_trace("stream %p: process output", stream);
impl->in_need_buffer = true;
spa_hook_list_call(&stream->listener_list, struct pw_stream_events, need_buffer);
impl->in_need_buffer = false;
impl->in_process = true;
spa_hook_list_call(&stream->listener_list, struct pw_stream_events, process);
impl->in_process = false;
break;
}
case PW_CLIENT_NODE_MESSAGE_PORT_REUSE_BUFFER:
@ -695,12 +690,11 @@ static void handle_socket(struct pw_stream *stream, int rtreadfd, int rtwritefd)
SPA_IO_ERR | SPA_IO_HUP,
true, on_rtsocket_condition, stream);
if (impl->flags & PW_STREAM_FLAG_CLOCK_UPDATE) {
impl->timeout_source = pw_loop_add_timer(stream->remote->core->main_loop, on_timeout, stream);
interval.tv_sec = 0;
interval.tv_nsec = 100000000;
pw_loop_update_timer(stream->remote->core->main_loop, impl->timeout_source, NULL, &interval, false);
}
return;
}
@ -751,10 +745,10 @@ static void client_node_command(void *data, uint32_t seq, const struct spa_comma
send_need_input(stream);
}
else {
impl->in_need_buffer = true;
impl->in_process = true;
spa_hook_list_call(&stream->listener_list, struct pw_stream_events,
need_buffer);
impl->in_need_buffer = false;
process);
impl->in_process = false;
}
stream_set_state(stream, PW_STREAM_STATE_STREAMING, NULL);
}
@ -839,15 +833,15 @@ client_node_add_mem(void *data,
{
struct stream *impl = data;
struct pw_stream *stream = &impl->this;
struct mem_id *m;
struct mem *m;
m = find_mem(stream, mem_id);
if (m) {
pw_log_debug("update mem %u, fd %d, flags %d",
mem_id, memfd, flags);
clear_memid(impl, m);
clear_mem(impl, m);
} else {
m = pw_array_add(&impl->mem_ids, sizeof(struct mem_id));
m = pw_array_add(&impl->mem_ids, sizeof(struct mem));
pw_log_debug("add mem %u, fd %d, flags %d",
mem_id, memfd, flags);
}
@ -868,7 +862,7 @@ client_node_port_use_buffers(void *data,
struct pw_stream *stream = &impl->this;
struct pw_core *core = stream->remote->core;
struct pw_type *t = &core->type;
struct buffer_id *bid;
struct buffer *bid;
uint32_t i, j, len;
struct spa_buffer *b;
int prot;
@ -881,29 +875,29 @@ client_node_port_use_buffers(void *data,
for (i = 0; i < n_buffers; i++) {
off_t offset;
struct mem_id *mid = find_mem(stream, buffers[i].mem_id);
if (mid == NULL) {
struct mem *m = find_mem(stream, buffers[i].mem_id);
if (m == NULL) {
pw_log_warn("unknown memory id %u", buffers[i].mem_id);
continue;
}
len = pw_array_get_len(&impl->buffer_ids, struct buffer_id);
bid = pw_array_add(&impl->buffer_ids, sizeof(struct buffer_id));
len = pw_array_get_len(&impl->buffer_ids, struct buffer);
bid = pw_array_add(&impl->buffer_ids, sizeof(struct buffer));
if (impl->direction == SPA_DIRECTION_OUTPUT) {
bid->used = false;
spa_list_append(&impl->free, &bid->link);
bid->queued = true;
spa_list_append(&impl->queue, &bid->link);
} else {
bid->used = true;
bid->queued = false;
}
b = buffers[i].buffer;
pw_map_range_init(&bid->map, buffers[i].offset, buffers[i].size, core->sc_pagesize);
bid->ptr = mmap(NULL, bid->map.size, prot, MAP_SHARED, mid->fd, bid->map.offset);
bid->ptr = mmap(NULL, bid->map.size, prot, MAP_SHARED, m->fd, bid->map.offset);
if (bid->ptr == MAP_FAILED) {
bid->ptr = NULL;
pw_log_warn("Failed to mmap memory %d %p: %s", bid->map.size, mid,
pw_log_warn("Failed to mmap memory %d %p: %s", bid->map.size, m,
strerror(errno));
continue;
}
@ -912,35 +906,34 @@ client_node_port_use_buffers(void *data,
size_t size;
size = sizeof(struct spa_buffer);
size += sizeof(struct mem_id *);
size += sizeof(struct mem *);
for (j = 0; j < buffers[i].buffer->n_metas; j++)
size += sizeof(struct spa_meta);
for (j = 0; j < buffers[i].buffer->n_datas; j++) {
size += sizeof(struct spa_data);
size += sizeof(struct mem_id *);
size += sizeof(struct mem *);
}
b = bid->buf = malloc(size);
b = bid->buffer.buffer = malloc(size);
memcpy(b, buffers[i].buffer, sizeof(struct spa_buffer));
b->metas = SPA_MEMBER(b, sizeof(struct spa_buffer), struct spa_meta);
b->datas = SPA_MEMBER(b->metas, sizeof(struct spa_meta) * b->n_metas,
struct spa_data);
bid->mem = SPA_MEMBER(b->datas, sizeof(struct spa_data) * b->n_datas,
struct mem_id*);
struct mem*);
bid->n_mem = 0;
mid->ref++;
bid->mem[bid->n_mem++] = mid;
m->ref++;
bid->mem[bid->n_mem++] = m;
}
bid->id = b->id;
if (bid->id != len) {
pw_log_warn("unexpected id %u found, expected %u", bid->id, len);
if (b->id != len) {
pw_log_warn("unexpected id %u found, expected %u", b->id, len);
impl->in_order = false;
}
pw_log_debug("add buffer %d %d %u %u", mid->id,
bid->id, bid->map.offset, bid->map.size);
pw_log_debug("add buffer %d %d %u %u", m->id,
b->id, bid->map.offset, bid->map.size);
offset = bid->map.start;
for (j = 0; j < b->n_metas; j++) {
@ -959,22 +952,23 @@ client_node_port_use_buffers(void *data,
struct spa_chunk);
if (d->type == t->data.MemFd || d->type == t->data.DmaBuf) {
struct mem_id *bmid = find_mem(stream, SPA_PTR_TO_UINT32(d->data));
struct mem *bm = find_mem(stream, SPA_PTR_TO_UINT32(d->data));
d->data = NULL;
d->fd = bmid->fd;
bmid->ref++;
bid->mem[bid->n_mem++] = bmid;
pw_log_debug(" data %d %u -> fd %d", j, bmid->id, bmid->fd);
d->fd = bm->fd;
bm->ref++;
bid->mem[bid->n_mem++] = bm;
pw_log_debug(" data %d %u -> fd %d", j, bm->id, bm->fd);
} else if (d->type == t->data.MemPtr) {
d->data = SPA_MEMBER(bid->ptr,
bid->map.start + SPA_PTR_TO_INT(d->data), void);
d->fd = -1;
pw_log_debug(" data %d %u -> mem %p", j, bid->id, d->data);
pw_log_debug(" data %d %u -> mem %p", j, b->id, d->data);
} else {
pw_log_warn("unknown buffer data type %d", d->type);
}
}
spa_hook_list_call(&stream->listener_list, struct pw_stream_events, add_buffer, bid->id);
spa_hook_list_call(&stream->listener_list, struct pw_stream_events,
add_buffer, &bid->buffer);
}
add_async_complete(stream, seq, 0);
@ -1029,7 +1023,7 @@ static void client_node_port_set_io(void *data,
struct pw_stream *stream = &impl->this;
struct pw_core *core = stream->remote->core;
struct pw_type *t = &core->type;
struct mem_id *m;
struct mem *m;
void *ptr;
int res;
@ -1135,6 +1129,12 @@ pw_stream_connect(struct pw_stream *stream,
return 0;
}
struct pw_remote *
pw_stream_get_remote(struct pw_stream *stream)
{
return stream->remote;
}
uint32_t
pw_stream_get_node_id(struct pw_stream *stream)
{
@ -1144,7 +1144,7 @@ pw_stream_get_node_id(struct pw_stream *stream)
void
pw_stream_finish_format(struct pw_stream *stream,
int res,
struct spa_pod **params,
const struct spa_pod **params,
uint32_t n_params)
{
struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this);
@ -1203,81 +1203,78 @@ int pw_stream_get_time(struct pw_stream *stream, struct pw_time *time)
elapsed = (time->now - impl->last_monotonic) / 1000;
time->ticks = impl->last_ticks + (elapsed * impl->last_rate) / SPA_USEC_PER_SEC;
time->rate = impl->last_rate;
time->rate.num = 1;
time->rate.denom = impl->last_rate;
return 0;
}
uint32_t pw_stream_get_empty_buffer(struct pw_stream *stream)
int pw_stream_set_control(struct pw_stream *stream, const char *name, float value)
{
struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this);
struct buffer_id *bid;
if (spa_list_is_empty(&impl->free))
return SPA_ID_INVALID;
bid = spa_list_first(&impl->free, struct buffer_id, link);
return bid->id;
return -ENOTSUP;
}
int pw_stream_recycle_buffer(struct pw_stream *stream, uint32_t id)
int pw_stream_get_control(struct pw_stream *stream, const char *name, float *value)
{
return -ENOTSUP;
}
struct pw_buffer *pw_stream_dequeue_buffer(struct pw_stream *stream)
{
struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this);
struct buffer_id *bid;
struct buffer *b;
if ((bid = find_buffer(stream, id)) == NULL || !bid->used)
if (spa_list_is_empty(&impl->queue))
return NULL;
b = spa_list_first(&impl->queue, struct buffer, link);
b->queued = false;
spa_list_remove(&b->link);
return &b->buffer;
}
int pw_stream_queue_buffer(struct pw_stream *stream, struct pw_buffer *buffer)
{
struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this);
struct buffer *b;
uint32_t id;
if ((b = find_buffer(stream, buffer->buffer->id)) == NULL)
return -EINVAL;
bid->used = false;
spa_list_append(&impl->free, &bid->link);
if (b->queued)
return -EINVAL;
if (impl->in_new_buffer) {
int i;
for (i = 0; i < impl->trans->area->n_input_ports; i++) {
struct spa_io_buffers *input = &impl->trans->inputs[i];
input->buffer_id = id;
}
} else {
send_reuse_buffer(stream, id);
}
return 0;
}
struct spa_buffer *pw_stream_peek_buffer(struct pw_stream *stream, uint32_t id)
{
struct buffer_id *bid;
if ((bid = find_buffer(stream, id)))
return bid->buf;
return NULL;
}
int pw_stream_send_buffer(struct pw_stream *stream, uint32_t id)
{
struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this);
struct buffer_id *bid;
id = buffer->buffer->id;
if (impl->direction == SPA_DIRECTION_OUTPUT) {
if (impl->trans->outputs[0].buffer_id != SPA_ID_INVALID) {
pw_log_debug("can't send %u, pending buffer %u", id,
impl->trans->outputs[0].buffer_id);
return -EIO;
}
if ((bid = find_buffer(stream, id)) && !bid->used) {
bid->used = true;
spa_list_remove(&bid->link);
impl->trans->outputs[0].buffer_id = id;
impl->trans->outputs[0].status = SPA_STATUS_HAVE_BUFFER;
pw_log_trace("stream %p: send buffer %d", stream, id);
if (!impl->in_need_buffer)
if (!impl->in_process)
send_have_output(stream);
} else {
pw_log_debug("stream %p: output %u was used", stream, id);
}
else {
b->queued = true;
spa_list_append(&impl->queue, &b->link);
if (impl->in_process) {
int i;
for (i = 0; i < impl->trans->area->n_input_ports; i++) {
struct spa_io_buffers *input = &impl->trans->inputs[i];
input->buffer_id = id;
}
}
else {
send_reuse_buffer(stream, id);
}
}
return 0;
}

View file

@ -59,18 +59,6 @@ extern "C" {
* The stream is initially unconnected. To connect the stream, use
* \ref pw_stream_connect(). Pass the desired direction as an argument.
*
* \subsection ssec_stream_mode Stream modes
*
* The stream mode specifies how the data will be exchanged with PipeWire.
* The following stream modes are available
*
* \li \ref PW_STREAM_MODE_BUFFER: data is exchanged with fixed size
* buffers. This is ideal for video frames or equal sized audio
* frames.
* \li \ref PW_STREAM_MODE_RINGBUFFER: data is exhanged with a fixed
* size ringbuffer. This is ideal for variable sized audio packets
* or compressed media.
*
* \subsection ssec_stream_target Stream target
*
* To make the newly connected stream automatically connect to an existing
@ -107,7 +95,8 @@ extern "C" {
* between client and server.
*
* With the add_buffer event, a stream will be notified of a new buffer
* that can be used for data transport.
* that can be used for data transport. You can attach user_data to these
* buffers.
*
* Afer the buffers are negotiated, the stream will transition to the
* \ref PW_STREAM_STATE_PAUSED state.
@ -124,28 +113,23 @@ extern "C" {
*
* \subsection ssec_consume Consume data
*
* The new_buffer event is emited for each new buffer can can be
* The process event is emited for each new buffer that can can be
* consumed.
*
* \ref pw_stream_peek_buffer() should be used to get the data and metadata
* of the buffer.
* \ref pw_stream_dequeue_buffer() should be used to get the data and
* metadata of the buffer.
*
* When the buffer is no longer in use, call \ref pw_stream_recycle_buffer()
* When the buffer is no longer in use, call \ref pw_stream_queue_buffer()
* to let PipeWire reuse the buffer.
*
* \subsection ssec_produce Produce data
*
* The need_buffer event is emited when PipeWire needs a new buffer for this
* stream.
* \ref pw_stream_dequeue_buffer() gives an empty buffer that can be filled.
*
* \ref pw_stream_get_empty_buffer() gives the id of an empty buffer.
* Use \ref pw_stream_peek_buffer() to get the data and metadata that should
* be filled.
* Filled buffers should be queued with \ref pw_stream_queue_buffer().
*
* To send the filled buffer, use \ref pw_stream_send_buffer().
*
* The new_buffer event is emited when PipeWire no longer uses the buffer
* and it can be safely reused.
* The process event is emited when PipeWire has emptied a buffer that
* can now be refilled.
*
* \section sec_stream_disconnect Disconnect
*
@ -179,6 +163,11 @@ enum pw_stream_state {
PW_STREAM_STATE_STREAMING = 5 /**< streaming */
};
struct pw_buffer {
struct spa_buffer *buffer; /* the spa buffer */
void *user_data; /* user data attached to the buffer */
};
/** Events for a stream */
struct pw_stream_events {
#define PW_VERSION_STREAM_EVENTS 0
@ -191,17 +180,18 @@ struct pw_stream_events {
/** when the format changed. The listener should call
* pw_stream_finish_format() from within this callback or later to complete
* the format negotiation and start the buffer negotiation. */
void (*format_changed) (void *data, struct spa_pod *format);
void (*format_changed) (void *data, const struct spa_pod *format);
/** when a new buffer was created for this stream */
void (*add_buffer) (void *data, uint32_t id);
void (*add_buffer) (void *data, struct pw_buffer *buffer);
/** when a buffer was destroyed for this stream */
void (*remove_buffer) (void *data, uint32_t id);
/** when a buffer can be reused (for playback streams) or
* is filled (for capture streams */
void (*new_buffer) (void *data, uint32_t id);
/** when a buffer is needed (for playback streams) */
void (*need_buffer) (void *data);
void (*remove_buffer) (void *data, struct pw_buffer *buffer);
/** when a buffer can be queued (for playback streams) or
* dequeued (for capture streams). This is normally called from the
* mainloop but can also be called directly from the realtime data
* thread if the user is prepared to deal with this. */
void (*process) (void *data);
};
/** Convert a stream state to a readable string \memberof pw_stream */
@ -212,16 +202,14 @@ enum pw_stream_flags {
PW_STREAM_FLAG_NONE = 0, /**< no flags */
PW_STREAM_FLAG_AUTOCONNECT = (1 << 0), /**< try to automatically connect
* this stream */
PW_STREAM_FLAG_CLOCK_UPDATE = (1 << 1), /**< request periodic clock updates for
* this stream */
PW_STREAM_FLAG_INACTIVE = (1 << 2), /**< start the stream inactive */
};
/** A time structure \memberof pw_stream */
struct pw_time {
int64_t now; /**< the monotonic time */
int64_t ticks; /**< the ticks at \a now */
int32_t rate; /**< the rate of \a ticks */
PW_STREAM_FLAG_INACTIVE = (1 << 1), /**< start the stream inactive */
PW_STREAM_FLAG_MAP_BUFFERS = (1 << 2), /**< mmap the buffers */
PW_STREAM_FLAG_DRIVER = (1 << 3), /**< be a driver */
PW_STREAM_FLAG_RT_PROCESS = (1 << 4), /**< call process from the realtime
* thread */
PW_STREAM_FLAG_NO_CONVERT = (1 << 5), /**< don't convert format */
PW_STREAM_FLAG_EXCLUSIVE = (1 << 6), /**< require exclusive access to the
* device */
};
/** Create a new unconneced \ref pw_stream \memberof pw_stream
@ -231,6 +219,13 @@ pw_stream_new(struct pw_remote *remote, /**< a \ref pw_remote */
const char *name, /**< a stream name */
struct pw_properties *props /**< stream properties, ownership is taken */);
struct pw_stream *
pw_stream_new_simple(struct pw_loop *loop, /**< a \ref pw_loop to use */
const char *name, /**< a stream name */
struct pw_properties *props,/**< stream properties, ownership is taken */
const struct pw_stream_events *events, /**< stream events */
void *data /**< data passed to events */);
/** Destroy a stream \memberof pw_stream */
void pw_stream_destroy(struct pw_stream *stream);
@ -243,6 +238,8 @@ enum pw_stream_state pw_stream_get_state(struct pw_stream *stream, const char **
const char *pw_stream_get_name(struct pw_stream *stream);
struct pw_remote *pw_stream_get_remote(struct pw_stream *stream);
/** Indicates that the stream is live, boolean default false */
#define PW_STREAM_PROP_IS_LIVE "pipewire.latency.is-live"
/** The minimum latency of the stream, int, default 0 */
@ -255,9 +252,8 @@ const struct pw_properties *pw_stream_get_properties(struct pw_stream *stream);
/** Connect a stream for input or output on \a port_path. \memberof pw_stream
* \return 0 on success < 0 on error.
*
* When \a mode is \ref PW_STREAM_MODE_BUFFER, you should connect to the new-buffer
* event and use pw_stream_peek_buffer() to get the latest metadata and
* data. */
* You should connect to the process event and use pw_stream_dequeue_buffer()
* to get the latest metadata and data. */
int
pw_stream_connect(struct pw_stream *stream, /**< a \ref pw_stream */
enum pw_direction direction, /**< the stream direction */
@ -286,40 +282,45 @@ int pw_stream_disconnect(struct pw_stream *stream);
void
pw_stream_finish_format(struct pw_stream *stream, /**< a \ref pw_stream */
int res, /**< a result code */
struct spa_pod **params, /**< an array of params. The params should
const struct spa_pod **params, /**< an array of params. The params should
* ideally contain parameters for doing
* buffer allocation. */
uint32_t n_params /**< number of elements in \a params */);
/** Audio controls */
#define PW_STREAM_CONTROL_VOLUME "volume"
/** Video controls */
#define PW_STREAM_CONTROL_CONTRAST "contrast"
#define PW_STREAM_CONTROL_BRIGHTNESS "brightness"
#define PW_STREAM_CONTROL_HUE "hue"
#define PW_STREAM_CONTROL_SATURATION "saturation"
/** Set a control value */
int pw_stream_set_control(struct pw_stream *stream, const char *name, float value);
/** Get a control value */
int pw_stream_get_control(struct pw_stream *stream, const char *name, float *value);
/** Activate or deactivate the stream \memberof pw_stream */
int pw_stream_set_active(struct pw_stream *stream, bool active);
/** A time structure \memberof pw_stream */
struct pw_time {
int64_t now; /**< the monotonic time */
int64_t ticks; /**< the ticks at \a now */
struct spa_fraction rate; /**< the rate of \a ticks */
};
/** Query the time on the stream \memberof pw_stream */
int pw_stream_get_time(struct pw_stream *stream, struct pw_time *time);
/** Get the id of an empty buffer that can be filled \memberof pw_stream
* \return the id of an empty buffer or \ref SPA_ID_INVALID when no buffer is
* available. */
uint32_t pw_stream_get_empty_buffer(struct pw_stream *stream);
/** Get a buffer that can be filled for playback streams or consumed
* for capture streams. */
struct pw_buffer *pw_stream_dequeue_buffer(struct pw_stream *stream);
/** Recycle the buffer with \a id \memberof pw_stream
* \return 0 on success, < 0 when \a id is invalid or not a used buffer
* Let the PipeWire server know that it can reuse the buffer with \a id. */
int pw_stream_recycle_buffer(struct pw_stream *stream, uint32_t id);
/** Submit a buffer for playback or recycle a buffer for capture. */
int pw_stream_queue_buffer(struct pw_stream *stream, struct pw_buffer *buffer);
/** Get the buffer with \a id from \a stream \memberof pw_stream
* \return a \ref spa_buffer or NULL when there is no buffer
*
* This function should be called from the new-buffer event. */
struct spa_buffer *
pw_stream_peek_buffer(struct pw_stream *stream, uint32_t id);
/** Send a buffer with \a id to \a stream \memberof pw_stream
* \return 0 when \a id was handled, < 0 on error
*
* For provider or playback streams, this function should be called whenever
* there is a new buffer available. */
int pw_stream_send_buffer(struct pw_stream *stream, uint32_t id);
#ifdef __cplusplus
}