Rework transport protocol

Use a more simple tranport protocol for the realtime data. Use a piece
of shared memory and a socket where we use a 1byte read/write to trigger
remote actions. Also use a ringbuffer for events.
This commit is contained in:
Wim Taymans 2016-11-07 10:24:13 +01:00
parent 7e46f9e3ad
commit b774b99db5
22 changed files with 797 additions and 364 deletions

View file

@ -38,6 +38,7 @@
#include "pinos/client/format.h"
#include "pinos/client/private.h"
#include "pinos/client/serialize.h"
#include "pinos/client/transport.h"
#define MAX_BUFFER_SIZE 4096
#define MAX_FDS 32
@ -90,12 +91,13 @@ struct _PinosStreamPrivate
GSocket *socket;
GSource *socket_source;
int fd;
GSocket *rtsocket;
GSource *rtsocket_source;
int rtfd;
PinosConnection *conn;
PinosConnection *rtconn;
PinosTransport *trans;
GSource *timeout_source;
@ -662,21 +664,11 @@ add_port_update (PinosStream *stream, uint32_t change_mask)
pinos_connection_add_message (priv->conn, PINOS_MESSAGE_PORT_UPDATE, &pu);
}
static void
send_need_input (PinosStream *stream, uint32_t port_id)
static inline void
send_need_input (PinosStream *stream)
{
PinosStreamPrivate *priv = stream->priv;
PinosMessageNodeEvent cne;
SpaNodeEventNeedInput ni;
cne.event = &ni.event;
ni.event.type = SPA_NODE_EVENT_TYPE_NEED_INPUT;
ni.event.size = sizeof (ni);
ni.port_id = port_id;
pinos_connection_add_message (priv->rtconn, PINOS_MESSAGE_NODE_EVENT, &cne);
if (!pinos_connection_flush (priv->rtconn))
pinos_log_warn ("stream %p: error writing connection", stream);
pinos_transport_write_cmd (priv->trans, PINOS_TRANSPORT_CMD_NEED_DATA);
}
static void
@ -712,47 +704,6 @@ add_async_complete (PinosStream *stream,
pinos_connection_add_message (priv->conn, PINOS_MESSAGE_NODE_EVENT, &cne);
}
static void
send_reuse_buffer (PinosStream *stream, uint32_t port_id, uint32_t buffer_id)
{
PinosStreamPrivate *priv = stream->priv;
PinosMessageNodeEvent cne;
SpaNodeEventReuseBuffer rb;
cne.event = &rb.event;
rb.event.type = SPA_NODE_EVENT_TYPE_REUSE_BUFFER;
rb.event.size = sizeof (rb);
rb.port_id = port_id;
rb.buffer_id = buffer_id;
pinos_connection_add_message (priv->rtconn, PINOS_MESSAGE_NODE_EVENT, &cne);
if (!pinos_connection_flush (priv->rtconn))
pinos_log_warn ("stream %p: error writing connection", stream);
}
static void
send_process_buffer (PinosStream *stream, uint32_t port_id, uint32_t buffer_id)
{
PinosStreamPrivate *priv = stream->priv;
PinosMessageProcessBuffer pb;
PinosMessageNodeEvent cne;
SpaNodeEventHaveOutput ho;
pb.direction = priv->direction;
pb.port_id = port_id;
pb.buffer_id = buffer_id;
pinos_connection_add_message (priv->rtconn, PINOS_MESSAGE_PROCESS_BUFFER, &pb);
cne.event = &ho.event;
ho.event.type = SPA_NODE_EVENT_TYPE_HAVE_OUTPUT;
ho.event.size = sizeof (ho);
ho.port_id = port_id;
pinos_connection_add_message (priv->rtconn, PINOS_MESSAGE_NODE_EVENT, &cne);
if (!pinos_connection_flush (priv->rtconn))
pinos_log_warn ("stream %p: error writing connection", stream);
}
static void
do_node_init (PinosStream *stream)
{
@ -822,47 +773,6 @@ handle_node_event (PinosStream *stream,
return TRUE;
}
static gboolean
handle_rtnode_event (PinosStream *stream,
SpaNodeEvent *event)
{
PinosStreamPrivate *priv = stream->priv;
switch (event->type) {
case SPA_NODE_EVENT_TYPE_INVALID:
case SPA_NODE_EVENT_TYPE_ASYNC_COMPLETE:
case SPA_NODE_EVENT_TYPE_ERROR:
case SPA_NODE_EVENT_TYPE_BUFFERING:
case SPA_NODE_EVENT_TYPE_REQUEST_REFRESH:
case SPA_NODE_EVENT_TYPE_REQUEST_CLOCK_UPDATE:
pinos_log_warn ("unexpected node event %d", event->type);
break;
case SPA_NODE_EVENT_TYPE_HAVE_OUTPUT:
case SPA_NODE_EVENT_TYPE_NEED_INPUT:
pinos_log_warn ("unhandled node event %d", event->type);
break;
case SPA_NODE_EVENT_TYPE_REUSE_BUFFER:
{
SpaNodeEventReuseBuffer *p = (SpaNodeEventReuseBuffer *) event;
BufferId *bid;
if (p->port_id != priv->port_id)
break;
if (priv->direction != SPA_DIRECTION_OUTPUT)
break;
if ((bid = find_buffer (stream, p->buffer_id))) {
bid->used = FALSE;
g_signal_emit (stream, signals[SIGNAL_NEW_BUFFER], 0, p->buffer_id);
}
break;
}
}
return TRUE;
}
static gboolean
handle_node_command (PinosStream *stream,
uint32_t seq,
@ -896,7 +806,7 @@ handle_node_command (PinosStream *stream,
pinos_log_warn ("stream %p: error writing connection", stream);
if (priv->direction == SPA_DIRECTION_INPUT)
send_need_input (stream, priv->port_id);
send_need_input (stream);
stream_set_state (stream, PINOS_STREAM_STATE_STREAMING, NULL);
break;
@ -1145,6 +1055,29 @@ parse_connection (PinosStream *stream)
break;
}
case PINOS_MESSAGE_TRANSPORT_UPDATE:
{
PinosMessageTransportUpdate p;
PinosTransportInfo info;
if (!pinos_connection_parse_message (conn, &p))
break;
info.memfd = pinos_connection_get_fd (conn, p.memfd_index, false);
if (info.memfd == -1)
break;
info.offset = p.offset;
info.size = p.size;
info.fd = priv->rtfd;
pinos_log_debug ("transport update %d %p", priv->rtfd, priv->trans);
if (priv->trans)
pinos_transport_free (priv->trans);
priv->trans = pinos_transport_new_from_info (&info);
break;
}
case PINOS_MESSAGE_INVALID:
pinos_log_warn ("unhandled message %d", type);
break;
@ -1153,73 +1086,6 @@ parse_connection (PinosStream *stream)
return TRUE;
}
static gboolean
parse_rtconnection (PinosStream *stream)
{
PinosStreamPrivate *priv = stream->priv;
PinosConnection *conn = priv->rtconn;
while (pinos_connection_has_next (conn)) {
PinosMessageType type = pinos_connection_get_type (conn);
switch (type) {
case PINOS_MESSAGE_INVALID:
case PINOS_MESSAGE_NODE_UPDATE:
case PINOS_MESSAGE_PORT_UPDATE:
case PINOS_MESSAGE_PORT_STATUS_CHANGE:
case PINOS_MESSAGE_NODE_STATE_CHANGE:
case PINOS_MESSAGE_ADD_PORT:
case PINOS_MESSAGE_REMOVE_PORT:
case PINOS_MESSAGE_SET_FORMAT:
case PINOS_MESSAGE_SET_PROPERTY:
case PINOS_MESSAGE_ADD_MEM:
case PINOS_MESSAGE_USE_BUFFERS:
case PINOS_MESSAGE_NODE_COMMAND:
pinos_log_warn ("got unexpected command %d", type);
break;
case PINOS_MESSAGE_PROCESS_BUFFER:
{
PinosMessageProcessBuffer p;
guint i;
BufferId *bid;
if (priv->direction != SPA_DIRECTION_INPUT)
break;
if (!pinos_connection_parse_message (conn, &p))
break;
if ((bid = find_buffer (stream, p.buffer_id))) {
for (i = 0; i < bid->buf->n_datas; i++) {
bid->buf->datas[i].size = bid->datas[i].size;
}
g_signal_emit (stream, signals[SIGNAL_NEW_BUFFER], 0, p.buffer_id);
send_need_input (stream, priv->port_id);
}
break;
}
case PINOS_MESSAGE_NODE_EVENT:
{
PinosMessageNodeEvent p;
if (!pinos_connection_parse_message (conn, &p))
break;
handle_rtnode_event (stream, p.event);
break;
}
case PINOS_MESSAGE_PORT_COMMAND:
{
break;
}
}
}
return TRUE;
}
static gboolean
on_socket_condition (GSocket *socket,
GIOCondition condition,
@ -1244,22 +1110,95 @@ on_socket_condition (GSocket *socket,
return TRUE;
}
static gboolean
handle_rtnode_event (PinosStream *stream,
SpaNodeEvent *event)
{
PinosStreamPrivate *priv = stream->priv;
switch (event->type) {
case SPA_NODE_EVENT_TYPE_INVALID:
case SPA_NODE_EVENT_TYPE_ASYNC_COMPLETE:
case SPA_NODE_EVENT_TYPE_ERROR:
case SPA_NODE_EVENT_TYPE_BUFFERING:
case SPA_NODE_EVENT_TYPE_REQUEST_REFRESH:
case SPA_NODE_EVENT_TYPE_REQUEST_CLOCK_UPDATE:
pinos_log_warn ("unexpected node event %d", event->type);
break;
case SPA_NODE_EVENT_TYPE_HAVE_OUTPUT:
case SPA_NODE_EVENT_TYPE_NEED_INPUT:
pinos_log_warn ("unhandled node event %d", event->type);
break;
case SPA_NODE_EVENT_TYPE_REUSE_BUFFER:
{
SpaNodeEventReuseBuffer *p = (SpaNodeEventReuseBuffer *) event;
BufferId *bid;
if (p->port_id != priv->port_id)
break;
if (priv->direction != SPA_DIRECTION_OUTPUT)
break;
if ((bid = find_buffer (stream, p->buffer_id))) {
bid->used = FALSE;
g_signal_emit (stream, signals[SIGNAL_NEW_BUFFER], 0, p->buffer_id);
}
break;
}
}
return TRUE;
}
static gboolean
on_rtsocket_condition (GSocket *socket,
GIOCondition condition,
gpointer user_data)
{
PinosStream *stream = user_data;
PinosStreamPrivate *priv = stream->priv;
switch (condition) {
case G_IO_IN:
{
parse_rtconnection (stream);
uint8_t cmd;
int i;
pinos_transport_read_cmd (priv->trans, &cmd);
if (cmd & PINOS_TRANSPORT_CMD_HAVE_DATA) {
BufferId *bid;
for (i = 0; i < priv->trans->area->n_input_info; i++) {
SpaPortInputInfo *info = &priv->trans->input_info[i];
if (info->buffer_id == SPA_ID_INVALID)
continue;
if ((bid = find_buffer (stream, info->buffer_id))) {
for (i = 0; i < bid->buf->n_datas; i++) {
bid->buf->datas[i].size = bid->datas[i].size;
}
g_signal_emit (stream, signals[SIGNAL_NEW_BUFFER], 0, bid->id);
}
info->buffer_id = SPA_ID_INVALID;
}
send_need_input (stream);
}
if (cmd & PINOS_TRANSPORT_CMD_HAVE_EVENT) {
SpaNodeEvent event;
while (pinos_transport_next_event (priv->trans, &event) == SPA_RESULT_OK) {
SpaNodeEvent *ev = alloca (event.size);
pinos_transport_parse_event (priv->trans, ev);
handle_rtnode_event (stream, ev);
}
}
break;
}
case G_IO_OUT:
pinos_log_warn ("can do IO\n");
pinos_log_warn ("can do IO");
break;
default:
@ -1305,7 +1244,8 @@ handle_socket (PinosStream *stream, gint fd, gint rtfd)
priv->rtsocket_source = g_socket_create_source (priv->rtsocket, G_IO_IN, NULL);
g_source_set_callback (priv->rtsocket_source, (GSourceFunc) on_rtsocket_condition, stream, NULL);
g_source_attach (priv->rtsocket_source, priv->context->priv->context);
priv->rtconn = pinos_connection_new (priv->rtfd);
pinos_log_debug ("sockets %d %d", priv->fd, priv->rtfd);
priv->timeout_source = g_timeout_source_new (100);
g_source_set_callback (priv->timeout_source, (GSourceFunc) on_timeout, stream, NULL);
@ -1863,13 +1803,19 @@ pinos_stream_recycle_buffer (PinosStream *stream,
guint id)
{
PinosStreamPrivate *priv;
SpaNodeEventReuseBuffer rb;
g_return_val_if_fail (PINOS_IS_STREAM (stream), FALSE);
g_return_val_if_fail (id != SPA_ID_INVALID, FALSE);
priv = stream->priv;
g_return_val_if_fail (priv->direction == SPA_DIRECTION_INPUT, FALSE);
send_reuse_buffer (stream, priv->port_id, id);
rb.event.type = SPA_NODE_EVENT_TYPE_REUSE_BUFFER;
rb.event.size = sizeof (rb);
rb.port_id = priv->port_id;
rb.buffer_id = id;
pinos_transport_add_event (priv->trans, &rb.event);
pinos_transport_write_cmd (priv->trans, PINOS_TRANSPORT_CMD_HAVE_EVENT);
return TRUE;
}
@ -1929,7 +1875,9 @@ pinos_stream_send_buffer (PinosStream *stream,
for (i = 0; i < bid->buf->n_datas; i++) {
bid->datas[i].size = bid->buf->datas[i].size;
}
send_process_buffer (stream, priv->port_id, id);
priv->trans->output_info[0].buffer_id = id;
priv->trans->output_info[0].status = SPA_RESULT_OK;
pinos_transport_write_cmd (priv->trans, PINOS_TRANSPORT_CMD_HAVE_DATA);
return TRUE;
} else {
return FALSE;