connection: add need_flush signal

Add a need_flush signal to the connection when something can be flushed.
Add destroy signal to connection
This commit is contained in:
Wim Taymans 2017-03-15 16:16:16 +01:00
parent c1cf1e6f67
commit 3ab17281f6
2 changed files with 67 additions and 40 deletions

View file

@ -45,40 +45,44 @@ typedef struct {
bool update; bool update;
} ConnectionBuffer; } ConnectionBuffer;
struct _PinosConnection { typedef struct {
PinosConnection this;
ConnectionBuffer in, out; ConnectionBuffer in, out;
int fd; } PinosConnectionImpl;
};
int int
pinos_connection_get_fd (PinosConnection *conn, pinos_connection_get_fd (PinosConnection *conn,
uint32_t index) uint32_t index)
{ {
if (index < 0 || index >= conn->in.n_fds) PinosConnectionImpl *impl = SPA_CONTAINER_OF (conn, PinosConnectionImpl, this);
if (index < 0 || index >= impl->in.n_fds)
return -1; return -1;
return conn->in.fds[index]; return impl->in.fds[index];
} }
uint32_t uint32_t
pinos_connection_add_fd (PinosConnection *conn, pinos_connection_add_fd (PinosConnection *conn,
int fd) int fd)
{ {
PinosConnectionImpl *impl = SPA_CONTAINER_OF (conn, PinosConnectionImpl, this);
uint32_t index, i; uint32_t index, i;
for (i = 0; i < conn->out.n_fds; i++) { for (i = 0; i < impl->out.n_fds; i++) {
if (conn->out.fds[i] == fd) if (impl->out.fds[i] == fd)
return i; return i;
} }
index = conn->out.n_fds; index = impl->out.n_fds;
if (index >= MAX_FDS) { if (index >= MAX_FDS) {
pinos_log_error ("connection %p: too many fds", conn); pinos_log_error ("connection %p: too many fds", conn);
return -1; return -1;
} }
conn->out.fds[index] = fd; impl->out.fds[index] = fd;
conn->out.n_fds++; impl->out.n_fds++;
return index; return index;
} }
@ -156,40 +160,51 @@ clear_buffer (ConnectionBuffer *buf)
PinosConnection * PinosConnection *
pinos_connection_new (int fd) pinos_connection_new (int fd)
{ {
PinosConnection *c; PinosConnectionImpl *impl;
PinosConnection *this;
c = calloc (1, sizeof (PinosConnection)); impl = calloc (1, sizeof (PinosConnectionImpl));
if (c == NULL) if (impl == NULL)
return NULL; return NULL;
pinos_log_debug ("connection %p: new", c); this = &impl->this;
c->fd = fd; pinos_log_debug ("connection %p: new", this);
c->out.buffer_data = malloc (MAX_BUFFER_SIZE);
c->out.buffer_maxsize = MAX_BUFFER_SIZE;
c->in.buffer_data = malloc (MAX_BUFFER_SIZE);
c->in.buffer_maxsize = MAX_BUFFER_SIZE;
c->in.update = true;
if (c->out.buffer_data == NULL || c->in.buffer_data == NULL) this->fd = fd;
pinos_signal_init (&this->need_flush);
pinos_signal_init (&this->destroy_signal);
impl->out.buffer_data = malloc (MAX_BUFFER_SIZE);
impl->out.buffer_maxsize = MAX_BUFFER_SIZE;
impl->in.buffer_data = malloc (MAX_BUFFER_SIZE);
impl->in.buffer_maxsize = MAX_BUFFER_SIZE;
impl->in.update = true;
if (impl->out.buffer_data == NULL || impl->in.buffer_data == NULL)
goto no_mem; goto no_mem;
return c; return this;
no_mem: no_mem:
free (c->out.buffer_data); free (impl->out.buffer_data);
free (c->in.buffer_data); free (impl->in.buffer_data);
free (c); free (impl);
return NULL; return NULL;
} }
void void
pinos_connection_destroy (PinosConnection *conn) pinos_connection_destroy (PinosConnection *conn)
{ {
PinosConnectionImpl *impl = SPA_CONTAINER_OF (conn, PinosConnectionImpl, this);
pinos_log_debug ("connection %p: destroy", conn); pinos_log_debug ("connection %p: destroy", conn);
free (conn->out.buffer_data);
free (conn->in.buffer_data); pinos_signal_emit (&conn->destroy_signal, conn);
free (conn);
free (impl->out.buffer_data);
free (impl->in.buffer_data);
free (impl);
} }
/** /**
@ -207,14 +222,13 @@ pinos_connection_get_next (PinosConnection *conn,
void **dt, void **dt,
uint32_t *sz) uint32_t *sz)
{ {
PinosConnectionImpl *impl = SPA_CONTAINER_OF (conn, PinosConnectionImpl, this);
size_t len, size; size_t len, size;
uint8_t *data; uint8_t *data;
ConnectionBuffer *buf; ConnectionBuffer *buf;
uint32_t *p; uint32_t *p;
spa_return_val_if_fail (conn != NULL, false); buf = &impl->in;
buf = &conn->in;
/* move to next packet */ /* move to next packet */
buf->offset += buf->size; buf->offset += buf->size;
@ -270,8 +284,9 @@ void *
pinos_connection_begin_write (PinosConnection *conn, pinos_connection_begin_write (PinosConnection *conn,
uint32_t size) uint32_t size)
{ {
PinosConnectionImpl *impl = SPA_CONTAINER_OF (conn, PinosConnectionImpl, this);
uint32_t *p; uint32_t *p;
ConnectionBuffer *buf = &conn->out; ConnectionBuffer *buf = &impl->out;
/* 4 for dest_id, 1 for opcode, 3 for size and size for payload */ /* 4 for dest_id, 1 for opcode, 3 for size and size for payload */
p = connection_ensure_size (conn, buf, 8 + size); p = connection_ensure_size (conn, buf, 8 + size);
return p + 2; return p + 2;
@ -283,19 +298,23 @@ pinos_connection_end_write (PinosConnection *conn,
uint8_t opcode, uint8_t opcode,
uint32_t size) uint32_t size)
{ {
PinosConnectionImpl *impl = SPA_CONTAINER_OF (conn, PinosConnectionImpl, this);
uint32_t *p; uint32_t *p;
ConnectionBuffer *buf = &conn->out; ConnectionBuffer *buf = &impl->out;
p = connection_ensure_size (conn, buf, 8 + size); p = connection_ensure_size (conn, buf, 8 + size);
*p++ = dest_id; *p++ = dest_id;
*p++ = (opcode << 24) | (size & 0xffffff); *p++ = (opcode << 24) | (size & 0xffffff);
buf->buffer_size += 8 + size; buf->buffer_size += 8 + size;
pinos_signal_emit (&conn->need_flush, conn);
} }
bool bool
pinos_connection_flush (PinosConnection *conn) pinos_connection_flush (PinosConnection *conn)
{ {
PinosConnectionImpl *impl = SPA_CONTAINER_OF (conn, PinosConnectionImpl, this);
ssize_t len; ssize_t len;
struct msghdr msg = {0}; struct msghdr msg = {0};
struct iovec iov[1]; struct iovec iov[1];
@ -304,9 +323,7 @@ pinos_connection_flush (PinosConnection *conn)
int *cm, i, fds_len; int *cm, i, fds_len;
ConnectionBuffer *buf; ConnectionBuffer *buf;
spa_return_val_if_fail (conn != NULL, false); buf = &impl->out;
buf = &conn->out;
if (buf->buffer_size == 0) if (buf->buffer_size == 0)
return true; return true;
@ -362,11 +379,11 @@ send_error:
bool bool
pinos_connection_clear (PinosConnection *conn) pinos_connection_clear (PinosConnection *conn)
{ {
spa_return_val_if_fail (conn != NULL, false); PinosConnectionImpl *impl = SPA_CONTAINER_OF (conn, PinosConnectionImpl, this);
clear_buffer (&conn->out); clear_buffer (&impl->out);
clear_buffer (&conn->in); clear_buffer (&impl->in);
conn->in.update = true; impl->in.update = true;
return true; return true;
} }

View file

@ -25,9 +25,19 @@ extern "C" {
#endif #endif
#include <spa/defs.h> #include <spa/defs.h>
#include <pinos/client/sig.h>
typedef struct _PinosConnection PinosConnection; typedef struct _PinosConnection PinosConnection;
struct _PinosConnection {
int fd;
PINOS_SIGNAL (need_flush, (PinosListener *listener,
PinosConnection *conn));
PINOS_SIGNAL (destroy_signal, (PinosListener *listener,
PinosConnection *conn));
};
PinosConnection * pinos_connection_new (int fd); PinosConnection * pinos_connection_new (int fd);
void pinos_connection_destroy (PinosConnection *conn); void pinos_connection_destroy (PinosConnection *conn);