mirror of
https://gitlab.freedesktop.org/pipewire/pipewire.git
synced 2026-02-15 22:05:28 -05:00
protocol-native: pass a message around
Pass a message around to make things more extensible later. Keep fds per message if we ever want to write individual messages. Pass number of fds in the message header. We might need this to close the fds when the proxy is gone.
This commit is contained in:
parent
30747942ac
commit
70e62aacd7
7 changed files with 316 additions and 304 deletions
|
|
@ -52,9 +52,10 @@ struct buffer {
|
|||
int fds[MAX_FDS];
|
||||
uint32_t n_fds;
|
||||
|
||||
uint32_t seq;
|
||||
size_t offset;
|
||||
void *data;
|
||||
size_t size;
|
||||
size_t fds_offset;
|
||||
struct pw_protocol_native_message msg;
|
||||
|
||||
bool update;
|
||||
};
|
||||
|
|
@ -64,10 +65,6 @@ struct impl {
|
|||
struct pw_core *core;
|
||||
|
||||
struct buffer in, out;
|
||||
|
||||
uint32_t dest_id;
|
||||
uint8_t opcode;
|
||||
uint32_t seq;
|
||||
struct spa_pod_builder builder;
|
||||
};
|
||||
|
||||
|
|
@ -84,11 +81,12 @@ struct impl {
|
|||
int pw_protocol_native_connection_get_fd(struct pw_protocol_native_connection *conn, uint32_t index)
|
||||
{
|
||||
struct impl *impl = SPA_CONTAINER_OF(conn, struct impl, this);
|
||||
struct buffer *buf = &impl->in;
|
||||
|
||||
if (index >= impl->in.n_fds)
|
||||
if (index >= buf->msg.n_fds)
|
||||
return -1;
|
||||
|
||||
return impl->in.fds[index];
|
||||
return buf->msg.fds[index];
|
||||
}
|
||||
|
||||
/** Add an fd to a connection
|
||||
|
|
@ -102,21 +100,23 @@ int pw_protocol_native_connection_get_fd(struct pw_protocol_native_connection *c
|
|||
uint32_t pw_protocol_native_connection_add_fd(struct pw_protocol_native_connection *conn, int fd)
|
||||
{
|
||||
struct impl *impl = SPA_CONTAINER_OF(conn, struct impl, this);
|
||||
struct buffer *buf = &impl->out;
|
||||
uint32_t index, i;
|
||||
|
||||
for (i = 0; i < impl->out.n_fds; i++) {
|
||||
if (impl->out.fds[i] == fd)
|
||||
for (i = 0; i < buf->msg.n_fds; i++) {
|
||||
if (buf->msg.fds[i] == fd)
|
||||
return i;
|
||||
}
|
||||
|
||||
index = impl->out.n_fds;
|
||||
if (index >= MAX_FDS) {
|
||||
index = buf->msg.n_fds;
|
||||
if (index + buf->n_fds >= MAX_FDS) {
|
||||
pw_log_error("connection %p: too many fds", conn);
|
||||
return -1;
|
||||
}
|
||||
|
||||
impl->out.fds[index] = fd;
|
||||
impl->out.n_fds++;
|
||||
buf->msg.fds[index] = fd;
|
||||
buf->msg.n_fds++;
|
||||
pw_log_debug("connection %p: add fd %d at index %d", conn, fd, index);
|
||||
|
||||
return index;
|
||||
}
|
||||
|
|
@ -139,7 +139,7 @@ static void *connection_ensure_size(struct pw_protocol_native_connection *conn,
|
|||
return (uint8_t *) buf->buffer_data + buf->buffer_size;
|
||||
}
|
||||
|
||||
static bool refill_buffer(struct pw_protocol_native_connection *conn, struct buffer *buf)
|
||||
static int refill_buffer(struct pw_protocol_native_connection *conn, struct buffer *buf)
|
||||
{
|
||||
ssize_t len;
|
||||
struct cmsghdr *cmsg;
|
||||
|
|
@ -163,7 +163,7 @@ static bool refill_buffer(struct pw_protocol_native_connection *conn, struct buf
|
|||
continue;
|
||||
if (errno != EAGAIN || errno != EWOULDBLOCK)
|
||||
goto recv_error;
|
||||
return false;
|
||||
return -EAGAIN;
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
|
@ -183,20 +183,20 @@ static bool refill_buffer(struct pw_protocol_native_connection *conn, struct buf
|
|||
pw_log_trace("connection %p: %d read %zd bytes and %d fds", conn, conn->fd, len,
|
||||
n_fds);
|
||||
|
||||
return true;
|
||||
return 0;
|
||||
|
||||
/* ERRORS */
|
||||
recv_error:
|
||||
pw_log_error("could not recvmsg on fd %d: %s", conn->fd, strerror(errno));
|
||||
return false;
|
||||
return -errno;
|
||||
}
|
||||
|
||||
static void clear_buffer(struct buffer *buf)
|
||||
{
|
||||
buf->n_fds = 0;
|
||||
buf->offset = 0;
|
||||
buf->size = 0;
|
||||
buf->buffer_size = 0;
|
||||
buf->offset = 0;
|
||||
buf->fds_offset = 0;
|
||||
}
|
||||
|
||||
/** Make a new connection object for the given socket
|
||||
|
|
@ -262,6 +262,44 @@ void pw_protocol_native_connection_destroy(struct pw_protocol_native_connection
|
|||
free(impl);
|
||||
}
|
||||
|
||||
static int prepare_packet(struct pw_protocol_native_connection *conn, struct buffer *buf)
|
||||
{
|
||||
uint8_t *data;
|
||||
size_t size, len;
|
||||
uint32_t *p;
|
||||
|
||||
data = buf->buffer_data + buf->offset;
|
||||
size = buf->buffer_size - buf->offset;
|
||||
|
||||
if (size < HDR_SIZE)
|
||||
return HDR_SIZE;
|
||||
|
||||
p = (uint32_t *) data;
|
||||
data += HDR_SIZE;
|
||||
size -= HDR_SIZE;
|
||||
|
||||
buf->msg.id = p[0];
|
||||
buf->msg.opcode = p[1] >> 24;
|
||||
len = p[1] & 0xffffff;
|
||||
buf->msg.seq = p[2];
|
||||
buf->msg.n_fds = p[3];
|
||||
buf->msg.fds = &buf->fds[buf->fds_offset];
|
||||
|
||||
if (size < len)
|
||||
return len;
|
||||
|
||||
buf->msg.size = len;
|
||||
buf->msg.data = data;
|
||||
|
||||
buf->offset += HDR_SIZE + len;
|
||||
buf->fds_offset += buf->msg.n_fds;
|
||||
|
||||
if (buf->offset >= buf->buffer_size)
|
||||
clear_buffer(buf);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
/** Move to the next packet in the connection
|
||||
*
|
||||
* \param conn the connection
|
||||
|
|
@ -276,74 +314,28 @@ void pw_protocol_native_connection_destroy(struct pw_protocol_native_connection
|
|||
*
|
||||
* \memberof pw_protocol_native_connection
|
||||
*/
|
||||
bool
|
||||
int
|
||||
pw_protocol_native_connection_get_next(struct pw_protocol_native_connection *conn,
|
||||
uint8_t *opcode,
|
||||
uint32_t *dest_id,
|
||||
void **dt,
|
||||
uint32_t *sz,
|
||||
int *seq)
|
||||
const struct pw_protocol_native_message **msg)
|
||||
{
|
||||
struct impl *impl = SPA_CONTAINER_OF(conn, struct impl, this);
|
||||
size_t len, size;
|
||||
uint8_t *data;
|
||||
size_t len;
|
||||
int res;
|
||||
struct buffer *buf;
|
||||
uint32_t *p;
|
||||
|
||||
buf = &impl->in;
|
||||
|
||||
/* move to next packet */
|
||||
buf->offset += buf->size;
|
||||
while (1) {
|
||||
if ((len = prepare_packet(conn, buf)) == 0)
|
||||
break;
|
||||
|
||||
again:
|
||||
if (buf->update) {
|
||||
if (!refill_buffer(conn, buf))
|
||||
return false;
|
||||
buf->update = false;
|
||||
}
|
||||
|
||||
/* now read packet */
|
||||
data = buf->buffer_data;
|
||||
size = buf->buffer_size;
|
||||
|
||||
if (buf->offset >= size) {
|
||||
clear_buffer(buf);
|
||||
buf->update = true;
|
||||
goto again;
|
||||
}
|
||||
|
||||
data += buf->offset;
|
||||
size -= buf->offset;
|
||||
|
||||
if (size < HDR_SIZE) {
|
||||
if (connection_ensure_size(conn, buf, HDR_SIZE) == NULL)
|
||||
return false;
|
||||
buf->update = true;
|
||||
goto again;
|
||||
}
|
||||
p = (uint32_t *) data;
|
||||
data += HDR_SIZE;
|
||||
size -= HDR_SIZE;
|
||||
|
||||
*dest_id = p[0];
|
||||
*opcode = p[1] >> 24;
|
||||
len = p[1] & 0xffffff;
|
||||
*seq = p[2];
|
||||
|
||||
if (len > size) {
|
||||
if (connection_ensure_size(conn, buf, len) == NULL)
|
||||
return false;
|
||||
buf->update = true;
|
||||
goto again;
|
||||
return -ENOMEM;
|
||||
if ((res = refill_buffer(conn, buf)) < 0)
|
||||
return res;
|
||||
}
|
||||
buf->size = len;
|
||||
buf->data = data;
|
||||
buf->offset += HDR_SIZE;
|
||||
|
||||
*dt = buf->data;
|
||||
*sz = buf->size;
|
||||
|
||||
return true;
|
||||
*msg = &buf->msg;
|
||||
return 1;
|
||||
}
|
||||
|
||||
static inline void *begin_write(struct pw_protocol_native_connection *conn, uint32_t size)
|
||||
|
|
@ -376,16 +368,22 @@ static const struct spa_pod_builder_callbacks builder_callbacks = {
|
|||
|
||||
struct spa_pod_builder *
|
||||
pw_protocol_native_connection_begin(struct pw_protocol_native_connection *conn,
|
||||
uint32_t id, uint8_t opcode, int *res)
|
||||
uint32_t id, uint8_t opcode,
|
||||
struct pw_protocol_native_message **msg)
|
||||
{
|
||||
struct impl *impl = SPA_CONTAINER_OF(conn, struct impl, this);
|
||||
impl->dest_id = id;
|
||||
impl->opcode = opcode;
|
||||
struct buffer *buf = &impl->out;
|
||||
|
||||
buf->msg.id = id;
|
||||
buf->msg.opcode = opcode;
|
||||
impl->builder = SPA_POD_BUILDER_INIT(NULL, 0);
|
||||
impl->builder.callbacks = &builder_callbacks;
|
||||
impl->builder.callbacks_data = impl;
|
||||
if (res)
|
||||
*res = SPA_RESULT_RETURN_ASYNC(impl->seq);
|
||||
buf->msg.n_fds = 0;
|
||||
buf->msg.fds = &buf->fds[buf->n_fds];
|
||||
buf->msg.seq = buf->seq;
|
||||
if (msg)
|
||||
*msg = &buf->msg;
|
||||
return &impl->builder;
|
||||
}
|
||||
|
||||
|
|
@ -396,29 +394,31 @@ pw_protocol_native_connection_end(struct pw_protocol_native_connection *conn,
|
|||
struct impl *impl = SPA_CONTAINER_OF(conn, struct impl, this);
|
||||
uint32_t *p, size = builder->state.offset;
|
||||
struct buffer *buf = &impl->out;
|
||||
uint32_t seq;
|
||||
int res;
|
||||
|
||||
if ((p = connection_ensure_size(conn, buf, HDR_SIZE + size)) == NULL)
|
||||
return -ENOMEM;
|
||||
|
||||
seq = impl->seq;
|
||||
impl->seq = (impl->seq + 1) & SPA_ASYNC_SEQ_MASK;
|
||||
|
||||
p[0] = impl->dest_id;
|
||||
p[1] = (impl->opcode << 24) | (size & 0xffffff);
|
||||
p[2] = seq;
|
||||
p[0] = buf->msg.id;
|
||||
p[1] = (buf->msg.opcode << 24) | (size & 0xffffff);
|
||||
p[2] = buf->msg.seq;
|
||||
p[3] = buf->msg.n_fds;
|
||||
|
||||
buf->buffer_size += HDR_SIZE + size;
|
||||
buf->n_fds += buf->msg.n_fds;
|
||||
|
||||
if (debug_messages) {
|
||||
fprintf(stderr, ">>>>>>>>> out: %d %d %d\n", impl->dest_id, impl->opcode, size);
|
||||
fprintf(stderr, ">>>>>>>>> out: %d %d %d\n", buf->msg.id, buf->msg.opcode, size);
|
||||
spa_debug_pod(0, NULL, SPA_MEMBER(p, HDR_SIZE, struct spa_pod));
|
||||
}
|
||||
|
||||
buf->seq = (buf->seq + 1) & SPA_ASYNC_SEQ_MASK;
|
||||
res = SPA_RESULT_RETURN_ASYNC(buf->msg.seq);
|
||||
|
||||
spa_hook_list_call(&conn->listener_list,
|
||||
struct pw_protocol_native_connection_events, need_flush, 0);
|
||||
|
||||
return SPA_RESULT_RETURN_ASYNC(seq);
|
||||
return res;
|
||||
}
|
||||
|
||||
/** Flush the connection object
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue