WIP change object model

This commit is contained in:
Wim Taymans 2016-11-14 12:42:00 +01:00
parent 190f01d88e
commit c25ccbb4ba
44 changed files with 1557 additions and 2525 deletions

View file

@ -102,12 +102,12 @@ struct _SpaProxy
SpaNodeEventCallback event_cb;
void *user_data;
SpaPollFd fds[1];
SpaPollItem poll;
SpaPollFd ctrl_fds[1];
SpaPollItem ctrl_poll;
PinosConnection *conn;
SpaPollFd rtfds[1];
SpaPollItem rtpoll;
SpaPollFd data_fds[1];
SpaPollItem data_poll;
unsigned int max_inputs;
unsigned int n_inputs;
@ -123,17 +123,15 @@ typedef struct
{
PinosClientNode this;
PinosObject object;
PinosInterface ifaces[1];
PinosCore *core;
SpaProxy proxy;
PinosListener transport_changed;
PinosListener loop_changed;
int fd;
int rtfd;
int ctrl_fd;
int data_fd;
} PinosClientNodeImpl;
static void
@ -862,7 +860,7 @@ spa_proxy_node_port_reuse_buffer (SpaNode *node,
rb.buffer_id = buffer_id;
pinos_transport_add_event (pnode->transport, &rb.event);
cmd = PINOS_TRANSPORT_CMD_HAVE_EVENT;
write (this->rtfds[0].fd, &cmd, 1);
write (this->data_fds[0].fd, &cmd, 1);
return SPA_RESULT_OK;
}
@ -931,7 +929,7 @@ spa_proxy_node_process_input (SpaNode *node)
return SPA_RESULT_ERROR;
cmd = PINOS_TRANSPORT_CMD_HAVE_DATA;
write (this->rtfds[0].fd, &cmd, 1);
write (this->data_fds[0].fd, &cmd, 1);
return SPA_RESULT_OK;
}
@ -1105,7 +1103,7 @@ proxy_on_fd_events (SpaPollNotifyData *data)
}
static int
proxy_on_rtfd_events (SpaPollNotifyData *data)
proxy_on_data_fd_events (SpaPollNotifyData *data)
{
SpaProxy *this = data->user_data;
PinosNode *pnode = this->pnode;
@ -1113,7 +1111,7 @@ proxy_on_rtfd_events (SpaPollNotifyData *data)
if (data->fds[0].revents & POLLIN) {
uint8_t cmd;
read (this->rtfds[0].fd, &cmd, 1);
read (this->data_fds[0].fd, &cmd, 1);
if (cmd & PINOS_TRANSPORT_CMD_HAVE_EVENT) {
SpaNodeEvent event;
@ -1194,41 +1192,39 @@ proxy_init (SpaProxy *this,
this->node = proxy_node;
this->fds[0].fd = -1;
this->fds[0].events = POLLIN | POLLPRI | POLLERR;
this->fds[0].revents = 0;
this->poll.id = 0;
this->poll.enabled = true;
this->poll.fds = this->fds;
this->poll.n_fds = 1;
this->poll.idle_cb = NULL;
this->poll.before_cb = NULL;
this->poll.after_cb = proxy_on_fd_events;
this->poll.user_data = this;
this->ctrl_fds[0].fd = -1;
this->ctrl_fds[0].events = POLLIN | POLLPRI | POLLERR;
this->ctrl_fds[0].revents = 0;
this->ctrl_poll.id = 0;
this->ctrl_poll.enabled = true;
this->ctrl_poll.fds = this->ctrl_fds;
this->ctrl_poll.n_fds = 1;
this->ctrl_poll.idle_cb = NULL;
this->ctrl_poll.before_cb = NULL;
this->ctrl_poll.after_cb = proxy_on_fd_events;
this->ctrl_poll.user_data = this;
this->rtfds[0].fd = -1;
this->rtfds[0].events = POLLIN | POLLPRI | POLLERR;
this->rtfds[0].revents = 0;
this->rtpoll.id = 0;
this->rtpoll.enabled = true;
this->rtpoll.fds = this->rtfds;
this->rtpoll.n_fds = 1;
this->rtpoll.idle_cb = NULL;
this->rtpoll.before_cb = NULL;
this->rtpoll.after_cb = proxy_on_rtfd_events;
this->rtpoll.user_data = this;
this->data_fds[0].fd = -1;
this->data_fds[0].events = POLLIN | POLLPRI | POLLERR;
this->data_fds[0].revents = 0;
this->data_poll.id = 0;
this->data_poll.enabled = true;
this->data_poll.fds = this->data_fds;
this->data_poll.n_fds = 1;
this->data_poll.idle_cb = NULL;
this->data_poll.before_cb = NULL;
this->data_poll.after_cb = proxy_on_data_fd_events;
this->data_poll.user_data = this;
return SPA_RESULT_RETURN_ASYNC (this->seq++);
}
static void
on_transport_changed (PinosListener *listener,
void *object,
void *data)
PinosNode *node)
{
PinosClientNodeImpl *impl = SPA_CONTAINER_OF (listener, PinosClientNodeImpl, transport_changed);
PinosClientNode *this = &impl->this;
PinosNode *node = object;
PinosTransportInfo info;
PinosMessageTransportUpdate tu;
PinosConnection *conn = impl->proxy.conn;
@ -1244,6 +1240,14 @@ on_transport_changed (PinosListener *listener,
pinos_log_error ("client-node %p: error writing connection", this);
}
static void
on_loop_changed (PinosListener *listener,
PinosNode *node)
{
PinosClientNodeImpl *impl = SPA_CONTAINER_OF (listener, PinosClientNodeImpl, loop_changed);
impl->proxy.data_loop = &node->data_loop->poll;
}
static SpaResult
proxy_clear (SpaProxy *this)
{
@ -1257,35 +1261,18 @@ proxy_clear (SpaProxy *this)
if (this->out_ports[i].valid)
clear_port (this, &this->out_ports[i], SPA_DIRECTION_OUTPUT, i);
}
if (this->fds[0].fd != -1) {
spa_poll_remove_item (this->main_loop, &this->poll);
close (this->fds[0].fd);
if (this->ctrl_fds[0].fd != -1) {
spa_poll_remove_item (this->main_loop, &this->ctrl_poll);
close (this->ctrl_fds[0].fd);
}
if (this->rtfds[0].fd != -1) {
spa_poll_remove_item (this->data_loop, &this->rtpoll);
close (this->rtfds[0].fd);
if (this->data_fds[0].fd != -1) {
spa_poll_remove_item (this->data_loop, &this->data_poll);
close (this->data_fds[0].fd);
}
return SPA_RESULT_OK;
}
static void
client_node_destroy (PinosObject * object)
{
PinosClientNodeImpl *impl = (PinosClientNodeImpl *) object;
PinosClientNode *this = &impl->this;
pinos_log_debug ("client-node %p: destroy", this);
proxy_clear (&impl->proxy);
if (impl->fd != -1)
close (impl->fd);
if (impl->rtfd != -1)
close (impl->rtfd);
free (impl);
}
/**
* pinos_client_node_new:
* @daemon: a #PinosDaemon
@ -1310,16 +1297,12 @@ pinos_client_node_new (PinosCore *core,
impl = calloc (1, sizeof (PinosClientNodeImpl));
impl->core = core;
impl->ctrl_fd = -1;
impl->data_fd = -1;
this = &impl->this;
pinos_log_debug ("client-node %p: new", impl);
impl->ifaces[0].type = impl->core->registry.uri.node;
impl->ifaces[0].iface = this;
pinos_object_init (&this->object,
client_node_destroy,
1,
impl->ifaces);
pinos_signal_init (&this->destroy_signal);
proxy_init (&impl->proxy, NULL, core->support, core->n_support);
@ -1331,91 +1314,93 @@ pinos_client_node_new (PinosCore *core,
impl->proxy.pnode = this->node;
impl->transport_changed.notify = on_transport_changed;
pinos_signal_add (&this->node->transport_changed, &impl->transport_changed);
pinos_signal_add (&this->node->transport_changed,
&impl->transport_changed,
on_transport_changed);
pinos_signal_add (&this->node->loop_changed,
&impl->loop_changed,
on_loop_changed);
return this;
}
void
pinos_client_node_destroy (PinosClientNode * this)
{
PinosClientNodeImpl *impl = SPA_CONTAINER_OF (this, PinosClientNodeImpl, this);
pinos_log_debug ("client-node %p: destroy", impl);
pinos_signal_emit (&this->destroy_signal, this);
pinos_node_destroy (this->node);
proxy_clear (&impl->proxy);
if (impl->ctrl_fd != -1)
close (impl->ctrl_fd);
if (impl->data_fd != -1)
close (impl->data_fd);
free (impl);
}
/**
* pinos_client_node_get_socket_pair:
* pinos_client_node_get_ctrl_socket:
* @node: a #PinosClientNode
* @error: a #GError
* @fd: a result socket
*
* Create or return a previously create socket pair for @node. The
* Socket for the other end is returned.
* socket for the other end is returned.
*
* Returns: a socket that can be used to send/receive buffers to node.
* Returns: %SPA_RESULT_OK on success
*/
int
pinos_client_node_get_socket_pair (PinosClientNode *this,
GError **error)
SpaResult
pinos_client_node_get_ctrl_socket (PinosClientNode *this,
int *fd)
{
PinosClientNodeImpl *impl = (PinosClientNodeImpl *) this;
PinosClientNodeImpl *impl = SPA_CONTAINER_OF (this, PinosClientNodeImpl, this);
g_return_val_if_fail (this, -1);
if (impl->fd == -1) {
if (impl->ctrl_fd == -1) {
int fd[2];
if (socketpair (AF_UNIX, SOCK_STREAM | SOCK_CLOEXEC | SOCK_NONBLOCK, 0, fd) != 0)
goto no_sockets;
return SPA_RESULT_ERRNO;
impl->proxy.fds[0].fd = fd[0];
impl->proxy.conn = pinos_connection_new (impl->proxy.fds[0].fd);
spa_poll_add_item (impl->proxy.main_loop, &impl->proxy.poll);
impl->fd = fd[1];
}
return impl->fd;
/* ERRORS */
no_sockets:
{
g_set_error (error,
G_IO_ERROR,
g_io_error_from_errno (errno),
"could not create socketpair: %s", strerror (errno));
return -1;
impl->proxy.ctrl_fds[0].fd = fd[0];
impl->proxy.conn = pinos_connection_new (impl->proxy.ctrl_fds[0].fd);
spa_poll_add_item (impl->proxy.main_loop, &impl->proxy.ctrl_poll);
impl->ctrl_fd = fd[1];
}
*fd = impl->ctrl_fd;
return SPA_RESULT_OK;
}
/**
* pinos_client_node_get_rtsocket_pair:
* pinos_client_node_get_data_socket:
* @node: a #PinosClientNode
* @error: a #GError
*
* Create or return a previously create socket pair for @node. The
* Socket for the other end is returned.
*
* Returns: a #GSocket that can be used to send/receive buffers to node.
* Returns: %SPA_RESULT_OK on success
*/
int
pinos_client_node_get_rtsocket_pair (PinosClientNode *this,
GError **error)
SpaResult
pinos_client_node_get_data_socket (PinosClientNode *this,
int *fd)
{
PinosClientNodeImpl *impl = (PinosClientNodeImpl *) this;
PinosClientNodeImpl *impl = SPA_CONTAINER_OF (this, PinosClientNodeImpl, this);
g_return_val_if_fail (this, -1);
if (impl->fd == -1) {
if (impl->data_fd == -1) {
int fd[2];
if (socketpair (AF_UNIX, SOCK_STREAM, 0, fd) != 0)
goto no_sockets;
return SPA_RESULT_ERRNO;
impl->proxy.rtfds[0].fd = fd[0];
spa_poll_add_item (impl->proxy.data_loop, &impl->proxy.rtpoll);
impl->rtfd = fd[1];
}
return impl->rtfd;
/* ERRORS */
no_sockets:
{
g_set_error (error,
G_IO_ERROR,
g_io_error_from_errno (errno),
"could not create socketpair: %s", strerror (errno));
return -1;
impl->proxy.data_fds[0].fd = fd[0];
spa_poll_add_item (impl->proxy.data_loop, &impl->proxy.data_poll);
impl->data_fd = fd[1];
}
*fd = impl->data_fd;
return SPA_RESULT_OK;
}