From 7d1d3bd666f98f22ec446964920c19afc73b6b0f Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Fri, 21 Apr 2017 10:24:42 +0200 Subject: [PATCH] stream: use 2 eventfds for client <-> server signaling --- pinos/client/interfaces.h | 3 +- pinos/client/protocol-native.c | 15 ++++--- pinos/client/stream.c | 27 +++++++------ pinos/modules/module-protocol-dbus.c | 11 +++--- pinos/server/client-node.c | 59 ++++++++++++++++------------ pinos/server/client-node.h | 4 +- pinos/server/core.c | 9 +++-- pinos/server/protocol-native.c | 6 ++- spa/tests/test-ringbuffer.c | 2 +- 9 files changed, 80 insertions(+), 56 deletions(-) diff --git a/pinos/client/interfaces.h b/pinos/client/interfaces.h index 627823509..aa298e37e 100644 --- a/pinos/client/interfaces.h +++ b/pinos/client/interfaces.h @@ -217,7 +217,8 @@ typedef struct { typedef struct { void (*done) (void *object, - int datafd); + int readfd, + int writefd); void (*event) (void *object, const SpaEvent *event); void (*add_port) (void *object, diff --git a/pinos/client/protocol-native.c b/pinos/client/protocol-native.c index 739d3625f..e29dec015 100644 --- a/pinos/client/protocol-native.c +++ b/pinos/client/protocol-native.c @@ -576,17 +576,22 @@ client_node_demarshal_done (void *object, PinosProxy *proxy = object; SpaPODIter it; PinosConnection *connection = proxy->context->protocol_private; - int32_t idx; - int fd; + int32_t ridx, widx; + int readfd, writefd; if (!spa_pod_iter_struct (&it, data, size) || !spa_pod_iter_get (&it, - SPA_POD_TYPE_INT, &idx, + SPA_POD_TYPE_INT, &ridx, + SPA_POD_TYPE_INT, &widx, 0)) return false; - fd = pinos_connection_get_fd (connection, idx); - ((PinosClientNodeEvents*)proxy->implementation)->done (proxy, fd); + readfd = pinos_connection_get_fd (connection, ridx); + writefd = pinos_connection_get_fd (connection, widx); + if (readfd == -1 || writefd == -1) + return false; + + ((PinosClientNodeEvents*)proxy->implementation)->done (proxy, readfd, writefd); return true; } diff --git a/pinos/client/stream.c b/pinos/client/stream.c index e752352a0..273efd8a5 100644 --- a/pinos/client/stream.c +++ b/pinos/client/stream.c @@ -75,7 +75,8 @@ typedef struct PinosStreamMode mode; - int rtfd; + int rtreadfd; + int rtwritefd; SpaSource *rtsocket_source; PinosProxy *node_proxy; @@ -372,7 +373,7 @@ send_need_input (PinosStream *stream) ni.event.type = SPA_EVENT_NODE_NEED_INPUT; ni.event.size = sizeof (ni); pinos_transport_add_event (impl->trans, &ni.event); - write (impl->rtfd, &cmd, 8); + write (impl->rtwritefd, &cmd, 8); #endif } @@ -384,7 +385,7 @@ send_have_output (PinosStream *stream) uint64_t cmd = 1; pinos_transport_add_event (impl->trans, &ho); - write (impl->rtfd, &cmd, 8); + write (impl->rtwritefd, &cmd, 8); } static void @@ -553,7 +554,7 @@ on_rtsocket_condition (SpaLoopUtils *utils, SpaEvent event; uint64_t cmd; - read (impl->rtfd, &cmd, 8); + read (impl->rtreadfd, &cmd, 8); while (pinos_transport_next_event (impl->trans, &event) == SPA_RESULT_OK) { SpaEvent *ev = alloca (SPA_POD_SIZE (&event)); @@ -564,14 +565,15 @@ on_rtsocket_condition (SpaLoopUtils *utils, } static void -handle_socket (PinosStream *stream, int rtfd) +handle_socket (PinosStream *stream, int rtreadfd, int rtwritefd) { PinosStreamImpl *impl = SPA_CONTAINER_OF (stream, PinosStreamImpl, this); struct timespec interval; - impl->rtfd = rtfd; + impl->rtreadfd = rtreadfd; + impl->rtwritefd = rtwritefd; impl->rtsocket_source = pinos_loop_add_io (stream->context->loop, - impl->rtfd, + impl->rtreadfd, SPA_IO_ERR | SPA_IO_HUP, true, on_rtsocket_condition, @@ -659,13 +661,14 @@ handle_node_command (PinosStream *stream, static void client_node_done (void *object, - int datafd) + int readfd, + int writefd) { PinosProxy *proxy = object; PinosStream *stream = proxy->user_data; - pinos_log_info ("strean %p: create client node done with fd %d", stream, datafd); - handle_socket (stream, datafd); + pinos_log_info ("strean %p: create client node done with fds %d %d", stream, readfd, writefd); + handle_socket (stream, readfd, writefd); do_node_init (stream); stream_set_state (stream, PINOS_STREAM_STATE_CONFIGURE, NULL); @@ -927,7 +930,7 @@ client_node_transport (void *object, pinos_transport_destroy (impl->trans); impl->trans = pinos_transport_new_from_info (&info); - pinos_log_debug ("transport update %d %p", impl->rtfd, impl->trans); + pinos_log_debug ("transport update %p", impl->trans); } static const PinosClientNodeEvents client_node_events = { @@ -1156,7 +1159,7 @@ pinos_stream_recycle_buffer (PinosStream *stream, spa_list_insert (impl->free.prev, &bid->link); pinos_transport_add_event (impl->trans, (SpaEvent *)&rb); - write (impl->rtfd, &cmd, 8); + write (impl->rtwritefd, &cmd, 8); return true; } diff --git a/pinos/modules/module-protocol-dbus.c b/pinos/modules/module-protocol-dbus.c index 265d4ebd8..7375ac095 100644 --- a/pinos/modules/module-protocol-dbus.c +++ b/pinos/modules/module-protocol-dbus.c @@ -394,8 +394,8 @@ handle_create_client_node (PinosDaemon1 *interface, PinosProperties *props; GError *error = NULL; GUnixFDList *fdlist; - int ctrl_fd, data_fd; - int ctrl_idx, data_idx; + int ctrl_fd, data_rfd, data_wfd; + int ctrl_idx, data_ridx, data_widx; PinosProtocolDBusObject *object; int fd[2]; @@ -426,7 +426,7 @@ handle_create_client_node (PinosDaemon1 *interface, if (object == NULL) goto object_failed; - if ((res = pinos_client_node_get_data_socket (node, &data_fd)) < 0) + if ((res = pinos_client_node_get_fds (node, &data_rfd, &data_wfd)) < 0) goto no_socket; object_path = object->object_path; @@ -434,10 +434,11 @@ handle_create_client_node (PinosDaemon1 *interface, fdlist = g_unix_fd_list_new (); ctrl_idx = g_unix_fd_list_append (fdlist, ctrl_fd, &error); - data_idx = g_unix_fd_list_append (fdlist, data_fd, &error); + data_ridx = g_unix_fd_list_append (fdlist, data_rfd, &error); + data_widx = g_unix_fd_list_append (fdlist, data_wfd, &error); g_dbus_method_invocation_return_value_with_unix_fd_list (invocation, - g_variant_new ("(ohh)", object_path, ctrl_idx, data_idx), fdlist); + g_variant_new ("(ohhh)", object_path, ctrl_idx, data_ridx, data_widx), fdlist); g_object_unref (fdlist); return TRUE; diff --git a/pinos/server/client-node.c b/pinos/server/client-node.c index 428bef220..6498d429a 100644 --- a/pinos/server/client-node.c +++ b/pinos/server/client-node.c @@ -102,6 +102,7 @@ struct _SpaProxy PinosResource *resource; SpaSource data_source; + int writefd; uint32_t max_inputs; uint32_t n_inputs; @@ -129,7 +130,8 @@ struct _PinosClientNodeImpl PinosListener loop_changed; PinosListener global_added; - int data_fd; + int fds[2]; + int other_fds[2]; }; static SpaResult @@ -163,7 +165,7 @@ static inline void do_flush (SpaProxy *this) { uint64_t cmd = 1; - write (this->data_source.fd, &cmd, 8); + write (this->writefd, &cmd, 8); } static inline void @@ -1140,10 +1142,9 @@ client_node_resource_destroy (PinosResource *resource) pinos_signal_remove (&impl->loop_changed); pinos_signal_remove (&impl->initialized); - if (proxy->data_source.fd != -1) { + if (proxy->data_source.fd != -1) spa_loop_remove_source (proxy->data_loop, &proxy->data_source); - close (proxy->data_source.fd); - } + pinos_node_destroy (this->node); } @@ -1161,8 +1162,10 @@ on_node_free (PinosListener *listener, if (impl->transport) pinos_transport_destroy (impl->transport); - if (impl->data_fd != -1) - close (impl->data_fd); + if (impl->fds[0] != -1) + close (impl->fds[0]); + if (impl->fds[1] != -1) + close (impl->fds[1]); free (impl); } @@ -1193,7 +1196,7 @@ pinos_client_node_new (PinosClient *client, this->client = client; impl->core = client->core; - impl->data_fd = -1; + impl->fds[0] = impl->fds[1] = -1; pinos_log_debug ("client-node %p: new", impl); pinos_signal_init (&this->destroy_signal); @@ -1257,39 +1260,45 @@ pinos_client_node_destroy (PinosClientNode * this) } /** - * pinos_client_node_get_data_socket: + * pinos_client_node_get_fds: * @node: a #PinosClientNode - * @error: a #GError + * @readfd: an fd for reading + * @writefd: an fd for writing * - * Create or return a previously create socket pair for @node. The - * Socket for the other end is returned. + * Create or return a previously create set of fds for @node. * * Returns: %SPA_RESULT_OK on success */ SpaResult -pinos_client_node_get_data_socket (PinosClientNode *this, - int *fd) +pinos_client_node_get_fds (PinosClientNode *this, + int *readfd, + int *writefd) { PinosClientNodeImpl *impl = SPA_CONTAINER_OF (this, PinosClientNodeImpl, this); - if (impl->data_fd == -1) { -#if 1 - int fd[2]; - - if (socketpair (AF_UNIX, SOCK_STREAM | SOCK_CLOEXEC | SOCK_NONBLOCK, 0, fd) != 0) + if (impl->fds[0] == -1) { +#if 0 + if (socketpair (AF_UNIX, SOCK_STREAM | SOCK_CLOEXEC | SOCK_NONBLOCK, 0, impl->fds) != 0) return SPA_RESULT_ERRNO; - impl->proxy.data_source.fd = fd[0]; - impl->data_fd = fd[1]; + impl->proxy.data_source.fd = impl->fds[0]; + impl->proxy.writefd = impl->fds[0]; + impl->other_fds[0] = impl->fds[1]; + impl->other_fds[1] = impl->fds[1]; #else - - impl->proxy.data_source.fd = eventfd (0, EFD_CLOEXEC | EFD_NONBLOCK); - impl->data_fd = eventfd (0, EFD_CLOEXEC | EFD_NONBLOCK); + impl->fds[0] = eventfd (0, EFD_CLOEXEC | EFD_NONBLOCK); + impl->fds[1] = eventfd (0, EFD_CLOEXEC | EFD_NONBLOCK); + impl->proxy.data_source.fd = impl->fds[0]; + impl->proxy.writefd = impl->fds[1]; + impl->other_fds[0] = impl->fds[1]; + impl->other_fds[1] = impl->fds[0]; #endif spa_loop_add_source (impl->proxy.data_loop, &impl->proxy.data_source); pinos_log_debug ("client-node %p: add data fd %d", this, impl->proxy.data_source.fd); } - *fd = impl->data_fd; + *readfd = impl->other_fds[0]; + *writefd = impl->other_fds[1]; + return SPA_RESULT_OK; } diff --git a/pinos/server/client-node.h b/pinos/server/client-node.h index 199f11a7d..9402c021b 100644 --- a/pinos/server/client-node.h +++ b/pinos/server/client-node.h @@ -52,7 +52,9 @@ PinosClientNode * pinos_client_node_new (PinosClient *client, PinosProperties *properties); void pinos_client_node_destroy (PinosClientNode *node); -SpaResult pinos_client_node_get_data_socket (PinosClientNode *node, int *fd); +SpaResult pinos_client_node_get_fds (PinosClientNode *node, + int *readfd, + int *writefd); #ifdef __cplusplus } diff --git a/pinos/server/core.c b/pinos/server/core.c index 04bee3979..99d3efbe9 100644 --- a/pinos/server/core.c +++ b/pinos/server/core.c @@ -203,7 +203,7 @@ async_create_client_node_complete (PinosAccessData *data) PinosClient *client = resource->client; PinosClientNode *node; SpaResult res; - int data_fd; + int readfd, writefd; if (data->res != SPA_RESULT_OK) goto denied; @@ -215,16 +215,17 @@ async_create_client_node_complete (PinosAccessData *data) if (node == NULL) goto no_mem; - if ((res = pinos_client_node_get_data_socket (node, &data_fd)) < 0) { + if ((res = pinos_client_node_get_fds (node, &readfd, &writefd)) < 0) { pinos_core_notify_error (client->core_resource, resource->id, SPA_RESULT_ERROR, - "can't get data fd"); + "can't get data fds"); return; } pinos_client_node_notify_done (node->resource, - data_fd); + readfd, + writefd); goto done; no_mem: diff --git a/pinos/server/protocol-native.c b/pinos/server/protocol-native.c index 1ffe515ef..9d451ca35 100644 --- a/pinos/server/protocol-native.c +++ b/pinos/server/protocol-native.c @@ -530,7 +530,8 @@ client_marshal_info (void *object, static void client_node_marshal_done (void *object, - int datafd) + int readfd, + int writefd) { PinosResource *resource = object; PinosConnection *connection = resource->client->protocol_private; @@ -540,7 +541,8 @@ client_node_marshal_done (void *object, core_update_map (resource->client); spa_pod_builder_struct (&b.b, &f, - SPA_POD_TYPE_INT, pinos_connection_add_fd (connection, datafd)); + SPA_POD_TYPE_INT, pinos_connection_add_fd (connection, readfd), + SPA_POD_TYPE_INT, pinos_connection_add_fd (connection, writefd)); pinos_connection_end_write (connection, resource->id, PINOS_CLIENT_NODE_EVENT_DONE, b.b.offset); } diff --git a/spa/tests/test-ringbuffer.c b/spa/tests/test-ringbuffer.c index ee1d4647d..522027318 100644 --- a/spa/tests/test-ringbuffer.c +++ b/spa/tests/test-ringbuffer.c @@ -282,7 +282,7 @@ make_nodes (AppData *data) spa_pod_builder_init (&b, buffer, sizeof (buffer)); spa_pod_builder_props (&b, &f[0], data->type.props, SPA_POD_PROP (&f[1], data->type.props_device, 0, SPA_POD_TYPE_STRING, 1, "hw:1"), - SPA_POD_PROP (&f[1], data->type.props_min_latency, 0, SPA_POD_TYPE_INT, 1, 128)); + SPA_POD_PROP (&f[1], data->type.props_min_latency, 0, SPA_POD_TYPE_INT, 1, 64)); props = SPA_POD_BUILDER_DEREF (&b, f[0].ref, SpaProps); if ((res = spa_node_set_props (data->sink, props)) < 0)