Rework transport protocol

Remove the old PinosBuffer object and replace it with SpaControl, this
communication protocol is designed to make it possible to implement
remote nodes and so it is moved to Spa.
Move SpaBuffer into to API
Work on easier API to make formats, implement enumeration and support
for all formats in v4l2.
Improve format output in -inspect
This commit is contained in:
Wim Taymans 2016-07-28 21:19:20 +02:00
parent b795fb851f
commit 4cb90f3b86
37 changed files with 2658 additions and 1032 deletions

View file

@ -31,10 +31,37 @@
#include "pinos/client/enumtypes.h"
#include "pinos/client/private.h"
#include "spa/include/spa/control.h"
#define MAX_BUFFER_SIZE 1024
#define MAX_FDS 16
typedef struct {
bool cleanup;
uint32_t id;
int fd;
} MemId;
static void
clear_mem_id (MemId *id)
{
close (id->fd);
id->fd = -1;
}
typedef struct {
bool cleanup;
uint32_t id;
SpaBuffer *buf;
} BufferId;
static void
clear_buffer_id (BufferId *id)
{
spa_buffer_unref (id->buf);
id->buf = NULL;
}
struct _PinosStreamPrivate
{
PinosContext *context;
@ -61,15 +88,18 @@ struct _PinosStreamPrivate
GSource *socket_source;
int fd;
PinosBuffer *buffer;
PinosBuffer recv_buffer;
SpaBuffer *buffer;
SpaControl *control;
SpaControl recv_control;
guint8 recv_data[MAX_BUFFER_SIZE];
int recv_fds[MAX_FDS];
guint8 send_data[MAX_BUFFER_SIZE];
int send_fds[MAX_FDS];
GHashTable *mem_ids;
GArray *mem_ids;
GArray *buffer_ids;
};
#define PINOS_STREAM_GET_PRIVATE(obj) \
@ -260,9 +290,6 @@ pinos_stream_finalize (GObject * object)
g_bytes_unref (priv->format);
g_free (priv->path);
if (priv->possible_formats)
g_bytes_unref (priv->possible_formats);
g_clear_error (&priv->error);
if (priv->properties)
@ -397,6 +424,10 @@ pinos_stream_init (PinosStream * stream)
g_debug ("new stream %p", stream);
priv->state = PINOS_STREAM_STATE_UNCONNECTED;
priv->mem_ids = g_array_sized_new (FALSE, FALSE, sizeof (MemId), 64);
g_array_set_clear_func (priv->mem_ids, (GDestroyNotify) clear_mem_id);
priv->buffer_ids = g_array_sized_new (FALSE, FALSE, sizeof (BufferId), 64);
g_array_set_clear_func (priv->buffer_ids, (GDestroyNotify) clear_buffer_id);
}
/**
@ -487,86 +518,224 @@ pinos_stream_get_error (PinosStream *stream)
return stream->priv->error;
}
static void
send_need_input (PinosStream *stream, uint32_t port)
{
PinosStreamPrivate *priv = stream->priv;
SpaControlBuilder builder;
SpaControl control;
guint8 buffer[64];
SpaControlCmdNeedInput ni;
spa_control_builder_init_into (&builder, buffer, 64, NULL, 0);
ni.port = port;
spa_control_builder_add_cmd (&builder, SPA_CONTROL_CMD_NEED_INPUT, &ni);
spa_control_builder_end (&builder, &control);
if (spa_control_write (&control, priv->fd) < 0)
g_warning ("stream %p: error writing control", stream);
}
static BufferId *
find_buffer (PinosStream *stream, uint32_t id)
{
PinosStreamPrivate *priv = stream->priv;
guint i;
for (i = 0; i < priv->buffer_ids->len; i++) {
BufferId *bid = &g_array_index (priv->buffer_ids, BufferId, i);
if (bid->id == id)
return bid;
}
return NULL;
}
static MemId *
find_mem (PinosStream *stream, uint32_t id)
{
PinosStreamPrivate *priv = stream->priv;
guint i;
for (i = 0; i < priv->mem_ids->len; i++) {
MemId *mid = &g_array_index (priv->mem_ids, MemId, i);
if (mid->id == id)
return mid;
}
return NULL;
}
static gboolean
parse_buffer (PinosStream *stream,
PinosBuffer *pbuf)
parse_control (PinosStream *stream,
SpaControl *ctrl)
{
PinosBufferIter it;
SpaControlIter it;
PinosStreamPrivate *priv = stream->priv;
pinos_buffer_iter_init (&it, pbuf);
while (pinos_buffer_iter_next (&it)) {
PinosPacketType type = pinos_buffer_iter_get_type (&it);
spa_control_iter_init (&it, ctrl);
while (spa_control_iter_next (&it) == SPA_RESULT_OK) {
SpaControlCmd cmd = spa_control_iter_get_cmd (&it);
switch (type) {
case PINOS_PACKET_TYPE_ADD_MEM:
{
PinosPacketAddMem p;
int fd;
if (!pinos_buffer_iter_parse_add_mem (&it, &p))
break;
fd = pinos_buffer_get_fd (pbuf, p.fd_index);
if (fd == -1)
break;
// g_hash_table_insert (priv->mem_ids, GINT_TO_POINTER (p.id), NULL);
switch (cmd) {
case SPA_CONTROL_CMD_NODE_UPDATE:
case SPA_CONTROL_CMD_PORT_UPDATE:
case SPA_CONTROL_CMD_PORT_REMOVED:
case SPA_CONTROL_CMD_START_CONFIGURE:
case SPA_CONTROL_CMD_PORT_STATUS_CHANGE:
case SPA_CONTROL_CMD_START_ALLOC:
case SPA_CONTROL_CMD_NEED_INPUT:
case SPA_CONTROL_CMD_HAVE_OUTPUT:
g_warning ("got unexpected control %d", cmd);
break;
}
case PINOS_PACKET_TYPE_REMOVE_MEM:
{
PinosPacketRemoveMem p;
if (!pinos_buffer_iter_parse_remove_mem (&it, &p))
break;
// g_hash_table_remove (priv->mem_ids, GINT_TO_POINTER (p.id));
case SPA_CONTROL_CMD_ADD_PORT:
case SPA_CONTROL_CMD_REMOVE_PORT:
g_warning ("add/remove port not supported");
break;
}
case PINOS_PACKET_TYPE_FORMAT_CHANGE:
{
PinosPacketFormatChange p;
if (!pinos_buffer_iter_parse_format_change (&it, &p))
case SPA_CONTROL_CMD_SET_FORMAT:
{
SpaControlCmdSetFormat p;
if (spa_control_iter_parse_cmd (&it, &p) < 0)
break;
if (priv->format)
g_bytes_unref (priv->format);
priv->format = g_bytes_new (p.format, strlen (p.format) + 1);
priv->format = g_bytes_new (p.str, strlen (p.str) + 1);
g_object_notify (G_OBJECT (stream), "format");
break;
}
case PINOS_PACKET_TYPE_STREAMING:
case SPA_CONTROL_CMD_SET_PROPERTY:
g_warning ("set property not implemented");
break;
case SPA_CONTROL_CMD_END_CONFIGURE:
{
SpaControlBuilder builder;
SpaControl control;
guint8 buffer[1024];
/* FIXME send update port status */
/* send start-alloc */
spa_control_builder_init_into (&builder, buffer, 1024, NULL, 0);
spa_control_builder_add_cmd (&builder, SPA_CONTROL_CMD_START_ALLOC, NULL);
spa_control_builder_end (&builder, &control);
if (spa_control_write (&control, priv->fd) < 0)
g_warning ("stream %p: error writing control", stream);
break;
}
case SPA_CONTROL_CMD_PAUSE:
break;
case SPA_CONTROL_CMD_START:
{
if (priv->direction == PINOS_DIRECTION_INPUT)
send_need_input (stream, 0);
stream_set_state (stream, PINOS_STREAM_STATE_STREAMING, NULL);
break;
}
case PINOS_PACKET_TYPE_STOPPED:
case SPA_CONTROL_CMD_STOP:
{
unhandle_socket (stream);
g_clear_pointer (&priv->format, g_bytes_unref);
g_object_notify (G_OBJECT (stream), "format");
stream_set_state (stream, PINOS_STREAM_STATE_READY, NULL);
break;
}
case PINOS_PACKET_TYPE_HEADER:
case SPA_CONTROL_CMD_ADD_MEM:
{
SpaControlCmdAddMem p;
MemId mid;
int fd;
if (spa_control_iter_parse_cmd (&it, &p) < 0)
break;
fd = spa_control_get_fd (ctrl, p.fd_index, false);
if (fd == -1)
break;
mid.cleanup = false;
mid.id = p.id;
mid.fd = fd;
g_array_append_val (priv->mem_ids, mid);
break;
}
case PINOS_PACKET_TYPE_PROCESS_MEM:
case SPA_CONTROL_CMD_REMOVE_MEM:
{
SpaControlCmdRemoveMem p;
MemId *mid;
if (spa_control_iter_parse_cmd (&it, &p) < 0)
break;
if ((mid = find_mem (stream, p.id)))
mid->cleanup = true;
break;
}
default:
g_warning ("unhandled packet %d", type);
case SPA_CONTROL_CMD_ADD_BUFFER:
{
SpaControlCmdAddBuffer p;
BufferId bid;
if (spa_control_iter_parse_cmd (&it, &p) < 0)
break;
bid.cleanup = false;
bid.id = p.buffer->id;
bid.buf = p.buffer;
g_array_append_val (priv->buffer_ids, bid);
break;
}
case SPA_CONTROL_CMD_REMOVE_BUFFER:
{
SpaControlCmdRemoveBuffer p;
BufferId *bid;
if (spa_control_iter_parse_cmd (&it, &p) < 0)
break;
if ((bid = find_buffer (stream, p.id)))
bid->cleanup = true;
break;
}
case SPA_CONTROL_CMD_PROCESS_BUFFER:
{
SpaControlCmdProcessBuffer p;
unsigned int i;
BufferId *bid;
if (spa_control_iter_parse_cmd (&it, &p) < 0)
break;
if ((bid = find_buffer (stream, p.id))) {
SpaBuffer *b = bid->buf;
for (i = 0; i < b->n_datas; i++) {
SpaData *d = &b->datas[i];
if (d->type == SPA_DATA_TYPE_MEMID) {
int id = *((int*)(d->ptr));
MemId *mid;
if ((mid = find_mem (stream, id))) {
d->type = SPA_DATA_TYPE_FD;
*((int *)(d->ptr)) = mid->fd;
}
}
}
priv->buffer = b;
}
break;
}
case SPA_CONTROL_CMD_REUSE_BUFFER:
break;
case SPA_CONTROL_CMD_INVALID:
g_warning ("unhandled command %d", cmd);
break;
}
}
pinos_buffer_iter_end (&it);
spa_control_iter_end (&it);
return TRUE;
}
@ -582,28 +751,41 @@ on_socket_condition (GSocket *socket,
switch (condition) {
case G_IO_IN:
{
PinosBuffer *buffer = &priv->recv_buffer;
GError *error = NULL;
SpaControl *control = &priv->recv_control;
guint i;
if (!pinos_io_read_buffer (priv->fd,
buffer,
priv->recv_data,
MAX_BUFFER_SIZE,
priv->recv_fds,
MAX_FDS,
&error)) {
g_warning ("stream %p: failed to read buffer: %s", stream, error->message);
g_clear_error (&error);
if (spa_control_read (control,
priv->fd,
priv->recv_data,
MAX_BUFFER_SIZE,
priv->recv_fds,
MAX_FDS) < 0) {
g_warning ("stream %p: failed to read buffer", stream);
return TRUE;
}
parse_buffer (stream, buffer);
parse_control (stream, control);
priv->buffer = buffer;
g_signal_emit (stream, signals[SIGNAL_NEW_BUFFER], 0, NULL);
priv->buffer = NULL;
g_assert (pinos_buffer_unref (buffer) == FALSE);
if (priv->buffer) {
g_signal_emit (stream, signals[SIGNAL_NEW_BUFFER], 0, NULL);
priv->buffer = NULL;
send_need_input (stream, 0);
}
for (i = 0; i < priv->mem_ids->len; i++) {
MemId *mid = &g_array_index (priv->mem_ids, MemId, i);
if (mid->cleanup) {
g_array_remove_index_fast (priv->mem_ids, i);
i--;
}
}
for (i = 0; i < priv->buffer_ids->len; i++) {
BufferId *bid = &g_array_index (priv->buffer_ids, BufferId, i);
if (bid->cleanup) {
g_array_remove_index_fast (priv->buffer_ids, i);
i--;
}
}
spa_control_clear (control);
break;
}
@ -851,29 +1033,39 @@ pinos_stream_connect (PinosStream *stream,
return TRUE;
}
static void
control_builder_init (PinosStream *stream, SpaControlBuilder *builder)
{
PinosStreamPrivate *priv = stream->priv;
spa_control_builder_init_into (builder,
priv->send_data,
MAX_BUFFER_SIZE,
priv->send_fds,
MAX_FDS);
}
static gboolean
do_start (PinosStream *stream)
{
PinosStreamPrivate *priv = stream->priv;
PinosBufferBuilder builder;
PinosPacketFormatChange fc;
PinosBuffer pbuf;
GError *error = NULL;
SpaControlBuilder builder;
SpaControlCmdPortUpdate pu;
SpaControl control;
handle_socket (stream, priv->fd);
pinos_stream_buffer_builder_init (stream, &builder);
fc.port = 0;
fc.id = 0;
fc.format = priv->format ? g_bytes_get_data (priv->format, NULL) : "ANY";
pinos_buffer_builder_add_format_change (&builder, &fc);
pinos_buffer_builder_add_empty (&builder, PINOS_PACKET_TYPE_START);
pinos_buffer_builder_end (&builder, &pbuf);
control_builder_init (stream, &builder);
pu.port = 0;
pu.change_mask = 0;
spa_control_builder_add_cmd (&builder, SPA_CONTROL_CMD_PORT_UPDATE, &pu);
spa_control_builder_add_cmd (&builder, SPA_CONTROL_CMD_START_CONFIGURE, NULL);
spa_control_builder_end (&builder, &control);
if (spa_control_write (&control, priv->fd) < 0)
g_warning ("stream %p: failed to write control", stream);
if (!pinos_io_write_buffer (priv->fd, &pbuf, &error)) {
g_warning ("stream %p: failed to read buffer: %s", stream, error->message);
g_clear_error (&error);
}
g_object_unref (stream);
return FALSE;
@ -923,10 +1115,10 @@ pinos_stream_start (PinosStream *stream,
static gboolean
do_stop (PinosStream *stream)
{
PinosBufferBuilder builder;
SpaControlBuilder builder;
pinos_stream_buffer_builder_init (stream, &builder);
pinos_buffer_builder_add_empty (&builder, PINOS_PACKET_TYPE_STOP);
control_builder_init (stream, &builder);
spa_control_builder_add_cmd (&builder, SPA_CONTROL_CMD_STOP, NULL);
g_object_unref (stream);
return FALSE;
@ -1048,9 +1240,9 @@ pinos_stream_disconnect (PinosStream *stream)
* Get the current buffer from @stream. This function should be called from
* the new-buffer signal callback.
*
* Returns: a #PinosBuffer or %NULL when there is no buffer.
* Returns: a #SpaBuffer or %NULL when there is no buffer.
*/
PinosBuffer *
SpaBuffer *
pinos_stream_peek_buffer (PinosStream *stream)
{
PinosStreamPrivate *priv;
@ -1058,37 +1250,13 @@ pinos_stream_peek_buffer (PinosStream *stream)
g_return_val_if_fail (PINOS_IS_STREAM (stream), NULL);
priv = stream->priv;
return priv->buffer;
}
/**
* pinos_stream_buffer_builder_init:
* @stream: a #PinosStream
* @builder: a #PinosBufferBuilder
*
* Get a #PinosBufferBuilder for @stream.
*
* Returns: a #PinosBuffer or %NULL when there is no buffer.
*/
void
pinos_stream_buffer_builder_init (PinosStream *stream, PinosBufferBuilder *builder)
{
PinosStreamPrivate *priv;
g_return_if_fail (PINOS_IS_STREAM (stream));
priv = stream->priv;
pinos_buffer_builder_init_into (builder,
priv->send_data,
MAX_BUFFER_SIZE,
priv->send_fds,
MAX_FDS);
return spa_buffer_ref (priv->buffer);
}
/**
* pinos_stream_send_buffer:
* @stream: a #PinosStream
* @buffer: a #PinosBuffer
* @buffer: a #SpaBuffer
*
* Send a buffer to @stream.
*
@ -1102,20 +1270,17 @@ pinos_stream_buffer_builder_init (PinosStream *stream, PinosBufferBuilder *buil
*/
gboolean
pinos_stream_send_buffer (PinosStream *stream,
PinosBuffer *buffer)
SpaBuffer *buffer)
{
PinosStreamPrivate *priv;
GError *error = NULL;
g_return_val_if_fail (PINOS_IS_STREAM (stream), FALSE);
g_return_val_if_fail (buffer != NULL, FALSE);
priv = stream->priv;
if (!pinos_io_write_buffer (priv->fd, buffer, &error)) {
#if 0
if (!spa_control_write (priv->fd, buffer, &error)) {
g_warning ("stream %p: failed to write buffer: %s", stream, error->message);
g_clear_error (&error);
return FALSE;
}
#endif
return TRUE;
}