From 3ab17281f61cb4dd3d780c6b4ccf6398662306e4 Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Wed, 15 Mar 2017 16:16:16 +0100 Subject: [PATCH] connection: add need_flush signal Add a need_flush signal to the connection when something can be flushed. Add destroy signal to connection --- pinos/client/connection.c | 97 +++++++++++++++++++++++---------------- pinos/client/connection.h | 10 ++++ 2 files changed, 67 insertions(+), 40 deletions(-) diff --git a/pinos/client/connection.c b/pinos/client/connection.c index d2445a271..84b100d58 100644 --- a/pinos/client/connection.c +++ b/pinos/client/connection.c @@ -45,40 +45,44 @@ typedef struct { bool update; } ConnectionBuffer; -struct _PinosConnection { +typedef struct { + PinosConnection this; + ConnectionBuffer in, out; - int fd; -}; +} PinosConnectionImpl; int pinos_connection_get_fd (PinosConnection *conn, uint32_t index) { - if (index < 0 || index >= conn->in.n_fds) + PinosConnectionImpl *impl = SPA_CONTAINER_OF (conn, PinosConnectionImpl, this); + + if (index < 0 || index >= impl->in.n_fds) return -1; - return conn->in.fds[index]; + return impl->in.fds[index]; } uint32_t pinos_connection_add_fd (PinosConnection *conn, int fd) { + PinosConnectionImpl *impl = SPA_CONTAINER_OF (conn, PinosConnectionImpl, this); uint32_t index, i; - for (i = 0; i < conn->out.n_fds; i++) { - if (conn->out.fds[i] == fd) + for (i = 0; i < impl->out.n_fds; i++) { + if (impl->out.fds[i] == fd) return i; } - index = conn->out.n_fds; + index = impl->out.n_fds; if (index >= MAX_FDS) { pinos_log_error ("connection %p: too many fds", conn); return -1; } - conn->out.fds[index] = fd; - conn->out.n_fds++; + impl->out.fds[index] = fd; + impl->out.n_fds++; return index; } @@ -156,40 +160,51 @@ clear_buffer (ConnectionBuffer *buf) PinosConnection * pinos_connection_new (int fd) { - PinosConnection *c; + PinosConnectionImpl *impl; + PinosConnection *this; - c = calloc (1, sizeof (PinosConnection)); - if (c == NULL) + impl = calloc (1, sizeof (PinosConnectionImpl)); + if (impl == NULL) return NULL; - pinos_log_debug ("connection %p: new", c); + this = &impl->this; - c->fd = fd; - c->out.buffer_data = malloc (MAX_BUFFER_SIZE); - c->out.buffer_maxsize = MAX_BUFFER_SIZE; - c->in.buffer_data = malloc (MAX_BUFFER_SIZE); - c->in.buffer_maxsize = MAX_BUFFER_SIZE; - c->in.update = true; + pinos_log_debug ("connection %p: new", this); - if (c->out.buffer_data == NULL || c->in.buffer_data == NULL) + this->fd = fd; + pinos_signal_init (&this->need_flush); + pinos_signal_init (&this->destroy_signal); + + impl->out.buffer_data = malloc (MAX_BUFFER_SIZE); + impl->out.buffer_maxsize = MAX_BUFFER_SIZE; + impl->in.buffer_data = malloc (MAX_BUFFER_SIZE); + impl->in.buffer_maxsize = MAX_BUFFER_SIZE; + impl->in.update = true; + + if (impl->out.buffer_data == NULL || impl->in.buffer_data == NULL) goto no_mem; - return c; + return this; no_mem: - free (c->out.buffer_data); - free (c->in.buffer_data); - free (c); + free (impl->out.buffer_data); + free (impl->in.buffer_data); + free (impl); return NULL; } void pinos_connection_destroy (PinosConnection *conn) { + PinosConnectionImpl *impl = SPA_CONTAINER_OF (conn, PinosConnectionImpl, this); + pinos_log_debug ("connection %p: destroy", conn); - free (conn->out.buffer_data); - free (conn->in.buffer_data); - free (conn); + + pinos_signal_emit (&conn->destroy_signal, conn); + + free (impl->out.buffer_data); + free (impl->in.buffer_data); + free (impl); } /** @@ -207,14 +222,13 @@ pinos_connection_get_next (PinosConnection *conn, void **dt, uint32_t *sz) { + PinosConnectionImpl *impl = SPA_CONTAINER_OF (conn, PinosConnectionImpl, this); size_t len, size; uint8_t *data; ConnectionBuffer *buf; uint32_t *p; - spa_return_val_if_fail (conn != NULL, false); - - buf = &conn->in; + buf = &impl->in; /* move to next packet */ buf->offset += buf->size; @@ -270,8 +284,9 @@ void * pinos_connection_begin_write (PinosConnection *conn, uint32_t size) { + PinosConnectionImpl *impl = SPA_CONTAINER_OF (conn, PinosConnectionImpl, this); uint32_t *p; - ConnectionBuffer *buf = &conn->out; + ConnectionBuffer *buf = &impl->out; /* 4 for dest_id, 1 for opcode, 3 for size and size for payload */ p = connection_ensure_size (conn, buf, 8 + size); return p + 2; @@ -283,19 +298,23 @@ pinos_connection_end_write (PinosConnection *conn, uint8_t opcode, uint32_t size) { + PinosConnectionImpl *impl = SPA_CONTAINER_OF (conn, PinosConnectionImpl, this); uint32_t *p; - ConnectionBuffer *buf = &conn->out; + ConnectionBuffer *buf = &impl->out; p = connection_ensure_size (conn, buf, 8 + size); *p++ = dest_id; *p++ = (opcode << 24) | (size & 0xffffff); buf->buffer_size += 8 + size; + + pinos_signal_emit (&conn->need_flush, conn); } bool pinos_connection_flush (PinosConnection *conn) { + PinosConnectionImpl *impl = SPA_CONTAINER_OF (conn, PinosConnectionImpl, this); ssize_t len; struct msghdr msg = {0}; struct iovec iov[1]; @@ -304,9 +323,7 @@ pinos_connection_flush (PinosConnection *conn) int *cm, i, fds_len; ConnectionBuffer *buf; - spa_return_val_if_fail (conn != NULL, false); - - buf = &conn->out; + buf = &impl->out; if (buf->buffer_size == 0) return true; @@ -362,11 +379,11 @@ send_error: bool pinos_connection_clear (PinosConnection *conn) { - spa_return_val_if_fail (conn != NULL, false); + PinosConnectionImpl *impl = SPA_CONTAINER_OF (conn, PinosConnectionImpl, this); - clear_buffer (&conn->out); - clear_buffer (&conn->in); - conn->in.update = true; + clear_buffer (&impl->out); + clear_buffer (&impl->in); + impl->in.update = true; return true; } diff --git a/pinos/client/connection.h b/pinos/client/connection.h index 1ca229028..fdb509f25 100644 --- a/pinos/client/connection.h +++ b/pinos/client/connection.h @@ -25,9 +25,19 @@ extern "C" { #endif #include +#include typedef struct _PinosConnection PinosConnection; +struct _PinosConnection { + int fd; + + PINOS_SIGNAL (need_flush, (PinosListener *listener, + PinosConnection *conn)); + PINOS_SIGNAL (destroy_signal, (PinosListener *listener, + PinosConnection *conn)); +}; + PinosConnection * pinos_connection_new (int fd); void pinos_connection_destroy (PinosConnection *conn);