From d711e15f0ad631371d88e2d6376277fa682b7df3 Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Mon, 17 Oct 2016 18:29:05 +0200 Subject: [PATCH] Redo the control code Make a simple connection object that you can use to send and receive our commands. --- pinos/client/connection.c | 850 ++++++++++++++++ pinos/client/{control.h => connection.h} | 92 +- pinos/client/control.c | 1147 ---------------------- pinos/client/meson.build | 2 +- pinos/client/stream.c | 291 ++---- pinos/server/client-node.c | 160 +-- 6 files changed, 1007 insertions(+), 1535 deletions(-) create mode 100644 pinos/client/connection.c rename pinos/client/{control.h => connection.h} (61%) delete mode 100644 pinos/client/control.c diff --git a/pinos/client/connection.c b/pinos/client/connection.c new file mode 100644 index 000000000..f49d0d097 --- /dev/null +++ b/pinos/client/connection.c @@ -0,0 +1,850 @@ +/* Simple Plugin API + * Copyright (C) 2016 Wim Taymans + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ + +#include +#include +#include +#include +#include +#include +#include + +#include "connection.h" +#include "serialize.h" + +#define MAX_BUFFER_SIZE 4096 +#define MAX_FDS 28 + +typedef struct { + uint8_t buffer_data[MAX_BUFFER_SIZE]; + size_t buffer_size; + int fds[MAX_FDS]; + unsigned int n_fds; + + SpaControlCmd cmd; + off_t offset; + void *data; + size_t size; +} ConnectionBuffer; + +struct _SpaConnection { + ConnectionBuffer in, out; + int fd; + bool update; +}; + +#if 0 +#define SPA_DEBUG_CONTROL(format,args...) fprintf(stderr,format,##args) +#else +#define SPA_DEBUG_CONTROL(format,args...) +#endif + +static bool +read_length (uint8_t * data, unsigned int size, size_t * length, size_t * skip) +{ + size_t len, offset; + uint8_t b; + + /* start reading the length, we need this to skip to the data later */ + len = offset = 0; + do { + if (offset >= size) + return false; + + b = data[offset++]; + len = (len << 7) | (b & 0x7f); + } while (b & 0x80); + + /* check remaining command size */ + if (size - offset < len) + return false; + + *length = len; + *skip = offset; + + return true; +} + +static void +connection_parse_node_update (SpaConnection *conn, SpaControlCmdNodeUpdate *nu) +{ + memcpy (nu, conn->in.data, sizeof (SpaControlCmdNodeUpdate)); + if (nu->props) + nu->props = spa_serialize_props_deserialize (conn->in.data, SPA_PTR_TO_INT (nu->props)); +} + +static void +connection_parse_port_update (SpaConnection *conn, SpaControlCmdPortUpdate *pu) +{ + void *p; + unsigned int i; + + memcpy (pu, conn->in.data, sizeof (SpaControlCmdPortUpdate)); + + p = conn->in.data; + + if (pu->possible_formats) + pu->possible_formats = SPA_MEMBER (p, + SPA_PTR_TO_INT (pu->possible_formats), SpaFormat *); + for (i = 0; i < pu->n_possible_formats; i++) { + if (pu->possible_formats[i]) { + pu->possible_formats[i] = spa_serialize_format_deserialize (p, + SPA_PTR_TO_INT (pu->possible_formats[i])); + } + } + if (pu->format) + pu->format = spa_serialize_format_deserialize (p, SPA_PTR_TO_INT (pu->format)); + if (pu->props) + pu->props = spa_serialize_props_deserialize (p, SPA_PTR_TO_INT (pu->props)); + if (pu->info) + pu->info = spa_serialize_port_info_deserialize (p, SPA_PTR_TO_INT (pu->info)); +} + +static void +connection_parse_set_format (SpaConnection *conn, SpaControlCmdSetFormat *cmd) +{ + memcpy (cmd, conn->in.data, sizeof (SpaControlCmdSetFormat)); + if (cmd->format) + cmd->format = spa_serialize_format_deserialize (conn->in.data, SPA_PTR_TO_INT (cmd->format)); +} + +static void +connection_parse_use_buffers (SpaConnection *conn, SpaControlCmdUseBuffers *cmd) +{ + void *p; + + p = conn->in.data; + memcpy (cmd, p, sizeof (SpaControlCmdUseBuffers)); + if (cmd->buffers) + cmd->buffers = SPA_MEMBER (p, SPA_PTR_TO_INT (cmd->buffers), SpaControlMemRef); +} + +static void +connection_parse_node_event (SpaConnection *conn, SpaControlCmdNodeEvent *cmd) +{ + void *p = conn->in.data; + memcpy (cmd, p, sizeof (SpaControlCmdNodeEvent)); + if (cmd->event) + cmd->event = SPA_MEMBER (p, SPA_PTR_TO_INT (cmd->event), SpaNodeEvent); + if (cmd->event->data) + cmd->event->data = SPA_MEMBER (p, SPA_PTR_TO_INT (cmd->event->data), void); +} + +static void +connection_parse_node_command (SpaConnection *conn, SpaControlCmdNodeCommand *cmd) +{ + void *p = conn->in.data; + memcpy (cmd, p, sizeof (SpaControlCmdNodeCommand)); + if (cmd->command) + cmd->command = SPA_MEMBER (p, SPA_PTR_TO_INT (cmd->command), SpaNodeCommand); + if (cmd->command->data) + cmd->command->data = SPA_MEMBER (p, SPA_PTR_TO_INT (cmd->command->data), void); +} + +#define MAX(a,b) ((a) > (b) ? (a) : (b)) + +static void * +connection_ensure_size (SpaConnection *conn, size_t size) +{ + if (conn->out.buffer_size + size > MAX_BUFFER_SIZE) { + fprintf (stderr, "error connection %p: overflow\n", conn); + } + return (uint8_t *) conn->out.buffer_data + conn->out.buffer_size; +} + +static void * +connection_add_cmd (SpaConnection *conn, SpaControlCmd cmd, size_t size) +{ + uint8_t *p; + unsigned int plen; + + plen = 1; + while (size >> (7 * plen)) + plen++; + + /* 1 for cmd, plen for size and size for payload */ + p = connection_ensure_size (conn, 1 + plen + size); + + conn->out.cmd = cmd; + conn->out.offset = conn->out.buffer_size; + conn->out.buffer_size += 1 + plen + size; + + *p++ = cmd; + /* write length */ + while (plen) { + plen--; + *p++ = ((plen > 0) ? 0x80 : 0) | ((size >> (7 * plen)) & 0x7f); + } + return p; +} + +static void +connection_add_node_update (SpaConnection *conn, SpaControlCmdNodeUpdate *nu) +{ + size_t len; + void *p; + SpaControlCmdNodeUpdate *d; + + /* calc len */ + len = sizeof (SpaControlCmdNodeUpdate); + len += spa_serialize_props_get_size (nu->props); + + p = connection_add_cmd (conn, SPA_CONTROL_CMD_NODE_UPDATE, len); + memcpy (p, nu, sizeof (SpaControlCmdNodeUpdate)); + d = p; + + p = SPA_MEMBER (d, sizeof (SpaControlCmdNodeUpdate), void); + if (nu->props) { + len = spa_serialize_props_serialize (p, nu->props); + d->props = SPA_INT_TO_PTR (SPA_PTRDIFF (p, d)); + } else { + d->props = 0; + } +} + +static void +connection_add_port_update (SpaConnection *conn, SpaControlCmdPortUpdate *pu) +{ + size_t len; + void *p; + int i; + SpaFormat **bfa; + SpaControlCmdPortUpdate *d; + + /* calc len */ + len = sizeof (SpaControlCmdPortUpdate); + len += pu->n_possible_formats * sizeof (SpaFormat *); + for (i = 0; i < pu->n_possible_formats; i++) { + len += spa_serialize_format_get_size (pu->possible_formats[i]); + } + len += spa_serialize_format_get_size (pu->format); + len += spa_serialize_props_get_size (pu->props); + if (pu->info) + len += spa_serialize_port_info_get_size (pu->info); + + p = connection_add_cmd (conn, SPA_CONTROL_CMD_PORT_UPDATE, len); + memcpy (p, pu, sizeof (SpaControlCmdPortUpdate)); + d = p; + + p = SPA_MEMBER (d, sizeof (SpaControlCmdPortUpdate), void); + bfa = p; + if (pu->n_possible_formats) + d->possible_formats = SPA_INT_TO_PTR (SPA_PTRDIFF (p, d)); + else + d->possible_formats = 0; + + p = SPA_MEMBER (p, sizeof (SpaFormat*) * pu->n_possible_formats, void); + + for (i = 0; i < pu->n_possible_formats; i++) { + len = spa_serialize_format_serialize (p, pu->possible_formats[i]); + bfa[i] = SPA_INT_TO_PTR (SPA_PTRDIFF (p, d)); + p = SPA_MEMBER (p, len, void); + } + if (pu->format) { + len = spa_serialize_format_serialize (p, pu->format); + d->format = SPA_INT_TO_PTR (SPA_PTRDIFF (p, d)); + p = SPA_MEMBER (p, len, void); + } else { + d->format = 0; + } + if (pu->props) { + len = spa_serialize_props_serialize (p, pu->props); + d->props = SPA_INT_TO_PTR (SPA_PTRDIFF (p, d)); + p = SPA_MEMBER (p, len, void); + } else { + d->props = 0; + } + if (pu->info) { + len = spa_serialize_port_info_serialize (p, pu->info); + d->info = SPA_INT_TO_PTR (SPA_PTRDIFF (p, d)); + p = SPA_MEMBER (p, len, void); + } else { + d->info = 0; + } +} + +static void +connection_add_set_format (SpaConnection *conn, SpaControlCmdSetFormat *sf) +{ + size_t len; + void *p; + + /* calculate length */ + /* port_id + format + mask */ + len = sizeof (SpaControlCmdSetFormat) + spa_serialize_format_get_size (sf->format); + p = connection_add_cmd (conn, SPA_CONTROL_CMD_SET_FORMAT, len); + memcpy (p, sf, sizeof (SpaControlCmdSetFormat)); + sf = p; + + p = SPA_MEMBER (sf, sizeof (SpaControlCmdSetFormat), void); + if (sf->format) { + len = spa_serialize_format_serialize (p, sf->format); + sf->format = SPA_INT_TO_PTR (SPA_PTRDIFF (p, sf)); + } else + sf->format = 0; +} + +static void +connection_add_use_buffers (SpaConnection *conn, SpaControlCmdUseBuffers *ub) +{ + size_t len; + int i; + SpaControlCmdUseBuffers *d; + SpaControlMemRef *mr; + + /* calculate length */ + len = sizeof (SpaControlCmdUseBuffers); + len += ub->n_buffers * sizeof (SpaControlMemRef); + + d = connection_add_cmd (conn, SPA_CONTROL_CMD_USE_BUFFERS, len); + memcpy (d, ub, sizeof (SpaControlCmdUseBuffers)); + + mr = SPA_MEMBER (d, sizeof (SpaControlCmdUseBuffers), void); + + if (d->n_buffers) + d->buffers = SPA_INT_TO_PTR (SPA_PTRDIFF (mr, d)); + else + d->buffers = 0; + + for (i = 0; i < ub->n_buffers; i++) + memcpy (&mr[i], &ub->buffers[i], sizeof (SpaControlMemRef)); +} + +static void +connection_add_node_event (SpaConnection *conn, SpaControlCmdNodeEvent *ev) +{ + size_t len; + void *p; + SpaControlCmdNodeEvent *d; + SpaNodeEvent *ne; + + /* calculate length */ + len = sizeof (SpaControlCmdNodeEvent); + len += sizeof (SpaNodeEvent); + len += ev->event->size; + + p = connection_add_cmd (conn, SPA_CONTROL_CMD_NODE_EVENT, len); + memcpy (p, ev, sizeof (SpaControlCmdNodeEvent)); + d = p; + + p = SPA_MEMBER (d, sizeof (SpaControlCmdNodeEvent), void); + d->event = SPA_INT_TO_PTR (SPA_PTRDIFF (p, d)); + + ne = p; + memcpy (p, ev->event, sizeof (SpaNodeEvent)); + p = SPA_MEMBER (p, sizeof (SpaNodeEvent), void); + ne->data = SPA_INT_TO_PTR (SPA_PTRDIFF (p, d)); + memcpy (p, ev->event->data, ev->event->size); +} + + +static void +connection_add_node_command (SpaConnection *conn, SpaControlCmdNodeCommand *cm) +{ + size_t len; + void *p; + SpaControlCmdNodeCommand *d; + SpaNodeCommand *nc; + + /* calculate length */ + len = sizeof (SpaControlCmdNodeCommand); + len += sizeof (SpaNodeCommand); + len += cm->command->size; + + p = connection_add_cmd (conn, SPA_CONTROL_CMD_NODE_COMMAND, len); + memcpy (p, cm, sizeof (SpaControlCmdNodeCommand)); + d = p; + + p = SPA_MEMBER (d, sizeof (SpaControlCmdNodeCommand), void); + d->command = SPA_INT_TO_PTR (SPA_PTRDIFF (p, d)); + + nc = p; + memcpy (p, cm->command, sizeof (SpaNodeCommand)); + p = SPA_MEMBER (p, sizeof (SpaNodeCommand), void); + nc->data = SPA_INT_TO_PTR (SPA_PTRDIFF (p, d)); + memcpy (p, cm->command->data, cm->command->size); +} + +static SpaResult +refill_buffer (SpaConnection *conn) +{ + ssize_t len; + struct cmsghdr *cmsg; + struct msghdr msg = {0}; + struct iovec iov[1]; + char cmsgbuf[CMSG_SPACE (MAX_FDS * sizeof (int))]; + + conn->in.cmd = SPA_CONTROL_CMD_INVALID; + conn->in.offset = 0; + conn->in.size = 0; + conn->in.buffer_size = 0; + + iov[0].iov_base = conn->in.buffer_data; + iov[0].iov_len = MAX_BUFFER_SIZE; + msg.msg_iov = iov; + msg.msg_iovlen = 1; + msg.msg_control = cmsgbuf; + msg.msg_controllen = sizeof (cmsgbuf); + msg.msg_flags = MSG_CMSG_CLOEXEC; + + while (true) { + len = recvmsg (conn->fd, &msg, msg.msg_flags); + if (len < 0) { + if (errno == EINTR) + continue; + else + goto recv_error; + } + break; + } + + if (len < 4) + return SPA_RESULT_ERROR; + + conn->in.buffer_size = len; + + /* handle control messages */ + for (cmsg = CMSG_FIRSTHDR (&msg); cmsg != NULL; cmsg = CMSG_NXTHDR (&msg, cmsg)) { + if (cmsg->cmsg_level != SOL_SOCKET || cmsg->cmsg_type != SCM_RIGHTS) + continue; + + conn->in.n_fds = (cmsg->cmsg_len - ((char *)CMSG_DATA (cmsg) - (char *)cmsg)) / sizeof (int); + memcpy (conn->in.fds, CMSG_DATA (cmsg), conn->in.n_fds * sizeof (int)); + } + SPA_DEBUG_CONTROL ("connection %p: %d read %zd bytes and %d fds\n", conn, conn->fd, len, conn->in.n_fds); + + return SPA_RESULT_OK; + + /* ERRORS */ +recv_error: + { + fprintf (stderr, "could not recvmsg on fd %d: %s\n", conn->fd, strerror (errno)); + return SPA_RESULT_ERROR; + } +} + +SpaConnection * +spa_connection_new (int fd) +{ + SpaConnection *c; + + c = calloc (1, sizeof (SpaConnection)); + c->fd = fd; + + return c; +} + +/** + * spa_connection_has_next: + * @iter: a #SpaConnection + * + * Move to the next packet in @conn. + * + * Returns: %SPA_RESULT_OK if more packets are available. + */ +SpaResult +spa_connection_has_next (SpaConnection *conn) +{ + size_t len, size, skip; + uint8_t *data; + + if (conn == NULL) + return SPA_RESULT_INVALID_ARGUMENTS; + + /* move to next packet */ + conn->in.offset += conn->in.size; + + if (conn->update) { + refill_buffer (conn); + conn->update = false; + } + + /* now read packet */ + data = conn->in.buffer_data; + size = conn->in.buffer_size; + + if (conn->in.offset >= size) { + conn->update = true; + return SPA_RESULT_ERROR; + } + + data += conn->in.offset; + size -= conn->in.offset; + + if (size < 1) + return SPA_RESULT_ERROR; + + conn->in.cmd = *data; + + data++; + size--; + + if (!read_length (data, size, &len, &skip)) + return SPA_RESULT_ERROR; + + conn->in.size = len; + conn->in.data = data + skip; + conn->in.offset += 1 + skip; + + return SPA_RESULT_OK; +} + +SpaControlCmd +spa_connection_get_cmd (SpaConnection *conn) +{ + if (conn == NULL) + return SPA_CONTROL_CMD_INVALID; + + return conn->in.cmd; +} + +SpaResult +spa_connection_parse_cmd (SpaConnection *conn, + void *command) +{ + SpaResult res = SPA_RESULT_OK; + + if (conn == NULL) + return SPA_RESULT_INVALID_ARGUMENTS; + + switch (conn->in.cmd) { + /* C -> S */ + case SPA_CONTROL_CMD_NODE_UPDATE: + connection_parse_node_update (conn, command); + break; + + case SPA_CONTROL_CMD_PORT_UPDATE: + connection_parse_port_update (conn, command); + break; + + case SPA_CONTROL_CMD_PORT_STATUS_CHANGE: + fprintf (stderr, "implement iter of %d\n", conn->in.cmd); + break; + + case SPA_CONTROL_CMD_NODE_STATE_CHANGE: + if (conn->in.size < sizeof (SpaControlCmdNodeStateChange)) + return SPA_RESULT_ERROR; + memcpy (command, conn->in.data, sizeof (SpaControlCmdNodeStateChange)); + break; + + /* S -> C */ + case SPA_CONTROL_CMD_ADD_PORT: + if (conn->in.size < sizeof (SpaControlCmdAddPort)) + return SPA_RESULT_ERROR; + memcpy (command, conn->in.data, sizeof (SpaControlCmdAddPort)); + break; + + case SPA_CONTROL_CMD_REMOVE_PORT: + if (conn->in.size < sizeof (SpaControlCmdRemovePort)) + return SPA_RESULT_ERROR; + memcpy (command, conn->in.data, sizeof (SpaControlCmdRemovePort)); + break; + + case SPA_CONTROL_CMD_SET_FORMAT: + connection_parse_set_format (conn, command); + break; + + case SPA_CONTROL_CMD_SET_PROPERTY: + fprintf (stderr, "implement iter of %d\n", conn->in.cmd); + break; + + /* bidirectional */ + case SPA_CONTROL_CMD_ADD_MEM: + if (conn->in.size < sizeof (SpaControlCmdAddMem)) + return SPA_RESULT_ERROR; + memcpy (command, conn->in.data, sizeof (SpaControlCmdAddMem)); + break; + + case SPA_CONTROL_CMD_REMOVE_MEM: + if (conn->in.size < sizeof (SpaControlCmdRemoveMem)) + return SPA_RESULT_ERROR; + memcpy (command, conn->in.data, sizeof (SpaControlCmdRemoveMem)); + break; + + case SPA_CONTROL_CMD_USE_BUFFERS: + connection_parse_use_buffers (conn, command); + break; + + case SPA_CONTROL_CMD_PROCESS_BUFFER: + if (conn->in.size < sizeof (SpaControlCmdProcessBuffer)) + return SPA_RESULT_ERROR; + memcpy (command, conn->in.data, sizeof (SpaControlCmdProcessBuffer)); + break; + + case SPA_CONTROL_CMD_NODE_EVENT: + connection_parse_node_event (conn, command); + break; + + case SPA_CONTROL_CMD_NODE_COMMAND: + connection_parse_node_command (conn, command); + break; + + case SPA_CONTROL_CMD_INVALID: + return SPA_RESULT_ERROR; + } + return res; +} + +/** + * spa_connection_get_fd: + * @conn: a #SpaConnection + * @index: an index + * @steal: steal the fd + * + * Get the file descriptor at @index in @conn. + * + * Returns: a file descriptor at @index in @conn. The file descriptor + * is not duplicated in any way. -1 is returned on error. + */ +int +spa_connection_get_fd (SpaConnection *conn, + unsigned int index, + bool close) +{ + int fd; + + if (conn == NULL) + return -1; + + if (conn->in.n_fds < index) + return -1; + + fd = conn->in.fds[index]; + if (fd < 0) + fd = -fd; + conn->in.fds[index] = close ? fd : -fd; + + return fd; +} + +/** + * spa_connection_add_fd: + * @conn: a #SpaConnection + * @fd: a valid fd + * @close: if the descriptor should be closed when sent + * + * Add the file descriptor @fd to @builder. + * + * Returns: the index of the file descriptor in @builder. + */ +int +spa_connection_add_fd (SpaConnection *conn, + int fd, + bool close) +{ + int index, i; + + if (conn == NULL) + return -1; + + for (i = 0; i < conn->out.n_fds; i++) { + if (conn->out.fds[i] == fd || conn->out.fds[i] == -fd) + return i; + } + + index = conn->out.n_fds; + conn->out.fds[index] = close ? fd : -fd; + conn->out.n_fds++; + + return index; +} + +/** + * spa_connection_add_cmd: + * @conn: a #SpaConnection + * @cmd: a #SpaControlCmd + * @command: a command + * + * Add a @cmd to @conn with data from @command. + * + * Returns: %SPA_RESULT_OK on success. + */ +SpaResult +spa_connection_add_cmd (SpaConnection *conn, + SpaControlCmd cmd, + void *command) +{ + void *p; + + if (conn == NULL || command == NULL) + return SPA_RESULT_INVALID_ARGUMENTS; + + switch (cmd) { + /* C -> S */ + case SPA_CONTROL_CMD_NODE_UPDATE: + connection_add_node_update (conn, command); + break; + + case SPA_CONTROL_CMD_PORT_UPDATE: + connection_add_port_update (conn, command); + break; + + case SPA_CONTROL_CMD_PORT_STATUS_CHANGE: + p = connection_add_cmd (conn, cmd, 0); + break; + + case SPA_CONTROL_CMD_NODE_STATE_CHANGE: + p = connection_add_cmd (conn, cmd, sizeof (SpaControlCmdNodeStateChange)); + memcpy (p, command, sizeof (SpaControlCmdNodeStateChange)); + break; + + /* S -> C */ + case SPA_CONTROL_CMD_ADD_PORT: + p = connection_add_cmd (conn, cmd, sizeof (SpaControlCmdAddPort)); + memcpy (p, command, sizeof (SpaControlCmdAddPort)); + break; + + case SPA_CONTROL_CMD_REMOVE_PORT: + p = connection_add_cmd (conn, cmd, sizeof (SpaControlCmdRemovePort)); + memcpy (p, command, sizeof (SpaControlCmdRemovePort)); + break; + + case SPA_CONTROL_CMD_SET_FORMAT: + connection_add_set_format (conn, command); + break; + + case SPA_CONTROL_CMD_SET_PROPERTY: + fprintf (stderr, "implement builder of %d\n", cmd); + break; + + /* bidirectional */ + case SPA_CONTROL_CMD_ADD_MEM: + p = connection_add_cmd (conn, cmd, sizeof (SpaControlCmdAddMem)); + memcpy (p, command, sizeof (SpaControlCmdAddMem)); + break; + + case SPA_CONTROL_CMD_REMOVE_MEM: + p = connection_add_cmd (conn, cmd, sizeof (SpaControlCmdRemoveMem)); + memcpy (p, command, sizeof (SpaControlCmdRemoveMem)); + break; + + case SPA_CONTROL_CMD_USE_BUFFERS: + connection_add_use_buffers (conn, command); + break; + + case SPA_CONTROL_CMD_PROCESS_BUFFER: + p = connection_add_cmd (conn, cmd, sizeof (SpaControlCmdProcessBuffer)); + memcpy (p, command, sizeof (SpaControlCmdProcessBuffer)); + break; + + case SPA_CONTROL_CMD_NODE_EVENT: + connection_add_node_event (conn, command); + break; + + case SPA_CONTROL_CMD_NODE_COMMAND: + connection_add_node_command (conn, command); + break; + + case SPA_CONTROL_CMD_INVALID: + return SPA_RESULT_INVALID_ARGUMENTS; + } + return SPA_RESULT_OK; +} + +SpaResult +spa_connection_flush (SpaConnection *conn) +{ + ssize_t len; + struct msghdr msg = {0}; + struct iovec iov[1]; + struct cmsghdr *cmsg; + char cmsgbuf[CMSG_SPACE (MAX_FDS * sizeof (int))]; + int *cm, i, fds_len; + + if (conn == NULL) + return SPA_RESULT_INVALID_ARGUMENTS; + + if (conn->out.buffer_size == 0) + return SPA_RESULT_OK; + + fds_len = conn->out.n_fds * sizeof (int); + + iov[0].iov_base = conn->out.buffer_data; + iov[0].iov_len = conn->out.buffer_size; + msg.msg_iov = iov; + msg.msg_iovlen = 1; + + if (conn->out.n_fds > 0) { + msg.msg_control = cmsgbuf; + msg.msg_controllen = CMSG_SPACE (fds_len); + cmsg = CMSG_FIRSTHDR(&msg); + cmsg->cmsg_level = SOL_SOCKET; + cmsg->cmsg_type = SCM_RIGHTS; + cmsg->cmsg_len = CMSG_LEN (fds_len); + cm = (int*)CMSG_DATA (cmsg); + for (i = 0; i < conn->out.n_fds; i++) + cm[i] = conn->out.fds[i] > 0 ? conn->out.fds[i] : -conn->out.fds[i]; + msg.msg_controllen = cmsg->cmsg_len; + } else { + msg.msg_control = NULL; + msg.msg_controllen = 0; + } + + while (true) { + len = sendmsg (conn->fd, &msg, 0); + if (len < 0) { + if (errno == EINTR) + continue; + else + goto send_error; + } + break; + } + conn->out.buffer_size -= len; + conn->out.n_fds = 0; + + SPA_DEBUG_CONTROL ("connection %p: %d written %zd bytes and %u fds\n", conn, conn->fd, len, conn->out.n_fds); + + return SPA_RESULT_OK; + + /* ERRORS */ +send_error: + { + fprintf (stderr, "could not sendmsg: %s\n", strerror (errno)); + return SPA_RESULT_ERROR; + } +} + +SpaResult +spa_connection_clear (SpaConnection *conn) +{ + unsigned int i; + + if (conn == NULL) + return SPA_RESULT_INVALID_ARGUMENTS; + + for (i = 0; i < conn->out.n_fds; i++) { + if (conn->out.fds[i] > 0) { + if (close (conn->out.fds[i]) < 0) + perror ("close"); + } + } + conn->out.n_fds = 0; + conn->out.buffer_size = 0; + for (i = 0; i < conn->in.n_fds; i++) { + if (conn->in.fds[i] > 0) { + if (close (conn->in.fds[i]) < 0) + perror ("close"); + } + } + conn->in.n_fds = 0; + conn->in.buffer_size = 0; + + return SPA_RESULT_OK; +} diff --git a/pinos/client/control.h b/pinos/client/connection.h similarity index 61% rename from pinos/client/control.h rename to pinos/client/connection.h index 3b834908d..e4a590403 100644 --- a/pinos/client/control.h +++ b/pinos/client/connection.h @@ -24,33 +24,13 @@ extern "C" { #endif -typedef struct _SpaControl SpaControl; -typedef struct _SpaControlIter SpaControlIter; -typedef struct _SpaControlBuilder SpaControlBuilder; - -#define SPA_CONTROL_VERSION 0 - #include #include #include #include #include -struct _SpaControl { - size_t x[16]; -}; - -SpaResult spa_control_init_data (SpaControl *control, - void *data, - size_t size, - int *fds, - unsigned int n_fds); - -SpaResult spa_control_clear (SpaControl *control); - -int spa_control_get_fd (SpaControl *control, - unsigned int index, - bool close); +typedef struct _SpaConnection SpaConnection; typedef enum { SPA_CONTROL_CMD_INVALID = 0, @@ -200,63 +180,27 @@ typedef struct { SpaNodeEvent *event; } SpaControlCmdNodeEvent; -struct _SpaControlIter { - /*< private >*/ - size_t x[16]; -}; -SpaResult spa_control_iter_init (SpaControlIter *iter, - SpaControl *control); +SpaConnection * spa_connection_new (int fd); +void spa_connection_free (SpaConnection *conn); -SpaResult spa_control_iter_next (SpaControlIter *iter); -SpaResult spa_control_iter_end (SpaControlIter *iter); +SpaResult spa_connection_has_next (SpaConnection *conn); +SpaControlCmd spa_connection_get_cmd (SpaConnection *conn); +SpaResult spa_connection_parse_cmd (SpaConnection *conn, + void *command); +int spa_connection_get_fd (SpaConnection *conn, + unsigned int index, + bool close); -SpaControlCmd spa_control_iter_get_cmd (SpaControlIter *iter); -void * spa_control_iter_get_data (SpaControlIter *iter, - size_t *size); -SpaResult spa_control_iter_set_data (SpaControlIter *iter, - void *data, - size_t size); +int spa_connection_add_fd (SpaConnection *conn, + int fd, + bool close); +SpaResult spa_connection_add_cmd (SpaConnection *conn, + SpaControlCmd cmd, + void *command); -SpaResult spa_control_iter_parse_cmd (SpaControlIter *iter, - void *command); - -/** - * SpaControlBuilder: - */ -struct _SpaControlBuilder { - /*< private >*/ - size_t x[16]; -}; - -SpaResult spa_control_builder_init_into (SpaControlBuilder *builder, - void *data, - size_t max_data, - int *fds, - unsigned int max_fds); -#define spa_control_builder_init(b) spa_control_builder_init_into(b, NULL, 0, NULL, 0); - -SpaResult spa_control_builder_clear (SpaControlBuilder *builder); -SpaResult spa_control_builder_end (SpaControlBuilder *builder, - SpaControl *control); - -int spa_control_builder_add_fd (SpaControlBuilder *builder, - int fd, - bool close); - -SpaResult spa_control_builder_add_cmd (SpaControlBuilder *builder, - SpaControlCmd cmd, - void *command); - -/* IO */ -SpaResult spa_control_read (SpaControl *control, - int fd, - void *data, - size_t max_data, - int *fds, - unsigned int max_fds); -SpaResult spa_control_write (SpaControl *control, - int fd); +SpaResult spa_connection_flush (SpaConnection *conn); +SpaResult spa_connection_clear (SpaConnection *conn); #ifdef __cplusplus } /* extern "C" */ diff --git a/pinos/client/control.c b/pinos/client/control.c deleted file mode 100644 index dcf25ad68..000000000 --- a/pinos/client/control.c +++ /dev/null @@ -1,1147 +0,0 @@ -/* Simple Plugin API - * Copyright (C) 2016 Wim Taymans - * - * This library is free software; you can redistribute it and/or - * modify it under the terms of the GNU Library General Public - * License as published by the Free Software Foundation; either - * version 2 of the License, or (at your option) any later version. - * - * This library is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * Library General Public License for more details. - * - * You should have received a copy of the GNU Library General Public - * License along with this library; if not, write to the - * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, - * Boston, MA 02110-1301, USA. - */ - -#include -#include -#include -#include -#include -#include -#include - -#include "control.h" -#include "serialize.h" - -#if 0 -#define SPA_DEBUG_CONTROL(format,args...) fprintf(stderr,format,##args) -#else -#define SPA_DEBUG_CONTROL(format,args...) -#endif - -typedef struct { - void *data; - size_t size; - size_t max_size; - void *free_data; - int *fds; - int n_fds; - int max_fds; - void *free_fds; - size_t magic; -} SpaStackControl; - -#define SSC(c) ((SpaStackControl *) (c)) -#define SSC_MAGIC ((size_t) 5493683301u) -#define is_valid_control(c) (c != NULL && \ - SSC(c)->magic == SSC_MAGIC) - -/** - * spa_control_init_data: - * @control: a #SpaControl - * @data: data - * @size: size of @data - * @fds: file descriptors - * @n_fds: number of file descriptors - * - * Initialize @control with @data and @size and @fds and @n_fds. - * The memory pointer to by @data and @fds becomes property of @control - * and should not be freed or modified until all references to the control - * are gone. - */ -SpaResult -spa_control_init_data (SpaControl *control, - void *data, - size_t size, - int *fds, - unsigned int n_fds) -{ - SpaStackControl *sc = SSC (control); - - SPA_DEBUG_CONTROL ("control %p: init", control); - - sc->magic = SSC_MAGIC; - sc->data = data; - sc->size = size; - sc->max_size = size; - sc->free_data = NULL; - sc->fds = fds; - sc->n_fds = n_fds; - sc->max_fds = n_fds; - sc->free_fds = NULL; - - return SPA_RESULT_OK; -} - -/** - * spa_control_get_fd: - * @control: a #SpaControl - * @index: an index - * @steal: steal the fd - * - * Get the file descriptor at @index in @control. - * - * Returns: a file descriptor at @index in @control. The file descriptor - * is not duplicated in any way. -1 is returned on error. - */ -int -spa_control_get_fd (SpaControl *control, - unsigned int index, - bool close) -{ - SpaStackControl *sc = SSC (control); - int fd; - - if (!is_valid_control (control)) - return -1; - - if (sc->fds == NULL || sc->n_fds < index) - return -1; - - fd = sc->fds[index]; - if (fd < 0) - fd = -fd; - sc->fds[index] = close ? fd : -fd; - - return fd; -} - -SpaResult -spa_control_clear (SpaControl *control) -{ - SpaStackControl *sc = SSC (control); - int i; - - if (!is_valid_control (control)) - return SPA_RESULT_INVALID_ARGUMENTS; - - sc->magic = 0; - free (sc->free_data); - for (i = 0; i < sc->n_fds; i++) { - if (sc->fds[i] > 0) { - if (close (sc->fds[i]) < 0) - perror ("close"); - } - } - free (sc->free_fds); - sc->n_fds = 0; - - return SPA_RESULT_OK; -} - - -/** - * SpaControlIter: - * - * #SpaControlIter is an opaque data structure and can only be accessed - * using the following functions. - */ -struct stack_iter { - size_t magic; - SpaStackControl *control; - size_t offset; - - SpaControlCmd cmd; - size_t size; - void *data; -}; - -#define SCSI(i) ((struct stack_iter *) (i)) -#define SCSI_MAGIC ((size_t) 6739527471u) -#define is_valid_iter(i) (i != NULL && \ - SCSI(i)->magic == SCSI_MAGIC) - -/** - * spa_control_iter_init: - * @iter: a #SpaControlIter - * @control: a #SpaControl - * - * Initialize @iter to iterate the packets in @control. - */ -SpaResult -spa_control_iter_init (SpaControlIter *iter, - SpaControl *control) -{ - struct stack_iter *si = SCSI (iter); - - if (iter == NULL || !is_valid_control (control)) - return SPA_RESULT_INVALID_ARGUMENTS; - - si->magic = SCSI_MAGIC; - si->control = SSC (control); - si->offset = 0; - si->cmd = SPA_CONTROL_CMD_INVALID; - si->size = 0; - si->data = NULL; - - return SPA_RESULT_OK; -} - -static bool -read_length (uint8_t * data, unsigned int size, size_t * length, size_t * skip) -{ - size_t len, offset; - uint8_t b; - - /* start reading the length, we need this to skip to the data later */ - len = offset = 0; - do { - if (offset >= size) - return false; - - b = data[offset++]; - len = (len << 7) | (b & 0x7f); - } while (b & 0x80); - - /* check remaining control size */ - if (size - offset < len) - return false; - - *length = len; - *skip = offset; - - return true; -} - -/** - * spa_control_iter_next: - * @iter: a #SpaControlIter - * - * Move to the next packet in @iter. - * - * Returns: %SPA_RESULT_OK if more packets are available. - */ -SpaResult -spa_control_iter_next (SpaControlIter *iter) -{ - struct stack_iter *si = SCSI (iter); - size_t len, size, skip; - uint8_t *data; - - if (!is_valid_iter (iter)) - return SPA_RESULT_INVALID_ARGUMENTS; - - /* move to next packet */ - si->offset += si->size; - - /* now read packet */ - data = si->control->data; - size = si->control->size; - - if (si->offset >= size) - return SPA_RESULT_ERROR; - - data += si->offset; - size -= si->offset; - - if (size < 1) - return SPA_RESULT_ERROR; - - si->cmd = *data; - - data++; - size--; - - if (!read_length (data, size, &len, &skip)) - return SPA_RESULT_ERROR; - - si->size = len; - si->data = data + skip; - si->offset += 1 + skip; - - return SPA_RESULT_OK; -} - -/** - * spa_control_iter_done: - * @iter: a #SpaControlIter - * - * End iterations on @iter. - */ -SpaResult -spa_control_iter_end (SpaControlIter *iter) -{ - struct stack_iter *si = SCSI (iter); - - if (!is_valid_iter (iter)) - return SPA_RESULT_INVALID_ARGUMENTS; - - si->magic = 0; - - return SPA_RESULT_OK; -} - -SpaControlCmd -spa_control_iter_get_cmd (SpaControlIter *iter) -{ - struct stack_iter *si = SCSI (iter); - - if (!is_valid_iter (iter)) - return SPA_CONTROL_CMD_INVALID; - - return si->cmd; -} - -void * -spa_control_iter_get_data (SpaControlIter *iter, size_t *size) -{ - struct stack_iter *si = SCSI (iter); - - if (!is_valid_iter (iter)) - return NULL; - - if (size) - *size = si->size; - - return si->data; -} - -static void -iter_parse_node_update (struct stack_iter *si, SpaControlCmdNodeUpdate *nu) -{ - memcpy (nu, si->data, sizeof (SpaControlCmdNodeUpdate)); - if (nu->props) - nu->props = spa_serialize_props_deserialize (si->data, SPA_PTR_TO_INT (nu->props)); -} - -static void -iter_parse_port_update (struct stack_iter *si, SpaControlCmdPortUpdate *pu) -{ - void *p; - unsigned int i; - - memcpy (pu, si->data, sizeof (SpaControlCmdPortUpdate)); - - p = si->data; - - if (pu->possible_formats) - pu->possible_formats = SPA_MEMBER (p, - SPA_PTR_TO_INT (pu->possible_formats), SpaFormat *); - for (i = 0; i < pu->n_possible_formats; i++) { - if (pu->possible_formats[i]) { - pu->possible_formats[i] = spa_serialize_format_deserialize (p, - SPA_PTR_TO_INT (pu->possible_formats[i])); - } - } - if (pu->format) - pu->format = spa_serialize_format_deserialize (p, SPA_PTR_TO_INT (pu->format)); - if (pu->props) - pu->props = spa_serialize_props_deserialize (p, SPA_PTR_TO_INT (pu->props)); - if (pu->info) - pu->info = spa_serialize_port_info_deserialize (p, SPA_PTR_TO_INT (pu->info)); -} - -static void -iter_parse_set_format (struct stack_iter *si, SpaControlCmdSetFormat *cmd) -{ - memcpy (cmd, si->data, sizeof (SpaControlCmdSetFormat)); - if (cmd->format) - cmd->format = spa_serialize_format_deserialize (si->data, SPA_PTR_TO_INT (cmd->format)); -} - -static void -iter_parse_use_buffers (struct stack_iter *si, SpaControlCmdUseBuffers *cmd) -{ - void *p; - - p = si->data; - memcpy (cmd, p, sizeof (SpaControlCmdUseBuffers)); - if (cmd->buffers) - cmd->buffers = SPA_MEMBER (p, SPA_PTR_TO_INT (cmd->buffers), SpaControlMemRef); -} - -static void -iter_parse_node_event (struct stack_iter *si, SpaControlCmdNodeEvent *cmd) -{ - void *p = si->data; - memcpy (cmd, p, sizeof (SpaControlCmdNodeEvent)); - if (cmd->event) - cmd->event = SPA_MEMBER (p, SPA_PTR_TO_INT (cmd->event), SpaNodeEvent); - if (cmd->event->data) - cmd->event->data = SPA_MEMBER (p, SPA_PTR_TO_INT (cmd->event->data), void); -} - -static void -iter_parse_node_command (struct stack_iter *si, SpaControlCmdNodeCommand *cmd) -{ - void *p = si->data; - memcpy (cmd, p, sizeof (SpaControlCmdNodeCommand)); - if (cmd->command) - cmd->command = SPA_MEMBER (p, SPA_PTR_TO_INT (cmd->command), SpaNodeCommand); - if (cmd->command->data) - cmd->command->data = SPA_MEMBER (p, SPA_PTR_TO_INT (cmd->command->data), void); -} - -SpaResult -spa_control_iter_set_data (SpaControlIter *iter, - void *data, - size_t size) -{ - struct stack_iter *si = SCSI (iter); - - if (!is_valid_iter (iter)) - return SPA_RESULT_INVALID_ARGUMENTS; - - if (si->size > size) - return SPA_RESULT_INVALID_ARGUMENTS; - - si->size = size; - si->data = data; - - return SPA_RESULT_OK; -} - -SpaResult -spa_control_iter_parse_cmd (SpaControlIter *iter, - void *command) -{ - struct stack_iter *si = SCSI (iter); - SpaResult res = SPA_RESULT_OK; - - if (!is_valid_iter (iter)) - return SPA_RESULT_INVALID_ARGUMENTS; - - switch (si->cmd) { - /* C -> S */ - case SPA_CONTROL_CMD_NODE_UPDATE: - iter_parse_node_update (si, command); - break; - - case SPA_CONTROL_CMD_PORT_UPDATE: - iter_parse_port_update (si, command); - break; - - case SPA_CONTROL_CMD_PORT_STATUS_CHANGE: - fprintf (stderr, "implement iter of %d\n", si->cmd); - break; - - case SPA_CONTROL_CMD_NODE_STATE_CHANGE: - if (si->size < sizeof (SpaControlCmdNodeStateChange)) - return SPA_RESULT_ERROR; - memcpy (command, si->data, sizeof (SpaControlCmdNodeStateChange)); - break; - - /* S -> C */ - case SPA_CONTROL_CMD_ADD_PORT: - if (si->size < sizeof (SpaControlCmdAddPort)) - return SPA_RESULT_ERROR; - memcpy (command, si->data, sizeof (SpaControlCmdAddPort)); - break; - - case SPA_CONTROL_CMD_REMOVE_PORT: - if (si->size < sizeof (SpaControlCmdRemovePort)) - return SPA_RESULT_ERROR; - memcpy (command, si->data, sizeof (SpaControlCmdRemovePort)); - break; - - case SPA_CONTROL_CMD_SET_FORMAT: - iter_parse_set_format (si, command); - break; - - case SPA_CONTROL_CMD_SET_PROPERTY: - fprintf (stderr, "implement iter of %d\n", si->cmd); - break; - - /* bidirectional */ - case SPA_CONTROL_CMD_ADD_MEM: - if (si->size < sizeof (SpaControlCmdAddMem)) - return SPA_RESULT_ERROR; - memcpy (command, si->data, sizeof (SpaControlCmdAddMem)); - break; - - case SPA_CONTROL_CMD_REMOVE_MEM: - if (si->size < sizeof (SpaControlCmdRemoveMem)) - return SPA_RESULT_ERROR; - memcpy (command, si->data, sizeof (SpaControlCmdRemoveMem)); - break; - - case SPA_CONTROL_CMD_USE_BUFFERS: - iter_parse_use_buffers (si, command); - break; - - case SPA_CONTROL_CMD_PROCESS_BUFFER: - if (si->size < sizeof (SpaControlCmdProcessBuffer)) - return SPA_RESULT_ERROR; - memcpy (command, si->data, sizeof (SpaControlCmdProcessBuffer)); - break; - - case SPA_CONTROL_CMD_NODE_EVENT: - iter_parse_node_event (si, command); - break; - - case SPA_CONTROL_CMD_NODE_COMMAND: - iter_parse_node_command (si, command); - break; - - case SPA_CONTROL_CMD_INVALID: - return SPA_RESULT_ERROR; - } - return res; -} - -struct stack_builder { - size_t magic; - - void *data; - SpaStackControl control; - - SpaControlCmd cmd; - size_t offset; -}; - -#define SCSB(b) ((struct stack_builder *) (b)) -#define SCSB_MAGIC ((size_t) 8103647428u) -#define is_valid_builder(b) (b != NULL && \ - SCSB(b)->magic == SCSB_MAGIC) - - -/** - * spa_control_builder_init_into: - * @builder: a #SpaControlBuilder - * @data: data to build into or %NULL to allocate - * @max_data: allocated size of @data - * @fds: memory for fds - * @max_fds: maximum number of fds in @fds - * - * Initialize a stack allocated @builder - */ -SpaResult -spa_control_builder_init_into (SpaControlBuilder *builder, - void *data, - size_t max_data, - int *fds, - unsigned int max_fds) -{ - struct stack_builder *sb = SCSB (builder); - - if (builder == NULL) - return SPA_RESULT_INVALID_ARGUMENTS; - - sb->magic = SCSB_MAGIC; - - if (max_data < 8 || data == NULL) { - sb->control.max_size = 128; - sb->control.data = malloc (sb->control.max_size); - sb->control.free_data = sb->control.data; - fprintf (stderr, "builder %p: alloc control memory %zd -> %zd\n", - builder, max_data, sb->control.max_size); - } else { - sb->control.max_size = max_data; - sb->control.data = data; - sb->control.free_data = NULL; - } - sb->control.size = 0; - - sb->control.fds = fds; - sb->control.max_fds = max_fds; - sb->control.n_fds = 0; - sb->control.free_fds = NULL; - - sb->cmd = 0; - sb->offset = 0; - - return SPA_RESULT_OK; -} - -/** - * spa_control_builder_clear: - * @builder: a #SpaControlBuilder - * - * Clear the memory used by @builder. This can be used to abort building the - * control. - * - * @builder becomes invalid after this function and can be reused with - * spa_control_builder_init() - */ -SpaResult -spa_control_builder_clear (SpaControlBuilder *builder) -{ - struct stack_builder *sb = SCSB (builder); - - if (!is_valid_builder (builder)) - return SPA_RESULT_INVALID_ARGUMENTS; - - sb->magic = 0; - free (sb->control.free_data); - free (sb->control.free_fds); - - return SPA_RESULT_OK; -} - -/** - * spa_control_builder_end: - * @builder: a #SpaControlBuilder - * @control: a #SpaControl - * - * Ends the building process and fills @control with the constructed - * #SpaControl. - * - * @builder becomes invalid after this function and can be reused with - * spa_control_builder_init() - */ -SpaResult -spa_control_builder_end (SpaControlBuilder *builder, - SpaControl *control) -{ - struct stack_builder *sb = SCSB (builder); - SpaStackControl *sc = SSC (control); - - if (!is_valid_builder (builder) || control == NULL) - return SPA_RESULT_INVALID_ARGUMENTS; - - sb->magic = 0; - - sc->magic = SSC_MAGIC; - sc->data = sb->control.data; - sc->size = sb->control.size; - sc->max_size = sb->control.max_size; - sc->free_data = sb->control.free_data; - - sc->fds = sb->control.fds; - sc->n_fds = sb->control.n_fds; - sc->max_fds = sb->control.max_fds; - sc->free_fds = sb->control.free_fds; - - return SPA_RESULT_OK; -} - -/** - * spa_control_builder_add_fd: - * @builder: a #SpaControlBuilder - * @fd: a valid fd - * - * Add the file descriptor @fd to @builder. - * - * Returns: the index of the file descriptor in @builder. - */ -int -spa_control_builder_add_fd (SpaControlBuilder *builder, - int fd, - bool close) -{ - struct stack_builder *sb = SCSB (builder); - int index, i; - - if (!is_valid_builder (builder) || fd < 0) - return -1; - - for (i = 0; i < sb->control.n_fds; i++) { - if (sb->control.fds[i] == fd || sb->control.fds[i] == -fd) - return i; - } - - if (sb->control.n_fds >= sb->control.max_fds) { - int new_size = sb->control.max_fds + 8; - fprintf (stderr, "builder %p: realloc control fds %d -> %d\n", - builder, sb->control.max_fds, new_size); - sb->control.max_fds = new_size; - if (sb->control.free_fds == NULL) { - sb->control.free_fds = malloc (new_size * sizeof (int)); - memcpy (sb->control.free_fds, sb->control.fds, sb->control.n_fds * sizeof (int)); - } else { - sb->control.free_fds = realloc (sb->control.free_fds, new_size * sizeof (int)); - } - sb->control.fds = sb->control.free_fds; - } - index = sb->control.n_fds; - sb->control.fds[index] = close ? fd : -fd; - sb->control.n_fds++; - - return index; -} -#define MAX(a,b) ((a) > (b) ? (a) : (b)) - -static void * -builder_ensure_size (struct stack_builder *sb, size_t size) -{ - if (sb->control.size + size > sb->control.max_size) { - size_t new_size = sb->control.size + MAX (size, 1024); - fprintf (stderr, "builder %p: realloc control memory %zd -> %zd\n", - sb, sb->control.max_size, new_size); - sb->control.max_size = new_size; - if (sb->control.free_data == NULL) { - sb->control.free_data = malloc (new_size); - memcpy (sb->control.free_data, sb->control.data, sb->control.size); - } else { - sb->control.free_data = realloc (sb->control.free_data, new_size); - } - sb->control.data = sb->control.free_data; - } - return (uint8_t *) sb->control.data + sb->control.size; -} - -static void * -builder_add_cmd (struct stack_builder *sb, SpaControlCmd cmd, size_t size) -{ - uint8_t *p; - unsigned int plen; - - plen = 1; - while (size >> (7 * plen)) - plen++; - - /* 1 for cmd, plen for size and size for payload */ - p = builder_ensure_size (sb, 1 + plen + size); - - sb->cmd = cmd; - sb->offset = sb->control.size; - sb->control.size += 1 + plen + size; - - *p++ = cmd; - /* write length */ - while (plen) { - plen--; - *p++ = ((plen > 0) ? 0x80 : 0) | ((size >> (7 * plen)) & 0x7f); - } - return p; -} - -static void -builder_add_node_update (struct stack_builder *sb, SpaControlCmdNodeUpdate *nu) -{ - size_t len; - void *p; - SpaControlCmdNodeUpdate *d; - - /* calc len */ - len = sizeof (SpaControlCmdNodeUpdate); - len += spa_serialize_props_get_size (nu->props); - - p = builder_add_cmd (sb, SPA_CONTROL_CMD_NODE_UPDATE, len); - memcpy (p, nu, sizeof (SpaControlCmdNodeUpdate)); - d = p; - - p = SPA_MEMBER (d, sizeof (SpaControlCmdNodeUpdate), void); - if (nu->props) { - len = spa_serialize_props_serialize (p, nu->props); - d->props = SPA_INT_TO_PTR (SPA_PTRDIFF (p, d)); - } else { - d->props = 0; - } -} - -static void -builder_add_port_update (struct stack_builder *sb, SpaControlCmdPortUpdate *pu) -{ - size_t len; - void *p; - int i; - SpaFormat **bfa; - SpaControlCmdPortUpdate *d; - - /* calc len */ - len = sizeof (SpaControlCmdPortUpdate); - len += pu->n_possible_formats * sizeof (SpaFormat *); - for (i = 0; i < pu->n_possible_formats; i++) { - len += spa_serialize_format_get_size (pu->possible_formats[i]); - } - len += spa_serialize_format_get_size (pu->format); - len += spa_serialize_props_get_size (pu->props); - if (pu->info) - len += spa_serialize_port_info_get_size (pu->info); - - p = builder_add_cmd (sb, SPA_CONTROL_CMD_PORT_UPDATE, len); - memcpy (p, pu, sizeof (SpaControlCmdPortUpdate)); - d = p; - - p = SPA_MEMBER (d, sizeof (SpaControlCmdPortUpdate), void); - bfa = p; - if (pu->n_possible_formats) - d->possible_formats = SPA_INT_TO_PTR (SPA_PTRDIFF (p, d)); - else - d->possible_formats = 0; - - p = SPA_MEMBER (p, sizeof (SpaFormat*) * pu->n_possible_formats, void); - - for (i = 0; i < pu->n_possible_formats; i++) { - len = spa_serialize_format_serialize (p, pu->possible_formats[i]); - bfa[i] = SPA_INT_TO_PTR (SPA_PTRDIFF (p, d)); - p = SPA_MEMBER (p, len, void); - } - if (pu->format) { - len = spa_serialize_format_serialize (p, pu->format); - d->format = SPA_INT_TO_PTR (SPA_PTRDIFF (p, d)); - p = SPA_MEMBER (p, len, void); - } else { - d->format = 0; - } - if (pu->props) { - len = spa_serialize_props_serialize (p, pu->props); - d->props = SPA_INT_TO_PTR (SPA_PTRDIFF (p, d)); - p = SPA_MEMBER (p, len, void); - } else { - d->props = 0; - } - if (pu->info) { - len = spa_serialize_port_info_serialize (p, pu->info); - d->info = SPA_INT_TO_PTR (SPA_PTRDIFF (p, d)); - p = SPA_MEMBER (p, len, void); - } else { - d->info = 0; - } -} - -static void -builder_add_set_format (struct stack_builder *sb, SpaControlCmdSetFormat *sf) -{ - size_t len; - void *p; - - /* calculate length */ - /* port_id + format + mask */ - len = sizeof (SpaControlCmdSetFormat) + spa_serialize_format_get_size (sf->format); - p = builder_add_cmd (sb, SPA_CONTROL_CMD_SET_FORMAT, len); - memcpy (p, sf, sizeof (SpaControlCmdSetFormat)); - sf = p; - - p = SPA_MEMBER (sf, sizeof (SpaControlCmdSetFormat), void); - if (sf->format) { - len = spa_serialize_format_serialize (p, sf->format); - sf->format = SPA_INT_TO_PTR (SPA_PTRDIFF (p, sf)); - } else - sf->format = 0; -} - -static void -builder_add_use_buffers (struct stack_builder *sb, SpaControlCmdUseBuffers *ub) -{ - size_t len; - int i; - SpaControlCmdUseBuffers *d; - SpaControlMemRef *mr; - - /* calculate length */ - len = sizeof (SpaControlCmdUseBuffers); - len += ub->n_buffers * sizeof (SpaControlMemRef); - - d = builder_add_cmd (sb, SPA_CONTROL_CMD_USE_BUFFERS, len); - memcpy (d, ub, sizeof (SpaControlCmdUseBuffers)); - - mr = SPA_MEMBER (d, sizeof (SpaControlCmdUseBuffers), void); - - if (d->n_buffers) - d->buffers = SPA_INT_TO_PTR (SPA_PTRDIFF (mr, d)); - else - d->buffers = 0; - - for (i = 0; i < ub->n_buffers; i++) - memcpy (&mr[i], &ub->buffers[i], sizeof (SpaControlMemRef)); -} - -static void -builder_add_node_event (struct stack_builder *sb, SpaControlCmdNodeEvent *ev) -{ - size_t len; - void *p; - SpaControlCmdNodeEvent *d; - SpaNodeEvent *ne; - - /* calculate length */ - len = sizeof (SpaControlCmdNodeEvent); - len += sizeof (SpaNodeEvent); - len += ev->event->size; - - p = builder_add_cmd (sb, SPA_CONTROL_CMD_NODE_EVENT, len); - memcpy (p, ev, sizeof (SpaControlCmdNodeEvent)); - d = p; - - p = SPA_MEMBER (d, sizeof (SpaControlCmdNodeEvent), void); - d->event = SPA_INT_TO_PTR (SPA_PTRDIFF (p, d)); - - ne = p; - memcpy (p, ev->event, sizeof (SpaNodeEvent)); - p = SPA_MEMBER (p, sizeof (SpaNodeEvent), void); - ne->data = SPA_INT_TO_PTR (SPA_PTRDIFF (p, d)); - memcpy (p, ev->event->data, ev->event->size); -} - - -static void -builder_add_node_command (struct stack_builder *sb, SpaControlCmdNodeCommand *cm) -{ - size_t len; - void *p; - SpaControlCmdNodeCommand *d; - SpaNodeCommand *nc; - - /* calculate length */ - len = sizeof (SpaControlCmdNodeCommand); - len += sizeof (SpaNodeCommand); - len += cm->command->size; - - p = builder_add_cmd (sb, SPA_CONTROL_CMD_NODE_COMMAND, len); - memcpy (p, cm, sizeof (SpaControlCmdNodeCommand)); - d = p; - - p = SPA_MEMBER (d, sizeof (SpaControlCmdNodeCommand), void); - d->command = SPA_INT_TO_PTR (SPA_PTRDIFF (p, d)); - - nc = p; - memcpy (p, cm->command, sizeof (SpaNodeCommand)); - p = SPA_MEMBER (p, sizeof (SpaNodeCommand), void); - nc->data = SPA_INT_TO_PTR (SPA_PTRDIFF (p, d)); - memcpy (p, cm->command->data, cm->command->size); -} - -/** - * spa_control_builder_add_cmd: - * @builder: a #SpaControlBuilder - * @cmd: a #SpaControlCmd - * @command: a command - * - * Add a @cmd to @builder with data from @command. - * - * Returns: %TRUE on success. - */ -SpaResult -spa_control_builder_add_cmd (SpaControlBuilder *builder, - SpaControlCmd cmd, - void *command) -{ - struct stack_builder *sb = SCSB (builder); - void *p; - SpaResult res = SPA_RESULT_OK; - - if (!is_valid_builder (builder)) - return SPA_RESULT_INVALID_ARGUMENTS; - - switch (cmd) { - /* C -> S */ - case SPA_CONTROL_CMD_NODE_UPDATE: - builder_add_node_update (sb, command); - break; - - case SPA_CONTROL_CMD_PORT_UPDATE: - builder_add_port_update (sb, command); - break; - - case SPA_CONTROL_CMD_PORT_STATUS_CHANGE: - p = builder_add_cmd (sb, cmd, 0); - break; - - case SPA_CONTROL_CMD_NODE_STATE_CHANGE: - p = builder_add_cmd (sb, cmd, sizeof (SpaControlCmdNodeStateChange)); - memcpy (p, command, sizeof (SpaControlCmdNodeStateChange)); - break; - - /* S -> C */ - case SPA_CONTROL_CMD_ADD_PORT: - p = builder_add_cmd (sb, cmd, sizeof (SpaControlCmdAddPort)); - memcpy (p, command, sizeof (SpaControlCmdAddPort)); - break; - - case SPA_CONTROL_CMD_REMOVE_PORT: - p = builder_add_cmd (sb, cmd, sizeof (SpaControlCmdRemovePort)); - memcpy (p, command, sizeof (SpaControlCmdRemovePort)); - break; - - case SPA_CONTROL_CMD_SET_FORMAT: - builder_add_set_format (sb, command); - break; - - case SPA_CONTROL_CMD_SET_PROPERTY: - fprintf (stderr, "implement builder of %d\n", cmd); - break; - - /* bidirectional */ - case SPA_CONTROL_CMD_ADD_MEM: - p = builder_add_cmd (sb, cmd, sizeof (SpaControlCmdAddMem)); - memcpy (p, command, sizeof (SpaControlCmdAddMem)); - break; - - case SPA_CONTROL_CMD_REMOVE_MEM: - p = builder_add_cmd (sb, cmd, sizeof (SpaControlCmdRemoveMem)); - memcpy (p, command, sizeof (SpaControlCmdRemoveMem)); - break; - - case SPA_CONTROL_CMD_USE_BUFFERS: - builder_add_use_buffers (sb, command); - break; - - case SPA_CONTROL_CMD_PROCESS_BUFFER: - p = builder_add_cmd (sb, cmd, sizeof (SpaControlCmdProcessBuffer)); - memcpy (p, command, sizeof (SpaControlCmdProcessBuffer)); - break; - - case SPA_CONTROL_CMD_NODE_EVENT: - builder_add_node_event (sb, command); - break; - - case SPA_CONTROL_CMD_NODE_COMMAND: - builder_add_node_command (sb, command); - break; - - case SPA_CONTROL_CMD_INVALID: - return SPA_RESULT_INVALID_ARGUMENTS; - } - return res; -} - - -SpaResult -spa_control_read (SpaControl *control, - int fd, - void *data, - size_t max_data, - int *fds, - unsigned int max_fds) -{ - ssize_t len; - SpaStackControl *sc = (SpaStackControl *) control; - struct cmsghdr *cmsg; - struct msghdr msg = {0}; - struct iovec iov[1]; - char cmsgbuf[CMSG_SPACE (max_fds * sizeof (int))]; - - sc->data = data; - sc->max_size = max_data; - sc->size = 0; - sc->free_data = NULL; - sc->fds = fds; - sc->max_fds = max_fds; - sc->n_fds = 0; - sc->free_fds = NULL; - - /* read header and control messages first */ - iov[0].iov_base = sc->data; - iov[0].iov_len = sc->max_size; - msg.msg_iov = iov; - msg.msg_iovlen = 1; - msg.msg_control = cmsgbuf; - msg.msg_controllen = sizeof (cmsgbuf); - msg.msg_flags = MSG_CMSG_CLOEXEC; - - while (true) { - len = recvmsg (fd, &msg, msg.msg_flags); - if (len < 0) { - if (errno == EINTR) - continue; - else - goto recv_error; - } - break; - } - - if (len < 4) - return SPA_RESULT_ERROR; - - sc->size = len; -#if 0 - if (sc->max_size < need) { - fprintf (stderr, "control: realloc receive memory %zd -> %zd\n", sc->max_size, need); - sc->max_size = need; - sc->free_data = realloc (sc->free_data, need); - hdr = sc->data = sc->free_data; - } - - if (hdr->length > 0) { - /* read data */ - while (true) { - len = recv (fd, (uint8_t *)sc->data + sizeof (SpaStackHeader), hdr->length, 0); - if (len < 0) { - if (errno == EINTR) - continue; - else - goto recv_error; - } - break; - } - if (len != hdr->length) - goto wrong_length; - } -#endif - - /* handle control messages */ - for (cmsg = CMSG_FIRSTHDR (&msg); cmsg != NULL; cmsg = CMSG_NXTHDR (&msg, cmsg)) { - if (cmsg->cmsg_level != SOL_SOCKET || cmsg->cmsg_type != SCM_RIGHTS) - continue; - - sc->n_fds = (cmsg->cmsg_len - ((char *)CMSG_DATA (cmsg) - (char *)cmsg)) / sizeof (int); - memcpy (sc->fds, CMSG_DATA (cmsg), sc->n_fds * sizeof (int)); - } - sc->magic = SSC_MAGIC; - - SPA_DEBUG_CONTROL ("control %p: %d read %zd bytes and %d fds\n", sc, fd, len, sc->n_fds); - - return SPA_RESULT_OK; - - /* ERRORS */ -recv_error: - { - fprintf (stderr, "could not recvmsg: %s\n", strerror (errno)); - return SPA_RESULT_ERROR; - } -} - - -SpaResult -spa_control_write (SpaControl *control, - int fd) -{ - SpaStackControl *sc = (SpaStackControl *) control; - ssize_t len; - struct msghdr msg = {0}; - struct iovec iov[1]; - struct cmsghdr *cmsg; - char cmsgbuf[CMSG_SPACE (sc->n_fds * sizeof (int))]; - int fds_len = sc->n_fds * sizeof (int), *cm, i; - - iov[0].iov_base = sc->data; - iov[0].iov_len = sc->size; - msg.msg_iov = iov; - msg.msg_iovlen = 1; - if (sc->n_fds > 0) { - msg.msg_control = cmsgbuf; - msg.msg_controllen = CMSG_SPACE (fds_len); - cmsg = CMSG_FIRSTHDR(&msg); - cmsg->cmsg_level = SOL_SOCKET; - cmsg->cmsg_type = SCM_RIGHTS; - cmsg->cmsg_len = CMSG_LEN (fds_len); - cm = (int*)CMSG_DATA (cmsg); - for (i = 0; i < sc->n_fds; i++) - cm[i] = sc->fds[i] > 0 ? sc->fds[i] : -sc->fds[i]; - msg.msg_controllen = cmsg->cmsg_len; - } else { - msg.msg_control = NULL; - msg.msg_controllen = 0; - } - - while (true) { - len = sendmsg (fd, &msg, 0); - if (len < 0) { - if (errno == EINTR) - continue; - else - goto send_error; - } - break; - } - if (len != (ssize_t) sc->size) - return SPA_RESULT_ERROR; - - SPA_DEBUG_CONTROL ("control %p: %d written %zd bytes and %d fds\n", sc, fd, len, sc->n_fds); - - return SPA_RESULT_OK; - - /* ERRORS */ -send_error: - { - fprintf (stderr, "could not sendmsg: %s\n", strerror (errno)); - return SPA_RESULT_ERROR; - } -} diff --git a/pinos/client/meson.build b/pinos/client/meson.build index 03c7358dc..5474e7ac0 100644 --- a/pinos/client/meson.build +++ b/pinos/client/meson.build @@ -11,8 +11,8 @@ pinos_headers = [ ] pinos_sources = [ + 'connection.c', 'context.c', - 'control.c', 'format.c', 'introspect.c', 'mainloop.c', diff --git a/pinos/client/stream.c b/pinos/client/stream.c index ba20ac4fb..778ff620b 100644 --- a/pinos/client/stream.c +++ b/pinos/client/stream.c @@ -31,7 +31,7 @@ #include "pinos/dbus/org-pinos.h" #include "pinos/server/daemon.h" #include "pinos/client/pinos.h" -#include "pinos/client/control.h" +#include "pinos/client/connection.h" #include "pinos/client/context.h" #include "pinos/client/stream.h" #include "pinos/client/enumtypes.h" @@ -95,16 +95,11 @@ struct _PinosStreamPrivate GSource *rtsocket_source; int rtfd; + SpaConnection *conn; + SpaConnection *rtconn; + GSource *timeout_source; - SpaControl *control; - SpaControl recv_control; - guint8 recv_data[MAX_BUFFER_SIZE]; - int recv_fds[MAX_FDS]; - - guint8 send_data[MAX_BUFFER_SIZE]; - int send_fds[MAX_FDS]; - GArray *mem_ids; GArray *buffer_ids; gboolean in_order; @@ -616,19 +611,7 @@ pinos_stream_get_error (PinosStream *stream) } static void -control_builder_init (PinosStream *stream, SpaControlBuilder *builder) -{ - PinosStreamPrivate *priv = stream->priv; - - spa_control_builder_init_into (builder, - priv->send_data, - MAX_BUFFER_SIZE, - priv->send_fds, - MAX_FDS); -} - -static void -add_node_update (PinosStream *stream, SpaControlBuilder *builder, uint32_t change_mask) +add_node_update (PinosStream *stream, uint32_t change_mask) { PinosStreamPrivate *priv = stream->priv; SpaControlCmdNodeUpdate nu = { 0, }; @@ -639,23 +622,23 @@ add_node_update (PinosStream *stream, SpaControlBuilder *builder, uint32_t chang if (change_mask & SPA_CONTROL_CMD_NODE_UPDATE_MAX_OUTPUTS) nu.max_output_ports = priv->direction == SPA_DIRECTION_OUTPUT ? 1 : 0; nu.props = NULL; - spa_control_builder_add_cmd (builder, SPA_CONTROL_CMD_NODE_UPDATE, &nu); + spa_connection_add_cmd (priv->conn, SPA_CONTROL_CMD_NODE_UPDATE, &nu); } static void -add_state_change (PinosStream *stream, SpaControlBuilder *builder, SpaNodeState state) +add_state_change (PinosStream *stream, SpaNodeState state) { PinosStreamPrivate *priv = stream->priv; SpaControlCmdNodeStateChange sc; if (priv->node_state != state) { sc.state = priv->node_state = state; - spa_control_builder_add_cmd (builder, SPA_CONTROL_CMD_NODE_STATE_CHANGE, &sc); + spa_connection_add_cmd (priv->conn, SPA_CONTROL_CMD_NODE_STATE_CHANGE, &sc); } } static void -add_port_update (PinosStream *stream, SpaControlBuilder *builder, uint32_t change_mask) +add_port_update (PinosStream *stream, uint32_t change_mask) { PinosStreamPrivate *priv = stream->priv; SpaControlCmdPortUpdate pu = { 0, };; @@ -675,12 +658,13 @@ add_port_update (PinosStream *stream, SpaControlBuilder *builder, uint32_t chang pu.info = &priv->port_info; spa_debug_port_info (pu.info); } - spa_control_builder_add_cmd (builder, SPA_CONTROL_CMD_PORT_UPDATE, &pu); + spa_connection_add_cmd (priv->conn, SPA_CONTROL_CMD_PORT_UPDATE, &pu); } static void -add_need_input (PinosStream *stream, SpaControlBuilder *builder, uint32_t port_id) +add_need_input (PinosStream *stream, uint32_t port_id) { + PinosStreamPrivate *priv = stream->priv; SpaControlCmdNodeEvent cne; SpaNodeEvent ne; SpaNodeEventNeedInput ni; @@ -690,29 +674,24 @@ add_need_input (PinosStream *stream, SpaControlBuilder *builder, uint32_t port_i ne.data = ∋ ne.size = sizeof (ni); ni.port_id = port_id; - spa_control_builder_add_cmd (builder, SPA_CONTROL_CMD_NODE_EVENT, &cne); + spa_connection_add_cmd (priv->rtconn, SPA_CONTROL_CMD_NODE_EVENT, &cne); } static void send_need_input (PinosStream *stream, uint32_t port_id) { PinosStreamPrivate *priv = stream->priv; - SpaControlBuilder builder; - SpaControl control; - control_builder_init (stream, &builder); - add_need_input (stream, &builder, port_id); - spa_control_builder_end (&builder, &control); + add_need_input (stream, port_id); - if (spa_control_write (&control, priv->rtfd) < 0) - g_warning ("stream %p: error writing control", stream); - - spa_control_clear (&control); + if (spa_connection_flush (priv->rtconn) < 0) + g_warning ("stream %p: error writing connection", stream); } static void -add_request_clock_update (PinosStream *stream, SpaControlBuilder *builder) +add_request_clock_update (PinosStream *stream) { + PinosStreamPrivate *priv = stream->priv; SpaControlCmdNodeEvent cne; SpaNodeEvent ne; SpaNodeEventRequestClockUpdate rcu; @@ -724,15 +703,15 @@ add_request_clock_update (PinosStream *stream, SpaControlBuilder *builder) rcu.update_mask = SPA_NODE_EVENT_REQUEST_CLOCK_UPDATE_TIME; rcu.timestamp = 0; rcu.offset = 0; - spa_control_builder_add_cmd (builder, SPA_CONTROL_CMD_NODE_EVENT, &cne); + spa_connection_add_cmd (priv->conn, SPA_CONTROL_CMD_NODE_EVENT, &cne); } static void add_async_complete (PinosStream *stream, - SpaControlBuilder *builder, uint32_t seq, SpaResult res) { + PinosStreamPrivate *priv = stream->priv; SpaControlCmdNodeEvent cne; SpaNodeEvent ne; SpaNodeEventAsyncComplete ac; @@ -743,90 +722,70 @@ add_async_complete (PinosStream *stream, ne.size = sizeof (ac); ac.seq = seq; ac.res = res; - spa_control_builder_add_cmd (builder, SPA_CONTROL_CMD_NODE_EVENT, &cne); + spa_connection_add_cmd (priv->conn, SPA_CONTROL_CMD_NODE_EVENT, &cne); } static void send_reuse_buffer (PinosStream *stream, uint32_t port_id, uint32_t buffer_id) { PinosStreamPrivate *priv = stream->priv; - SpaControlBuilder builder; - SpaControl control; SpaControlCmdNodeEvent cne; SpaNodeEvent ne; SpaNodeEventReuseBuffer rb; - guint8 buffer[128]; - spa_control_builder_init_into (&builder, buffer, sizeof (buffer), NULL, 0); cne.event = ≠ ne.type = SPA_NODE_EVENT_TYPE_REUSE_BUFFER; ne.data = &rb; ne.size = sizeof (rb); rb.port_id = port_id; rb.buffer_id = buffer_id; - spa_control_builder_add_cmd (&builder, SPA_CONTROL_CMD_NODE_EVENT, &cne); - spa_control_builder_end (&builder, &control); + spa_connection_add_cmd (priv->rtconn, SPA_CONTROL_CMD_NODE_EVENT, &cne); - if (spa_control_write (&control, priv->rtfd) < 0) - g_warning ("stream %p: error writing control", stream); - - spa_control_clear (&control); + if (spa_connection_flush (priv->rtconn) < 0) + g_warning ("stream %p: error writing connection", stream); } static void send_process_buffer (PinosStream *stream, uint32_t port_id, uint32_t buffer_id) { PinosStreamPrivate *priv = stream->priv; - SpaControlBuilder builder; - SpaControl control; SpaControlCmdProcessBuffer pb; SpaControlCmdNodeEvent cne; SpaNodeEvent ne; SpaNodeEventHaveOutput ho; - control_builder_init (stream, &builder); pb.direction = priv->direction; pb.port_id = port_id; pb.buffer_id = buffer_id; - spa_control_builder_add_cmd (&builder, SPA_CONTROL_CMD_PROCESS_BUFFER, &pb); + spa_connection_add_cmd (priv->rtconn, SPA_CONTROL_CMD_PROCESS_BUFFER, &pb); cne.event = ≠ ne.type = SPA_NODE_EVENT_TYPE_HAVE_OUTPUT; ne.data = &ho; ne.size = sizeof (ho); ho.port_id = port_id; - spa_control_builder_add_cmd (&builder, SPA_CONTROL_CMD_NODE_EVENT, &cne); + spa_connection_add_cmd (priv->rtconn, SPA_CONTROL_CMD_NODE_EVENT, &cne); - spa_control_builder_end (&builder, &control); - - if (spa_control_write (&control, priv->rtfd) < 0) - g_warning ("stream %p: error writing control", stream); - - spa_control_clear (&control); + if (spa_connection_flush (priv->rtconn) < 0) + g_warning ("stream %p: error writing connection", stream); } static void do_node_init (PinosStream *stream) { PinosStreamPrivate *priv = stream->priv; - SpaControlBuilder builder; - SpaControl control; - control_builder_init (stream, &builder); - add_node_update (stream, &builder, SPA_CONTROL_CMD_NODE_UPDATE_MAX_INPUTS | - SPA_CONTROL_CMD_NODE_UPDATE_MAX_OUTPUTS); + add_node_update (stream, SPA_CONTROL_CMD_NODE_UPDATE_MAX_INPUTS | + SPA_CONTROL_CMD_NODE_UPDATE_MAX_OUTPUTS); priv->port_info.flags = SPA_PORT_INFO_FLAG_CAN_USE_BUFFERS; - add_port_update (stream, &builder, SPA_CONTROL_CMD_PORT_UPDATE_POSSIBLE_FORMATS | - SPA_CONTROL_CMD_PORT_UPDATE_INFO); + add_port_update (stream, SPA_CONTROL_CMD_PORT_UPDATE_POSSIBLE_FORMATS | + SPA_CONTROL_CMD_PORT_UPDATE_INFO); - add_state_change (stream, &builder, SPA_NODE_STATE_CONFIGURE); - spa_control_builder_end (&builder, &control); + add_state_change (stream, SPA_NODE_STATE_CONFIGURE); - if (spa_control_write (&control, priv->fd) < 0) - g_warning ("stream %p: error writing control", stream); - - spa_control_clear (&control); + if (spa_connection_flush (priv->conn) < 0) + g_warning ("stream %p: error writing connection", stream); } static MemId * @@ -938,41 +897,31 @@ handle_node_command (PinosStream *stream, break; case SPA_NODE_COMMAND_PAUSE: { - SpaControlBuilder builder; - SpaControl control; - g_debug ("stream %p: pause", stream); - control_builder_init (stream, &builder); - add_state_change (stream, &builder, SPA_NODE_STATE_PAUSED); - add_async_complete (stream, &builder, seq, SPA_RESULT_OK); - spa_control_builder_end (&builder, &control); + add_state_change (stream, SPA_NODE_STATE_PAUSED); + add_async_complete (stream, seq, SPA_RESULT_OK); - if (spa_control_write (&control, priv->fd) < 0) - g_warning ("stream %p: error writing control", stream); - - spa_control_clear (&control); + if (spa_connection_flush (priv->conn) < 0) + g_warning ("stream %p: error writing connection", stream); stream_set_state (stream, PINOS_STREAM_STATE_READY, NULL); break; } case SPA_NODE_COMMAND_START: { - SpaControlBuilder builder; - SpaControl control; - g_debug ("stream %p: start", stream); - control_builder_init (stream, &builder); - if (priv->direction == SPA_DIRECTION_INPUT) - add_need_input (stream, &builder, priv->port_id); - add_state_change (stream, &builder, SPA_NODE_STATE_STREAMING); - add_async_complete (stream, &builder, seq, SPA_RESULT_OK); - spa_control_builder_end (&builder, &control); + add_state_change (stream, SPA_NODE_STATE_STREAMING); + add_async_complete (stream, seq, SPA_RESULT_OK); - if (spa_control_write (&control, priv->fd) < 0) - g_warning ("stream %p: error writing control", stream); + if (spa_connection_flush (priv->conn) < 0) + g_warning ("stream %p: error writing connection", stream); - spa_control_clear (&control); + if (priv->direction == SPA_DIRECTION_INPUT) { + add_need_input (stream, priv->port_id); + if (spa_connection_flush (priv->rtconn) < 0) + g_warning ("stream %p: error writing connection", stream); + } stream_set_state (stream, PINOS_STREAM_STATE_STREAMING, NULL); break; @@ -981,18 +930,11 @@ handle_node_command (PinosStream *stream, case SPA_NODE_COMMAND_DRAIN: case SPA_NODE_COMMAND_MARKER: { - SpaControlBuilder builder; - SpaControl control; - g_warning ("unhandled node command %d", command->type); - control_builder_init (stream, &builder); - add_async_complete (stream, &builder, seq, SPA_RESULT_NOT_IMPLEMENTED); - spa_control_builder_end (&builder, &control); + add_async_complete (stream, seq, SPA_RESULT_NOT_IMPLEMENTED); - if (spa_control_write (&control, priv->fd) < 0) - g_warning ("stream %p: error writing control", stream); - - spa_control_clear (&control); + if (spa_connection_flush (priv->conn) < 0) + g_warning ("stream %p: error writing connection", stream); break; } @@ -1015,15 +957,13 @@ handle_node_command (PinosStream *stream, } static gboolean -parse_control (PinosStream *stream, - SpaControl *ctrl) +parse_connection (PinosStream *stream) { - SpaControlIter it; PinosStreamPrivate *priv = stream->priv; + SpaConnection *conn = priv->conn; - spa_control_iter_init (&it, ctrl); - while (spa_control_iter_next (&it) == SPA_RESULT_OK) { - SpaControlCmd cmd = spa_control_iter_get_cmd (&it); + while (spa_connection_has_next (conn) == SPA_RESULT_OK) { + SpaControlCmd cmd = spa_connection_get_cmd (conn); switch (cmd) { case SPA_CONTROL_CMD_NODE_UPDATE: @@ -1031,7 +971,7 @@ parse_control (PinosStream *stream, case SPA_CONTROL_CMD_PORT_STATUS_CHANGE: case SPA_CONTROL_CMD_NODE_STATE_CHANGE: case SPA_CONTROL_CMD_PROCESS_BUFFER: - g_warning ("got unexpected control %d", cmd); + g_warning ("got unexpected connection %d", cmd); break; case SPA_CONTROL_CMD_ADD_PORT: @@ -1044,7 +984,7 @@ parse_control (PinosStream *stream, SpaControlCmdSetFormat p; gpointer mem; - if (spa_control_iter_parse_cmd (&it, &p) < 0) + if (spa_connection_parse_cmd (conn, &p) < 0) break; if (priv->format) @@ -1067,10 +1007,10 @@ parse_control (PinosStream *stream, int fd; MemId mid; - if (spa_control_iter_parse_cmd (&it, &p) < 0) + if (spa_connection_parse_cmd (conn, &p) < 0) break; - fd = spa_control_get_fd (ctrl, p.fd_index, false); + fd = spa_connection_get_fd (conn, p.fd_index, false); if (fd == -1) break; @@ -1089,7 +1029,7 @@ parse_control (PinosStream *stream, SpaControlCmdRemoveMem p; MemId *mid; - if (spa_control_iter_parse_cmd (&it, &p) < 0) + if (spa_connection_parse_cmd (conn, &p) < 0) break; g_debug ("stream %p: remove mem %d", stream, p.mem_id); @@ -1102,11 +1042,9 @@ parse_control (PinosStream *stream, SpaControlCmdUseBuffers p; BufferId bid; unsigned int i, j; - SpaControlBuilder builder; - SpaControl control; SpaBuffer *b; - if (spa_control_iter_parse_cmd (&it, &p) < 0) + if (spa_connection_parse_cmd (conn, &p) < 0) break; /* clear previous buffers */ @@ -1196,26 +1134,22 @@ parse_control (PinosStream *stream, g_signal_emit (stream, signals[SIGNAL_ADD_BUFFER], 0, bid.id); } - control_builder_init (stream, &builder); if (p.n_buffers) { - add_state_change (stream, &builder, SPA_NODE_STATE_PAUSED); + add_state_change (stream, SPA_NODE_STATE_PAUSED); } else { - add_state_change (stream, &builder, SPA_NODE_STATE_READY); + add_state_change (stream, SPA_NODE_STATE_READY); } - add_async_complete (stream, &builder, p.seq, SPA_RESULT_OK); - spa_control_builder_end (&builder, &control); + add_async_complete (stream, p.seq, SPA_RESULT_OK); - if (spa_control_write (&control, priv->fd) < 0) - g_warning ("stream %p: error writing control", stream); - - spa_control_clear (&control); + if (spa_connection_flush (conn) < 0) + g_warning ("stream %p: error writing connection", stream); break; } case SPA_CONTROL_CMD_NODE_EVENT: { SpaControlCmdNodeEvent p; - if (spa_control_iter_parse_cmd (&it, &p) < 0) + if (spa_connection_parse_cmd (conn, &p) < 0) break; handle_node_event (stream, p.event); @@ -1225,7 +1159,7 @@ parse_control (PinosStream *stream, { SpaControlCmdNodeCommand p; - if (spa_control_iter_parse_cmd (&it, &p) < 0) + if (spa_connection_parse_cmd (conn, &p) < 0) break; handle_node_command (stream, p.seq, p.command); @@ -1237,21 +1171,17 @@ parse_control (PinosStream *stream, break; } } - spa_control_iter_end (&it); - return TRUE; } static gboolean -parse_rtcontrol (PinosStream *stream, - SpaControl *ctrl) +parse_rtconnection (PinosStream *stream) { - SpaControlIter it; PinosStreamPrivate *priv = stream->priv; + SpaConnection *conn = priv->rtconn; - spa_control_iter_init (&it, ctrl); - while (spa_control_iter_next (&it) == SPA_RESULT_OK) { - SpaControlCmd cmd = spa_control_iter_get_cmd (&it); + while (spa_connection_has_next (conn) == SPA_RESULT_OK) { + SpaControlCmd cmd = spa_connection_get_cmd (conn); switch (cmd) { case SPA_CONTROL_CMD_INVALID: @@ -1267,7 +1197,7 @@ parse_rtcontrol (PinosStream *stream, case SPA_CONTROL_CMD_REMOVE_MEM: case SPA_CONTROL_CMD_USE_BUFFERS: case SPA_CONTROL_CMD_NODE_COMMAND: - g_warning ("got unexpected control %d", cmd); + g_warning ("got unexpected connection %d", cmd); break; case SPA_CONTROL_CMD_PROCESS_BUFFER: @@ -1279,7 +1209,7 @@ parse_rtcontrol (PinosStream *stream, if (priv->direction != SPA_DIRECTION_INPUT) break; - if (spa_control_iter_parse_cmd (&it, &p) < 0) + if (spa_connection_parse_cmd (conn, &p) < 0) break; if ((bid = find_buffer (stream, p.buffer_id))) { @@ -1296,7 +1226,7 @@ parse_rtcontrol (PinosStream *stream, { SpaControlCmdNodeEvent p; - if (spa_control_iter_parse_cmd (&it, &p) < 0) + if (spa_connection_parse_cmd (conn, &p) < 0) break; handle_rtnode_event (stream, p.event); @@ -1304,7 +1234,6 @@ parse_rtcontrol (PinosStream *stream, } } } - spa_control_iter_end (&it); return TRUE; } @@ -1315,26 +1244,11 @@ on_socket_condition (GSocket *socket, gpointer user_data) { PinosStream *stream = user_data; - PinosStreamPrivate *priv = stream->priv; switch (condition) { case G_IO_IN: { - SpaControl *control = &priv->recv_control; - - if (spa_control_read (control, - priv->fd, - priv->recv_data, - MAX_BUFFER_SIZE, - priv->recv_fds, - MAX_FDS) < 0) { - g_warning ("stream %p: failed to read buffer", stream); - return TRUE; - } - - parse_control (stream, control); - - spa_control_clear (control); + parse_connection (stream); break; } @@ -1354,27 +1268,11 @@ on_rtsocket_condition (GSocket *socket, gpointer user_data) { PinosStream *stream = user_data; - PinosStreamPrivate *priv = stream->priv; switch (condition) { case G_IO_IN: { - SpaControl *control = &priv->recv_control; - guint8 buffer[4096]; - - if (spa_control_read (control, - priv->rtfd, - buffer, - sizeof (buffer), - NULL, - 0) < 0) { - g_warning ("stream %p: failed to read buffer", stream); - return TRUE; - } - - parse_rtcontrol (stream, control); - - spa_control_clear (control); + parse_rtconnection (stream); break; } @@ -1393,17 +1291,11 @@ on_timeout (gpointer user_data) { PinosStream *stream = user_data; PinosStreamPrivate *priv = stream->priv; - SpaControlBuilder builder; - SpaControl control; - control_builder_init (stream, &builder); - add_request_clock_update (stream, &builder); - spa_control_builder_end (&builder, &control); + add_request_clock_update (stream); - if (spa_control_write (&control, priv->fd) < 0) - g_warning ("stream %p: error writing control", stream); - - spa_control_clear (&control); + if (spa_connection_flush (priv->conn) < 0) + g_warning ("stream %p: error writing connection", stream); return G_SOURCE_CONTINUE; } @@ -1425,11 +1317,13 @@ handle_socket (PinosStream *stream, gint fd, gint rtfd) priv->socket_source = g_socket_create_source (priv->socket, G_IO_IN, NULL); g_source_set_callback (priv->socket_source, (GSourceFunc) on_socket_condition, stream, NULL); g_source_attach (priv->socket_source, priv->context->priv->context); + priv->conn = spa_connection_new (priv->fd); priv->rtfd = g_socket_get_fd (priv->rtsocket); priv->rtsocket_source = g_socket_create_source (priv->rtsocket, G_IO_IN, NULL); g_source_set_callback (priv->rtsocket_source, (GSourceFunc) on_rtsocket_condition, stream, NULL); g_source_attach (priv->rtsocket_source, priv->context->priv->context); + priv->rtconn = spa_connection_new (priv->rtfd); priv->timeout_source = g_timeout_source_new (100); g_source_set_callback (priv->timeout_source, (GSourceFunc) on_timeout, stream, NULL); @@ -1742,8 +1636,6 @@ pinos_stream_finish_format (PinosStream *stream, { PinosStreamPrivate *priv; PinosContext *context; - SpaControlBuilder builder; - SpaControl control; g_return_val_if_fail (PINOS_IS_STREAM (stream), FALSE); priv = stream->priv; @@ -1755,30 +1647,25 @@ pinos_stream_finish_format (PinosStream *stream, priv->port_info.params = params; priv->port_info.n_params = n_params; - control_builder_init (stream, &builder); - if (SPA_RESULT_IS_OK (res)) { - add_port_update (stream, &builder, SPA_CONTROL_CMD_PORT_UPDATE_INFO | - SPA_CONTROL_CMD_PORT_UPDATE_FORMAT); + add_port_update (stream, SPA_CONTROL_CMD_PORT_UPDATE_INFO | + SPA_CONTROL_CMD_PORT_UPDATE_FORMAT); if (priv->format) { - add_state_change (stream, &builder, SPA_NODE_STATE_READY); + add_state_change (stream, SPA_NODE_STATE_READY); } else { clear_buffers (stream); - add_state_change (stream, &builder, SPA_NODE_STATE_CONFIGURE); + add_state_change (stream, SPA_NODE_STATE_CONFIGURE); } } priv->port_info.params = NULL; priv->port_info.n_params = 0; - add_async_complete (stream, &builder, priv->pending_seq, res); - spa_control_builder_end (&builder, &control); + add_async_complete (stream, priv->pending_seq, res); priv->pending_seq = SPA_ID_INVALID; - if (spa_control_write (&control, priv->fd) < 0) - g_warning ("stream %p: error writing control", stream); - - spa_control_clear (&control); + if (spa_connection_flush (priv->conn) < 0) + g_warning ("stream %p: error writing connection", stream); return TRUE; } diff --git a/pinos/server/client-node.c b/pinos/server/client-node.c index 0402b030f..a5405cb51 100644 --- a/pinos/server/client-node.c +++ b/pinos/server/client-node.c @@ -32,7 +32,7 @@ #include "pinos/client/pinos.h" #include "pinos/client/enumtypes.h" #include "pinos/client/private.h" -#include "pinos/client/control.h" +#include "pinos/client/connection.h" #include "pinos/client/serialize.h" #include "pinos/server/daemon.h" @@ -103,8 +103,11 @@ struct _SpaProxy SpaPollFd fds[1]; SpaPollItem poll; + SpaConnection *conn; + SpaPollFd rtfds[1]; SpaPollItem rtpoll; + SpaConnection *rtconn; unsigned int max_inputs; unsigned int n_inputs; @@ -207,22 +210,15 @@ spa_proxy_node_send_command (SpaNode *node, case SPA_NODE_COMMAND_DRAIN: case SPA_NODE_COMMAND_MARKER: { - SpaControlBuilder builder; - SpaControl control; - uint8_t buf[128]; SpaControlCmdNodeCommand cnc; /* send start */ - spa_control_builder_init_into (&builder, buf, sizeof (buf), NULL, 0); cnc.seq = this->seq++; cnc.command = command; - spa_control_builder_add_cmd (&builder, SPA_CONTROL_CMD_NODE_COMMAND, &cnc); - spa_control_builder_end (&builder, &control); + spa_connection_add_cmd (this->conn, SPA_CONTROL_CMD_NODE_COMMAND, &cnc); - if ((res = spa_control_write (&control, this->fds[0].fd)) < 0) - spa_log_error (this->log, "proxy %p: error writing control %d\n", this, res); - - spa_control_clear (&control); + if ((res = spa_connection_flush (this->conn)) < 0) + spa_log_error (this->log, "proxy %p: error writing connection %d\n", this, res); res = SPA_RESULT_RETURN_ASYNC (cnc.seq); break; @@ -230,21 +226,15 @@ spa_proxy_node_send_command (SpaNode *node, case SPA_NODE_COMMAND_CLOCK_UPDATE: { - SpaControlBuilder builder; - SpaControl control; - uint8_t buf[128]; SpaControlCmdNodeCommand cnc; /* send start */ - spa_control_builder_init_into (&builder, buf, sizeof (buf), NULL, 0); cnc.command = command; - spa_control_builder_add_cmd (&builder, SPA_CONTROL_CMD_NODE_COMMAND, &cnc); - spa_control_builder_end (&builder, &control); + spa_connection_add_cmd (this->conn, SPA_CONTROL_CMD_NODE_COMMAND, &cnc); - if ((res = spa_control_write (&control, this->fds[0].fd)) < 0) - spa_log_error (this->log, "proxy %p: error writing control %d\n", this, res); + if ((res = spa_connection_flush (this->conn)) < 0) + spa_log_error (this->log, "proxy %p: error writing connection %d\n", this, res); - spa_control_clear (&control); break; } } @@ -502,10 +492,7 @@ spa_proxy_node_port_set_format (SpaNode *node, const SpaFormat *format) { SpaProxy *this; - SpaControl control; - SpaControlBuilder builder; SpaControlCmdSetFormat sf; - uint8_t buf[128]; SpaResult res; if (node == NULL) @@ -516,19 +503,15 @@ spa_proxy_node_port_set_format (SpaNode *node, if (!CHECK_PORT (this, direction, port_id)) return SPA_RESULT_INVALID_PORT; - spa_control_builder_init_into (&builder, buf, sizeof (buf), NULL, 0); sf.seq = this->seq++; sf.direction = direction; sf.port_id = port_id; sf.flags = flags; sf.format = (SpaFormat *) format; - spa_control_builder_add_cmd (&builder, SPA_CONTROL_CMD_SET_FORMAT, &sf); - spa_control_builder_end (&builder, &control); + spa_connection_add_cmd (this->conn, SPA_CONTROL_CMD_SET_FORMAT, &sf); - if ((res = spa_control_write (&control, this->fds[0].fd)) < 0) - spa_log_error (this->log, "proxy %p: error writing control\n", this); - - spa_control_clear (&control); + if ((res = spa_connection_flush (this->conn)) < 0) + spa_log_error (this->log, "proxy %p: error writing connection\n", this); return SPA_RESULT_RETURN_ASYNC (sf.seq); } @@ -639,10 +622,6 @@ spa_proxy_node_port_use_buffers (SpaNode *node, SpaProxy *this; SpaProxyPort *port; unsigned int i, j; - SpaControl control; - SpaControlBuilder builder; - uint8_t buf[4096]; - int fds[32]; SpaResult res; SpaControlCmdAddMem am; SpaControlCmdUseBuffers ub; @@ -666,8 +645,6 @@ spa_proxy_node_port_use_buffers (SpaNode *node, clear_buffers (this, port); - spa_control_builder_init_into (&builder, buf, sizeof (buf), fds, SPA_N_ELEMENTS (fds)); - /* find size to store buffers */ size = 0; n_mem = 0; @@ -698,11 +675,11 @@ spa_proxy_node_port_use_buffers (SpaNode *node, am.port_id = port_id; am.mem_id = n_mem; am.type = d->type; - am.fd_index = spa_control_builder_add_fd (&builder, d->fd, false); + am.fd_index = spa_connection_add_fd (this->conn, d->fd, false); am.flags = d->flags; am.offset = d->offset; am.size = d->maxsize; - spa_control_builder_add_cmd (&builder, SPA_CONTROL_CMD_ADD_MEM, &am); + spa_connection_add_cmd (this->conn, SPA_CONTROL_CMD_ADD_MEM, &am); b->buffer.datas[j].type = SPA_DATA_TYPE_ID; b->buffer.datas[j].data = SPA_UINT32_TO_PTR (n_mem); @@ -770,11 +747,11 @@ spa_proxy_node_port_use_buffers (SpaNode *node, am.port_id = port_id; am.mem_id = port->buffer_mem_id; am.type = SPA_DATA_TYPE_MEMFD; - am.fd_index = spa_control_builder_add_fd (&builder, port->buffer_mem_fd, false); + am.fd_index = spa_connection_add_fd (this->conn, port->buffer_mem_fd, false); am.flags = 0; am.offset = 0; am.size = size; - spa_control_builder_add_cmd (&builder, SPA_CONTROL_CMD_ADD_MEM, &am); + spa_connection_add_cmd (this->conn, SPA_CONTROL_CMD_ADD_MEM, &am); memref = alloca (n_buffers * sizeof (SpaControlMemRef)); for (i = 0; i < n_buffers; i++) { @@ -792,14 +769,10 @@ spa_proxy_node_port_use_buffers (SpaNode *node, ub.port_id = port_id; ub.n_buffers = n_buffers; ub.buffers = memref; - spa_control_builder_add_cmd (&builder, SPA_CONTROL_CMD_USE_BUFFERS, &ub); + spa_connection_add_cmd (this->conn, SPA_CONTROL_CMD_USE_BUFFERS, &ub); - spa_control_builder_end (&builder, &control); - - if ((res = spa_control_write (&control, this->fds[0].fd)) < 0) - spa_log_error (this->log, "proxy %p: error writing control\n", this); - - spa_control_clear (&control); + if ((res = spa_connection_flush (this->conn)) < 0) + spa_log_error (this->log, "proxy %p: error writing connection\n", this); return SPA_RESULT_RETURN_ASYNC (ub.seq); } @@ -882,10 +855,7 @@ spa_proxy_node_port_push_input (SpaNode *node, unsigned int i; bool have_error = false; bool have_enough = false; - SpaControl control; - SpaControlBuilder builder; SpaControlCmdProcessBuffer pb; - uint8_t buf[64]; SpaResult res; if (node == NULL || n_info == 0 || info == NULL) @@ -893,8 +863,6 @@ spa_proxy_node_port_push_input (SpaNode *node, this = SPA_CONTAINER_OF (node, SpaProxy, node); - spa_control_builder_init_into (&builder, buf, sizeof(buf), NULL, 0); - for (i = 0; i < n_info; i++) { if (!CHECK_IN_PORT (this, SPA_DIRECTION_INPUT, info[i].port_id)) { spa_log_warn (this->log, "invalid port %d\n", info[i].port_id); @@ -923,21 +891,18 @@ spa_proxy_node_port_push_input (SpaNode *node, pb.direction = SPA_DIRECTION_INPUT; pb.port_id = info[i].port_id; pb.buffer_id = info[i].buffer_id; - spa_control_builder_add_cmd (&builder, SPA_CONTROL_CMD_PROCESS_BUFFER, &pb); + spa_connection_add_cmd (this->rtconn, SPA_CONTROL_CMD_PROCESS_BUFFER, &pb); info[i].status = SPA_RESULT_OK; } - spa_control_builder_end (&builder, &control); if (have_error) return SPA_RESULT_ERROR; if (have_enough) return SPA_RESULT_HAVE_ENOUGH_INPUT; - if ((res = spa_control_write (&control, this->rtfds[0].fd)) < 0) - spa_log_error (this->log, "proxy %p: error writing control\n", this); - - spa_control_clear (&control); + if ((res = spa_connection_flush (this->rtconn)) < 0) + spa_log_error (this->log, "proxy %p: error writing connection\n", this); return SPA_RESULT_OK; } @@ -993,9 +958,6 @@ spa_proxy_node_port_reuse_buffer (SpaNode *node, uint32_t buffer_id) { SpaProxy *this; - SpaControlBuilder builder; - SpaControl control; - uint8_t buf[128]; SpaResult res; SpaControlCmdNodeEvent cne; SpaNodeEvent ne; @@ -1010,20 +972,16 @@ spa_proxy_node_port_reuse_buffer (SpaNode *node, return SPA_RESULT_INVALID_PORT; /* send start */ - spa_control_builder_init_into (&builder, buf, sizeof (buf), NULL, 0); cne.event = ≠ ne.type = SPA_NODE_EVENT_TYPE_REUSE_BUFFER; ne.data = &rb; ne.size = sizeof (rb); rb.port_id = port_id; rb.buffer_id = buffer_id; - spa_control_builder_add_cmd (&builder, SPA_CONTROL_CMD_NODE_EVENT, &cne); - spa_control_builder_end (&builder, &control); + spa_connection_add_cmd (this->rtconn, SPA_CONTROL_CMD_NODE_EVENT, &cne); - if ((res = spa_control_write (&control, this->rtfds[0].fd)) < 0) - spa_log_error (this->log, "proxy %p: error writing control %d\n", this, res); - - spa_control_clear (&control); + if ((res = spa_connection_flush (this->rtconn)) < 0) + spa_log_error (this->log, "proxy %p: error writing connection %d\n", this, res); return res; } @@ -1075,14 +1033,12 @@ handle_node_event (SpaProxy *this, } static SpaResult -parse_control (SpaProxy *this, - SpaControl *ctrl) +parse_connection (SpaProxy *this) { - SpaControlIter it; + SpaConnection *conn = this->conn; - spa_control_iter_init (&it, ctrl); - while (spa_control_iter_next (&it) == SPA_RESULT_OK) { - SpaControlCmd cmd = spa_control_iter_get_cmd (&it); + while (spa_connection_has_next (conn) == SPA_RESULT_OK) { + SpaControlCmd cmd = spa_connection_get_cmd (conn); switch (cmd) { case SPA_CONTROL_CMD_INVALID: @@ -1092,14 +1048,14 @@ parse_control (SpaProxy *this, case SPA_CONTROL_CMD_SET_PROPERTY: case SPA_CONTROL_CMD_NODE_COMMAND: case SPA_CONTROL_CMD_PROCESS_BUFFER: - spa_log_error (this->log, "proxy %p: got unexpected control %d\n", this, cmd); + spa_log_error (this->log, "proxy %p: got unexpected connection %d\n", this, cmd); break; case SPA_CONTROL_CMD_NODE_UPDATE: { SpaControlCmdNodeUpdate nu; - if (spa_control_iter_parse_cmd (&it, &nu) < 0) + if (spa_connection_parse_cmd (conn, &nu) < 0) break; if (nu.change_mask & SPA_CONTROL_CMD_NODE_UPDATE_MAX_INPUTS) @@ -1119,7 +1075,7 @@ parse_control (SpaProxy *this, bool remove; spa_log_info (this->log, "proxy %p: got port update %d\n", this, cmd); - if (spa_control_iter_parse_cmd (&it, &pu) < 0) + if (spa_connection_parse_cmd (conn, &pu) < 0) break; if (!CHECK_PORT_ID (this, pu.direction, pu.port_id)) @@ -1146,7 +1102,7 @@ parse_control (SpaProxy *this, SpaControlCmdNodeStateChange sc; SpaNodeState old = this->node.state; - if (spa_control_iter_parse_cmd (&it, &sc) < 0) + if (spa_connection_parse_cmd (conn, &sc) < 0) break; spa_log_info (this->log, "proxy %p: got node state change %d -> %d\n", this, old, sc.state); @@ -1168,7 +1124,7 @@ parse_control (SpaProxy *this, { SpaControlCmdNodeEvent cne; - if (spa_control_iter_parse_cmd (&it, &cne) < 0) + if (spa_connection_parse_cmd (conn, &cne) < 0) break; handle_node_event (this, cne.event); @@ -1176,20 +1132,17 @@ parse_control (SpaProxy *this, } } } - spa_control_iter_end (&it); return SPA_RESULT_OK; } static SpaResult -parse_rtcontrol (SpaProxy *this, - SpaControl *ctrl) +parse_rtconnection (SpaProxy *this) { - SpaControlIter it; + SpaConnection *conn = this->rtconn; - spa_control_iter_init (&it, ctrl); - while (spa_control_iter_next (&it) == SPA_RESULT_OK) { - SpaControlCmd cmd = spa_control_iter_get_cmd (&it); + while (spa_connection_has_next (conn) == SPA_RESULT_OK) { + SpaControlCmd cmd = spa_connection_get_cmd (conn); switch (cmd) { case SPA_CONTROL_CMD_INVALID: @@ -1205,7 +1158,7 @@ parse_rtcontrol (SpaProxy *this, case SPA_CONTROL_CMD_ADD_MEM: case SPA_CONTROL_CMD_REMOVE_MEM: case SPA_CONTROL_CMD_USE_BUFFERS: - spa_log_error (this->log, "proxy %p: got unexpected control %d\n", this, cmd); + spa_log_error (this->log, "proxy %p: got unexpected connection %d\n", this, cmd); break; case SPA_CONTROL_CMD_PROCESS_BUFFER: @@ -1213,7 +1166,7 @@ parse_rtcontrol (SpaProxy *this, SpaControlCmdProcessBuffer cmd; SpaProxyPort *port; - if (spa_control_iter_parse_cmd (&it, &cmd) < 0) + if (spa_connection_parse_cmd (conn, &cmd) < 0) break; if (!CHECK_PORT (this, cmd.direction, cmd.port_id)) @@ -1233,7 +1186,7 @@ parse_rtcontrol (SpaProxy *this, { SpaControlCmdNodeEvent cne; - if (spa_control_iter_parse_cmd (&it, &cne) < 0) + if (spa_connection_parse_cmd (conn, &cne) < 0) break; handle_node_event (this, cne.event); @@ -1241,7 +1194,6 @@ parse_rtcontrol (SpaProxy *this, } } } - spa_control_iter_end (&it); return SPA_RESULT_OK; } @@ -1250,19 +1202,9 @@ static int proxy_on_fd_events (SpaPollNotifyData *data) { SpaProxy *this = data->user_data; - SpaResult res; if (data->fds[0].revents & POLLIN) { - SpaControl control; - uint8_t buf[1024]; - int fds[16]; - - if ((res = spa_control_read (&control, data->fds[0].fd, buf, sizeof (buf), fds, 16)) < 0) { - spa_log_error (this->log, "proxy %p: failed to read control: %d\n", this, res); - return 0; - } - parse_control (this, &control); - spa_control_clear (&control); + parse_connection (this); } return 0; } @@ -1271,18 +1213,9 @@ static int proxy_on_rtfd_events (SpaPollNotifyData *data) { SpaProxy *this = data->user_data; - SpaResult res; if (data->fds[0].revents & POLLIN) { - SpaControl control; - uint8_t buf[1024]; - - if ((res = spa_control_read (&control, data->fds[0].fd, buf, sizeof (buf), NULL, 0)) < 0) { - spa_log_error (this->log, "proxy %p: failed to read control: %d\n", this, res); - return 0; - } - parse_rtcontrol (this, &control); - spa_control_clear (&control); + parse_rtconnection (this); } return 0; } @@ -1552,7 +1485,10 @@ pinos_client_node_get_socket_pair (PinosClientNode *this, goto create_failed; priv->proxy->fds[0].fd = g_socket_get_fd (priv->sockets[0]); + priv->proxy->conn = spa_connection_new (priv->proxy->fds[0].fd); + spa_poll_add_item (priv->proxy->main_loop, &priv->proxy->poll); + } return g_object_ref (priv->sockets[1]); @@ -1607,6 +1543,8 @@ pinos_client_node_get_rtsocket_pair (PinosClientNode *this, goto create_failed; priv->proxy->rtfds[0].fd = g_socket_get_fd (priv->rtsockets[0]); + priv->proxy->rtconn = spa_connection_new (priv->proxy->rtfds[0].fd); + spa_poll_add_item (priv->proxy->data_loop, &priv->proxy->rtpoll); } return g_object_ref (priv->rtsockets[1]);