More work on implementing remote protocol

Rework things so that we negotiate buffer pools beforehand and only pass
buffer ids around We can then remove the refcount of buffers, events and
commands.
More work on buffer reuse
Use the node state changes to trigger the next step in the configuration
sequence.
Move most of the client-node to a plugin
Do buffer allocation in the port link.
This commit is contained in:
Wim Taymans 2016-08-02 16:34:44 +02:00
parent 05829f33e6
commit 3ace7e9648
36 changed files with 1780 additions and 1450 deletions

View file

@ -25,6 +25,8 @@
#include <sys/mman.h>
#include <unistd.h>
#include <fcntl.h>
#include <dlfcn.h>
#include <poll.h>
#include <gio/gunixfdlist.h>
@ -42,24 +44,17 @@
#include "spa/include/spa/control.h"
#define MAX_BUFFER_SIZE 1024
#define MAX_FDS 16
struct _PinosClientNodePrivate
{
int fd;
GSource *socket_source;
GSocket *sockets[2];
SpaControl recv_control;
SpaPollFd fds[16];
unsigned int n_fds;
SpaPollItem poll;
guint8 recv_data[MAX_BUFFER_SIZE];
int recv_fds[MAX_FDS];
guint8 send_data[MAX_BUFFER_SIZE];
int send_fds[MAX_FDS];
GHashTable *mem_ids;
gboolean running;
pthread_t thread;
};
#define PINOS_CLIENT_NODE_GET_PRIVATE(obj) \
@ -110,193 +105,6 @@ pinos_client_node_set_property (GObject *_object,
}
}
static gboolean
parse_control (PinosClientNode *cnode,
SpaControl *ctrl)
{
PinosNode *node = PINOS_NODE (cnode);
PinosClientNodePrivate *priv = cnode->priv;
SpaControlIter it;
spa_control_iter_init (&it, ctrl);
while (spa_control_iter_next (&it) == SPA_RESULT_OK) {
SpaControlCmd cmd = spa_control_iter_get_cmd (&it);
switch (cmd) {
case SPA_CONTROL_CMD_ADD_PORT:
case SPA_CONTROL_CMD_REMOVE_PORT:
case SPA_CONTROL_CMD_SET_FORMAT:
case SPA_CONTROL_CMD_SET_PROPERTY:
case SPA_CONTROL_CMD_END_CONFIGURE:
case SPA_CONTROL_CMD_PAUSE:
case SPA_CONTROL_CMD_START:
case SPA_CONTROL_CMD_STOP:
g_warning ("client-node %p: got unexpected control %d", node, cmd);
break;
case SPA_CONTROL_CMD_NODE_UPDATE:
case SPA_CONTROL_CMD_PORT_UPDATE:
case SPA_CONTROL_CMD_PORT_REMOVED:
g_warning ("client-node %p: command not implemented %d", node, cmd);
break;
case SPA_CONTROL_CMD_START_CONFIGURE:
{
SpaControlBuilder builder;
SpaControl control;
guint8 buffer[1024];
/* set port format */
/* send end-configure */
spa_control_builder_init_into (&builder, buffer, 1024, NULL, 0);
spa_control_builder_add_cmd (&builder, SPA_CONTROL_CMD_END_CONFIGURE, NULL);
spa_control_builder_end (&builder, &control);
if (spa_control_write (&control, priv->fd) < 0)
g_warning ("client-node %p: error writing control", node);
break;
}
case SPA_CONTROL_CMD_PORT_STATUS_CHANGE:
{
g_warning ("client-node %p: command not implemented %d", node, cmd);
break;
}
case SPA_CONTROL_CMD_START_ALLOC:
{
SpaControlBuilder builder;
SpaControl control;
guint8 buffer[1024];
GList *ports, *walk;
/* FIXME read port memory requirements */
/* FIXME add_mem */
/* send start */
spa_control_builder_init_into (&builder, buffer, 1024, NULL, 0);
spa_control_builder_add_cmd (&builder, SPA_CONTROL_CMD_START, NULL);
spa_control_builder_end (&builder, &control);
if (spa_control_write (&control, priv->fd) < 0)
g_warning ("client-node %p: error writing control", node);
ports = pinos_node_get_ports (node);
for (walk = ports; walk; walk = g_list_next (walk)) {
PinosPort *port = walk->data;
pinos_port_activate (port);
}
break;
}
case SPA_CONTROL_CMD_NEED_INPUT:
{
break;
}
case SPA_CONTROL_CMD_HAVE_OUTPUT:
{
break;
}
case SPA_CONTROL_CMD_ADD_MEM:
break;
case SPA_CONTROL_CMD_REMOVE_MEM:
break;
case SPA_CONTROL_CMD_ADD_BUFFER:
break;
case SPA_CONTROL_CMD_REMOVE_BUFFER:
break;
case SPA_CONTROL_CMD_PROCESS_BUFFER:
{
break;
}
case SPA_CONTROL_CMD_REUSE_BUFFER:
{
break;
}
default:
g_warning ("client-node %p: command unhandled %d", node, cmd);
break;
}
}
spa_control_iter_end (&it);
return TRUE;
}
static gboolean
on_socket_condition (GSocket *socket,
GIOCondition condition,
gpointer user_data)
{
PinosClientNode *node = user_data;
PinosClientNodePrivate *priv = node->priv;
switch (condition) {
case G_IO_IN:
{
SpaControl *control = &priv->recv_control;
if (spa_control_read (control,
priv->fd,
priv->recv_data,
MAX_BUFFER_SIZE,
priv->recv_fds,
MAX_FDS) < 0) {
g_warning ("client-node %p: failed to read buffer", node);
return TRUE;
}
parse_control (node, control);
#if 0
if (!pinos_port_receive_buffer (priv->port, buffer, &error)) {
g_warning ("client-node %p: port %p failed to receive buffer: %s", node, priv->port, error->message);
g_clear_error (&error);
}
#endif
spa_control_clear (control);
break;
}
case G_IO_OUT:
g_warning ("can do IO OUT\n");
break;
default:
break;
}
return TRUE;
}
static void
handle_socket (PinosClientNode *node, GSocket *socket)
{
PinosClientNodePrivate *priv = node->priv;
GMainContext *context = g_main_context_get_thread_default();
g_debug ("client-node %p: handle socket in context %p", node, context);
priv->fd = g_socket_get_fd (socket);
priv->socket_source = g_socket_create_source (socket, G_IO_IN, NULL);
g_source_set_callback (priv->socket_source, (GSourceFunc) on_socket_condition, node, NULL);
g_source_attach (priv->socket_source, context);
}
static void
unhandle_socket (PinosClientNode *node)
{
PinosClientNodePrivate *priv = node->priv;
g_debug ("client-node %p: unhandle socket", node);
if (priv->socket_source) {
g_source_destroy (priv->socket_source);
g_clear_pointer (&priv->socket_source, g_source_unref);
priv->fd = -1;
}
}
/**
* pinos_client_node_get_socket_pair:
* @node: a #PinosClientNode
@ -308,15 +116,19 @@ unhandle_socket (PinosClientNode *node)
* Returns: a #GSocket that can be used to send/receive buffers to node.
*/
GSocket *
pinos_client_node_get_socket_pair (PinosClientNode *node,
GError **error)
pinos_client_node_get_socket_pair (PinosClientNode *this,
GError **error)
{
PinosNode *node;
PinosClientNodePrivate *priv;
g_return_val_if_fail (PINOS_IS_CLIENT_NODE (node), FALSE);
priv = node->priv;
g_return_val_if_fail (PINOS_IS_CLIENT_NODE (this), FALSE);
node = PINOS_NODE (this);
priv = this->priv;
if (priv->sockets[1] == NULL) {
SpaProps *props;
SpaPropValue value;
int fd[2];
if (socketpair (AF_UNIX, SOCK_STREAM, 0, fd) != 0)
@ -330,7 +142,14 @@ pinos_client_node_get_socket_pair (PinosClientNode *node,
if (priv->sockets[1] == NULL)
goto create_failed;
handle_socket (node, priv->sockets[0]);
priv->fd = g_socket_get_fd (priv->sockets[0]);
spa_node_get_props (node->node, &props);
value.type = SPA_PROP_TYPE_INT;
value.value = &priv->fd;
value.size = sizeof (int);
spa_props_set_prop (props, spa_props_index_for_name (props, "socket"), &value);
spa_node_set_props (node->node, props);
}
return g_object_ref (priv->sockets[1]);
@ -351,156 +170,169 @@ create_failed:
}
}
static void
on_format_change (GObject *obj,
GParamSpec *pspec,
gpointer user_data)
{
PinosClientNode *node = user_data;
PinosClientNodePrivate *priv = node->priv;
GBytes *format;
SpaControl control;
SpaControlBuilder builder;
SpaControlCmdSetFormat sf;
guint8 buf[1024];
g_object_get (obj, "format", &format, NULL);
if (format == NULL)
return ;
g_debug ("port %p: format change %s", obj, (gchar*)g_bytes_get_data (format, NULL));
spa_control_builder_init_into (&builder, buf, 1024, NULL, 0);
sf.port = 0;
sf.format = NULL;
sf.str = g_bytes_get_data (format, NULL);
spa_control_builder_add_cmd (&builder, SPA_CONTROL_CMD_SET_FORMAT, &sf);
spa_control_builder_end (&builder, &control);
if (spa_control_write (&control, priv->fd))
g_warning ("client-node %p: error writing control", node);
}
static int
tmpfile_create (void *data, gsize size)
{
char filename[] = "/dev/shm/tmpfilepay.XXXXXX";
int fd;
fd = mkostemp (filename, O_CLOEXEC);
if (fd == -1) {
g_debug ("Failed to create temporary file: %s", strerror (errno));
return -1;
}
unlink (filename);
if (write (fd, data, size) != (gssize) size)
g_debug ("Failed to write data: %s", strerror (errno));
return fd;
}
typedef struct {
SpaBuffer buffer;
SpaData datas[16];
int idx[16];
SpaBuffer *orig;
} MyBuffer;
static gboolean
on_received_buffer (PinosPort *port, SpaBuffer *buffer, GError **error, gpointer user_data)
on_received_buffer (PinosPort *port, uint32_t buffer_id, GError **error, gpointer user_data)
{
PinosClientNode *node = user_data;
PinosClientNodePrivate *priv = node->priv;
SpaControl control;
SpaControlBuilder builder;
guint8 buf[1024];
int fds[16];
SpaControlCmdAddBuffer ab;
SpaControlCmdProcessBuffer pb;
SpaControlCmdRemoveBuffer rb;
bool tmpfile = false;
PinosNode *node = user_data;
PinosClientNode *this = PINOS_CLIENT_NODE (node);
PinosClientNodePrivate *priv = this->priv;
SpaResult res;
SpaInputInfo info[1];
if (pinos_port_get_direction (port) == PINOS_DIRECTION_OUTPUT) {
/* FIXME, does not happen */
spa_control_builder_init_into (&builder, buf, 1024, NULL, 0);
spa_control_builder_add_cmd (&builder, SPA_CONTROL_CMD_HAVE_OUTPUT, NULL);
spa_control_builder_end (&builder, &control);
info[0].port_id = port->id;
info[0].buffer_id = buffer_id;
info[0].flags = SPA_INPUT_FLAG_NONE;
info[0].offset = 0;
info[0].size = -1;
if (spa_control_write (&control, priv->fd)) {
g_warning ("client-node %p: error writing control", node);
return FALSE;
}
} else {
unsigned int i;
MyBuffer b;
spa_control_builder_init_into (&builder, buf, 1024, fds, 16);
b.buffer.refcount = 1;
b.buffer.notify = NULL;
b.buffer.id = buffer->id;
b.buffer.size = buffer->size;
b.buffer.n_metas = buffer->n_metas;
b.buffer.metas = buffer->metas;
b.buffer.n_datas = buffer->n_datas;
b.buffer.datas = b.datas;
for (i = 0; i < buffer->n_datas; i++) {
SpaData *d = &buffer->datas[i];
int fd;
SpaControlCmdAddMem am;
if (d->type == SPA_DATA_TYPE_FD) {
fd = *((int *)d->ptr);
} else {
fd = tmpfile_create (d->ptr, d->size + d->offset);
tmpfile = true;
}
am.port = 0;
am.id = i;
am.type = 0;
am.fd_index = spa_control_builder_add_fd (&builder, fd, tmpfile ? true : false);
am.offset = 0;
am.size = d->offset + d->size;
spa_control_builder_add_cmd (&builder, SPA_CONTROL_CMD_ADD_MEM, &am);
b.idx[i] = i;
b.datas[i].type = SPA_DATA_TYPE_MEMID;
b.datas[i].ptr_type = NULL;
b.datas[i].ptr = &b.idx[i];
b.datas[i].offset = d->offset;
b.datas[i].size = d->size;
b.datas[i].stride = d->stride;
}
ab.port = 0;
ab.buffer = &b.buffer;
spa_control_builder_add_cmd (&builder, SPA_CONTROL_CMD_ADD_BUFFER, &ab);
pb.port = 0;
pb.id = b.buffer.id;
spa_control_builder_add_cmd (&builder, SPA_CONTROL_CMD_PROCESS_BUFFER, &pb);
rb.port = 0;
rb.id = b.buffer.id;
spa_control_builder_add_cmd (&builder, SPA_CONTROL_CMD_REMOVE_BUFFER, &rb);
for (i = 0; i < buffer->n_datas; i++) {
SpaControlCmdRemoveMem rm;
rm.port = 0;
rm.id = i;
spa_control_builder_add_cmd (&builder, SPA_CONTROL_CMD_REMOVE_MEM, &rm);
}
spa_control_builder_end (&builder, &control);
if (spa_control_write (&control, priv->fd))
g_warning ("client-node %p: error writing control", node);
spa_control_clear (&control);
}
if ((res = spa_node_port_push_input (node->node, 1, info)) < 0)
g_warning ("client-node %p: error pushing buffer: %d, %d", node, res, info[0].status);
return TRUE;
}
static gboolean
on_received_event (PinosPort *port, SpaEvent *event, GError **error, gpointer user_data)
{
PinosNode *node = user_data;
PinosClientNode *this = PINOS_CLIENT_NODE (node);
PinosClientNodePrivate *priv = this->priv;
SpaResult res;
if ((res = spa_node_port_push_event (node->node, port->id, event)) < 0)
g_warning ("client-node %p: error pushing event: %d", node, res);
return TRUE;
}
static void *
loop (void *user_data)
{
PinosClientNode *this = user_data;
PinosClientNodePrivate *priv = this->priv;
int r;
g_debug ("client-node %p: enter thread", this);
while (priv->running) {
SpaPollNotifyData ndata;
r = poll ((struct pollfd *) priv->fds, priv->n_fds, -1);
if (r < 0) {
if (errno == EINTR)
continue;
break;
}
if (r == 0) {
g_debug ("client-node %p: select timeout", this);
break;
}
if (priv->poll.after_cb) {
ndata.fds = priv->poll.fds;
ndata.n_fds = priv->poll.n_fds;
ndata.user_data = priv->poll.user_data;
priv->poll.after_cb (&ndata);
}
}
g_debug ("client-node %p: leave thread", this);
return NULL;
}
static void
start_thread (PinosClientNode *this)
{
PinosClientNodePrivate *priv = this->priv;
int err;
if (!priv->running) {
priv->running = true;
if ((err = pthread_create (&priv->thread, NULL, loop, this)) != 0) {
g_debug ("client-node %p: can't create thread", strerror (err));
priv->running = false;
}
}
}
static void
stop_thread (PinosClientNode *this)
{
PinosClientNodePrivate *priv = this->priv;
if (priv->running) {
priv->running = false;
pthread_join (priv->thread, NULL);
}
}
static void
on_node_event (SpaNode *node, SpaEvent *event, void *user_data)
{
PinosClientNode *this = user_data;
PinosClientNodePrivate *priv = this->priv;
switch (event->type) {
case SPA_EVENT_TYPE_STATE_CHANGE:
{
SpaEventStateChange *sc = event->data;
switch (sc->state) {
case SPA_NODE_STATE_CONFIGURE:
{
GList *ports, *walk;
ports = pinos_node_get_ports (PINOS_NODE (this));
for (walk = ports; walk; walk = g_list_next (walk)) {
PinosPort *port = walk->data;
pinos_port_activate (port);
}
}
default:
break;
}
break;
}
case SPA_EVENT_TYPE_ADD_POLL:
{
SpaPollItem *poll = event->data;
priv->poll = *poll;
priv->fds[0] = poll->fds[0];
priv->n_fds = 1;
priv->poll.fds = priv->fds;
start_thread (this);
break;
}
case SPA_EVENT_TYPE_REMOVE_POLL:
{
stop_thread (this);
break;
}
case SPA_EVENT_TYPE_REUSE_BUFFER:
{
PinosPort *port;
GError *error = NULL;
port = pinos_node_find_port (PINOS_NODE (this), event->port_id);
pinos_port_send_event (port, event, &error);
break;
}
default:
g_debug ("client-node %p: got event %d", this, event->type);
break;
}
}
static void
setup_node (PinosClientNode *this)
{
PinosNode *node = PINOS_NODE (this);
SpaResult res;
if ((res = spa_node_set_event_callback (node->node, on_node_event, this)) < 0)
g_warning ("client-node %p: error setting callback", this);
}
static PinosPort *
add_port (PinosNode *node,
PinosDirection direction,
@ -509,12 +341,13 @@ add_port (PinosNode *node,
{
PinosPort *port;
if (spa_node_add_port (node->node, direction, id) < 0)
g_warning ("client-node %p: error adding port", node);
port = PINOS_NODE_CLASS (pinos_client_node_parent_class)->add_port (node, direction, id, error);
if (port) {
pinos_port_set_received_buffer_cb (port, on_received_buffer, node, NULL);
g_signal_connect (port, "notify::format", (GCallback) on_format_change, node);
pinos_port_set_received_cb (port, on_received_buffer, on_received_event, node, NULL);
}
return port;
}
@ -523,16 +356,18 @@ static gboolean
remove_port (PinosNode *node,
guint id)
{
if (spa_node_remove_port (node->node, id) < 0)
g_warning ("client-node %p: error removing port", node);
return PINOS_NODE_CLASS (pinos_client_node_parent_class)->remove_port (node, id);
}
static void
pinos_client_node_dispose (GObject * object)
{
PinosClientNode *node = PINOS_CLIENT_NODE (object);
PinosClientNode *this = PINOS_CLIENT_NODE (object);
g_debug ("client-node %p: dispose", node);
unhandle_socket (node);
g_debug ("client-node %p: dispose", this);
G_OBJECT_CLASS (pinos_client_node_parent_class)->dispose (object);
}
@ -540,11 +375,9 @@ pinos_client_node_dispose (GObject * object)
static void
pinos_client_node_finalize (GObject * object)
{
PinosClientNode *node = PINOS_CLIENT_NODE (object);
PinosClientNodePrivate *priv = node->priv;
PinosClientNode *this = PINOS_CLIENT_NODE (object);
g_debug ("client-node %p: finalize", node);
g_hash_table_unref (priv->mem_ids);
g_debug ("client-node %p: finalize", this);
G_OBJECT_CLASS (pinos_client_node_parent_class)->finalize (object);
}
@ -552,11 +385,13 @@ pinos_client_node_finalize (GObject * object)
static void
pinos_client_node_constructed (GObject * object)
{
PinosClientNode *node = PINOS_CLIENT_NODE (object);
PinosClientNode *this = PINOS_CLIENT_NODE (object);
g_debug ("client-node %p: constructed", node);
g_debug ("client-node %p: constructed", this);
G_OBJECT_CLASS (pinos_client_node_parent_class)->constructed (object);
setup_node (this);
}
static void
@ -584,3 +419,85 @@ pinos_client_node_init (PinosClientNode * node)
g_debug ("client-node %p: new", node);
}
static SpaResult
make_node (SpaNode **node, const char *lib, const char *name)
{
SpaHandle *handle;
SpaResult res;
void *hnd, *state = NULL;
SpaEnumHandleFactoryFunc enum_func;
if ((hnd = dlopen (lib, RTLD_NOW)) == NULL) {
g_error ("can't load %s: %s", lib, dlerror());
return SPA_RESULT_ERROR;
}
if ((enum_func = dlsym (hnd, "spa_enum_handle_factory")) == NULL) {
g_error ("can't find enum function");
return SPA_RESULT_ERROR;
}
while (true) {
const SpaHandleFactory *factory;
void *iface;
if ((res = enum_func (&factory, &state)) < 0) {
if (res != SPA_RESULT_ENUM_END)
g_error ("can't enumerate factories: %d", res);
break;
}
if (strcmp (factory->name, name))
continue;
handle = calloc (1, factory->size);
if ((res = factory->init (factory, handle)) < 0) {
g_error ("can't make factory instance: %d", res);
return res;
}
if ((res = handle->get_interface (handle, SPA_INTERFACE_ID_NODE, &iface)) < 0) {
g_error ("can't get interface %d", res);
return res;
}
*node = iface;
return SPA_RESULT_OK;
}
return SPA_RESULT_ERROR;
}
/**
* pinos_client_node_new:
* @daemon: a #PinosDaemon
* @sender: the path of the owner
* @name: a name
* @properties: extra properties
*
* Create a new #PinosNode.
*
* Returns: a new #PinosNode
*/
PinosNode *
pinos_client_node_new (PinosDaemon *daemon,
const gchar *sender,
const gchar *name,
PinosProperties *properties)
{
SpaNode *n;
SpaResult res;
g_return_val_if_fail (PINOS_IS_DAEMON (daemon), NULL);
if ((res = make_node (&n,
"spa/build/plugins/remote/libspa-remote.so",
"proxy")) < 0) {
g_error ("can't create proxy: %d", res);
return NULL;
}
return g_object_new (PINOS_TYPE_CLIENT_NODE,
"daemon", daemon,
"sender", sender,
"name", name,
"properties", properties,
"node", n,
NULL);
}