diff --git a/src/modules/module-client-node/client-node.c b/src/modules/module-client-node/client-node.c index d22a8a1d9..b55d8a48f 100644 --- a/src/modules/module-client-node/client-node.c +++ b/src/modules/module-client-node/client-node.c @@ -99,7 +99,6 @@ struct node { struct pw_impl_client *client; struct spa_source data_source; - int writefd; struct pw_map ports[2]; @@ -128,9 +127,6 @@ struct impl { uint32_t bind_node_version; uint32_t bind_node_id; - - int fds[2]; - int other_fds[2]; }; #define pw_client_node_resource(r,m,v,...) \ @@ -903,18 +899,15 @@ static int impl_node_process(void *object) struct pw_impl_node *n = impl->this.node; struct timespec ts; - spa_log_trace_fp(this->log, "%p: send process driver:%p", this, impl->this.node->driver_node); - + /* this should not be called, we call the exported node + * directly */ + spa_log_warn(this->log, "exported node activation"); if (SPA_UNLIKELY(spa_system_clock_gettime(this->data_system, CLOCK_MONOTONIC, &ts) < 0)) spa_zero(ts); - n->rt.activation->status = PW_NODE_ACTIVATION_TRIGGERED; n->rt.activation->signal_time = SPA_TIMESPEC_TO_NSEC(&ts); - if (SPA_UNLIKELY(spa_system_eventfd_write(this->data_system, this->writefd, 1) < 0)) - spa_log_warn(this->log, "%p: error %m", this); - - return SPA_STATUS_OK; + return n->rt.target.signal_func(n->rt.target.data); } static struct pw_node * @@ -1259,8 +1252,8 @@ void pw_impl_client_node_registered(struct pw_impl_client_node *this, struct pw_ pw_resource_set_bound_id(this->resource, node_id); pw_client_node_resource_transport(this->resource, - impl->other_fds[0], - impl->other_fds[1], + impl->this.node->source.fd, + impl->node.data_source.fd, impl->activation->id, 0, sizeof(struct pw_node_activation)); @@ -1302,15 +1295,12 @@ static void node_initialized(void *data) struct pw_global *global; struct spa_system *data_system = impl->node.data_system; - impl->fds[0] = spa_system_eventfd_create(data_system, SPA_FD_CLOEXEC | SPA_FD_NONBLOCK); - impl->fds[1] = spa_system_eventfd_create(data_system, SPA_FD_CLOEXEC | SPA_FD_NONBLOCK); - impl->other_fds[0] = impl->fds[1]; - impl->other_fds[1] = impl->fds[0]; - node->data_source.fd = impl->fds[0]; - node->writefd = impl->fds[1]; + node->data_source.fd = spa_system_eventfd_create(data_system, + SPA_FD_CLOEXEC | SPA_FD_NONBLOCK); spa_loop_add_source(node->data_loop, &node->data_source); - pw_log_debug("%p: transport read-fd:%d write-fd:%d", node, impl->fds[0], impl->fds[1]); + pw_log_debug("%p: transport read-fd:%d write-fd:%d", node, + node->data_source.fd, impl->this.node->source.fd); if (add_area(impl) < 0) return; @@ -1355,10 +1345,8 @@ static void node_free(void *data) pw_map_clear(&impl->node.ports[1]); pw_map_clear(&impl->io_map); - if (impl->fds[0] != -1) - spa_system_close(data_system, impl->fds[0]); - if (impl->fds[1] != -1) - spa_system_close(data_system, impl->fds[1]); + if (node->data_source.fd != -1) + spa_system_close(data_system, node->data_source.fd); free(impl); } @@ -1724,7 +1712,7 @@ struct pw_impl_client_node *pw_impl_client_node_new(struct pw_resource *resource this = &impl->this; impl->context = context; - impl->fds[0] = impl->fds[1] = -1; + impl->node.data_source.fd = -1; pw_log_debug("%p: new", &impl->node); support = pw_context_get_support(impl->context, &n_support); diff --git a/src/modules/module-client-node/remote-node.c b/src/modules/module-client-node/remote-node.c index df6205efc..f0a73c6bc 100644 --- a/src/modules/module-client-node/remote-node.c +++ b/src/modules/module-client-node/remote-node.c @@ -247,6 +247,7 @@ static int client_node_transport(void *_data, int readfd, int writefd, uint32_t mem_id, uint32_t offset, uint32_t size) { struct node_data *data = _data; + struct pw_impl_node *node = data->node; struct pw_proxy *proxy = (struct pw_proxy*)data->client_node; clean_transport(data); @@ -258,18 +259,18 @@ static int client_node_transport(void *_data, return -errno; } - data->node->rt.activation = data->activation->ptr; + node->rt.activation = data->activation->ptr; pw_log_debug("remote-node %p: fds:%d %d node:%u activation:%p", proxy, readfd, writefd, data->remote_id, data->activation->ptr); data->rtwritefd = writefd; - spa_system_close(data->data_system, data->node->source.fd); - data->node->source.fd = readfd; + spa_system_close(data->data_system, node->source.fd); + node->source.fd = readfd; data->have_transport = true; - if (data->node->active) + if (node->active) pw_client_node_set_active(data->client_node, true); return 0; diff --git a/src/pipewire/impl-node.c b/src/pipewire/impl-node.c index 18e64a452..41e2d5dcb 100644 --- a/src/pipewire/impl-node.c +++ b/src/pipewire/impl-node.c @@ -139,7 +139,8 @@ static void remove_node(struct pw_impl_node *this) } static int -do_node_add(struct spa_loop *loop, bool async, uint32_t seq, const void *data, size_t size, void *user_data) +do_node_add(struct spa_loop *loop, bool async, uint32_t seq, + const void *data, size_t size, void *user_data) { struct pw_impl_node *this = user_data; struct pw_impl_node *driver = this->driver_node; @@ -155,18 +156,21 @@ do_node_add(struct spa_loop *loop, bool async, uint32_t seq, const void *data, s pw_log_warn("%p: read failed %m", this); this->added = true; - spa_loop_add_source(loop, &this->source); + if (!this->remote) + spa_loop_add_source(loop, &this->source); add_node(this, driver); } return 0; } static int -do_node_remove(struct spa_loop *loop, bool async, uint32_t seq, const void *data, size_t size, void *user_data) +do_node_remove(struct spa_loop *loop, bool async, uint32_t seq, + const void *data, size_t size, void *user_data) { struct pw_impl_node *this = user_data; if (this->added) { - spa_loop_remove_source(loop, &this->source); + if (!this->remote) + spa_loop_remove_source(loop, &this->source); remove_node(this); this->added = false; } @@ -1120,7 +1124,7 @@ static inline int node_signal_func(void *data) if (SPA_UNLIKELY(spa_system_eventfd_write(data_system, this->source.fd, 1) < 0)) pw_log_warn("node %p: write failed %m", this); - return 0; + return SPA_STATUS_OK; } static inline void calculate_stats(struct pw_impl_node *this, struct pw_node_activation *a)