diff --git a/src/extensions/client-node.h b/src/extensions/client-node.h index 67ebb7c0b..43bf94b82 100644 --- a/src/extensions/client-node.h +++ b/src/extensions/client-node.h @@ -44,6 +44,16 @@ struct pw_client_node_buffer { struct spa_buffer *buffer; /**< buffer describing metadata and buffer memory */ }; +struct pw_client_node_position { +#define PW_VERSION_CLIENT_NODE_POSITION 0 + uint32_t version; + uint32_t seq1, seq2; /**< valid contents when equal */ + uint64_t nsec; /**< time in nanoseconds */ + struct spa_fraction rate; /**< rate of the clock */ + uint32_t position; /**< current position expressed in rate */ + uint32_t duration; /**< duration of cycle expressed in rate */ +}; + #define PW_CLIENT_NODE_PROXY_METHOD_DONE 0 #define PW_CLIENT_NODE_PROXY_METHOD_UPDATE 1 #define PW_CLIENT_NODE_PROXY_METHOD_PORT_UPDATE 2 @@ -183,7 +193,8 @@ pw_client_node_proxy_destroy(struct pw_client_node_proxy *p) #define PW_CLIENT_NODE_PROXY_EVENT_PORT_USE_BUFFERS 8 #define PW_CLIENT_NODE_PROXY_EVENT_PORT_COMMAND 9 #define PW_CLIENT_NODE_PROXY_EVENT_PORT_SET_IO 10 -#define PW_CLIENT_NODE_PROXY_EVENT_NUM 11 +#define PW_CLIENT_NODE_PROXY_EVENT_SET_POSITION 11 +#define PW_CLIENT_NODE_PROXY_EVENT_NUM 12 /** \ref pw_client_node events */ struct pw_client_node_proxy_events { @@ -340,6 +351,11 @@ struct pw_client_node_proxy_events { uint32_t mem_id, uint32_t offset, uint32_t size); + + void (*set_position) (void *object, + uint32_t mem_id, + uint32_t offset, + uint32_t size); }; static inline void @@ -373,6 +389,8 @@ pw_client_node_proxy_add_listener(struct pw_client_node_proxy *p, pw_resource_notify(r,struct pw_client_node_proxy_events,port_command,__VA_ARGS__) #define pw_client_node_resource_port_set_io(r,...) \ pw_resource_notify(r,struct pw_client_node_proxy_events,port_set_io,__VA_ARGS__) +#define pw_client_node_resource_set_position(r,...) \ + pw_resource_notify(r,struct pw_client_node_proxy_events,set_position,__VA_ARGS__) #ifdef __cplusplus } /* extern "C" */ diff --git a/src/modules/module-audio-session.c b/src/modules/module-audio-session.c index 366f2bc66..1d92cbb76 100644 --- a/src/modules/module-audio-session.c +++ b/src/modules/module-audio-session.c @@ -298,6 +298,13 @@ static void reconfigure_session(struct session *sess) } sess->buffer_size = buffer_size; + + sess->node->rt.quantum->rate.num = 1; + sess->node->rt.quantum->rate.denom = sess->sample_rate; + sess->node->rt.quantum->size = buffer_size; + + pw_log_info("module %p: driver node:%p quantum:%d/%d", + impl, sess->node, sess->sample_rate, buffer_size); } static void node_info_destroy(void *data) diff --git a/src/modules/module-client-node/client-node.c b/src/modules/module-client-node/client-node.c index 14ba1f723..7398c45a1 100644 --- a/src/modules/module-client-node/client-node.c +++ b/src/modules/module-client-node/client-node.c @@ -150,6 +150,8 @@ struct impl { int fds[2]; int other_fds[2]; + + struct pw_client_node_position *position; }; /** \endcond */ @@ -890,14 +892,21 @@ impl_node_port_send_command(struct spa_node *node, static int impl_node_process(struct spa_node *node) { struct node *this = SPA_CONTAINER_OF(node, struct node, node); + struct impl *impl = this->impl; + struct pw_driver_quantum *q; uint64_t cmd = 1; if (this->impl->this.status != SPA_ID_INVALID) { spa_log_trace(this->log, "%p: return %d", this, this->impl->this.status); - return this->impl->this.status; + return impl->this.status; } - spa_log_trace(this->log, "%p: send process", this); + spa_log_trace(this->log, "%p: send process %p", this, impl->this.node->driver_node); + + q = impl->this.node->driver_node->rt.quantum; + + impl->position->duration = q->size; + impl->position->rate = q->rate; if (write(this->writefd, &cmd, 8) != 8) spa_log_warn(this->log, "node %p: error %s", this, strerror(errno)); @@ -1148,6 +1157,8 @@ static void node_initialized(void *data) struct pw_type *t = impl->t; struct pw_global *global; uint32_t node_id; + uint32_t area_size, size; + struct mem *m; if (this->resource == NULL) return; @@ -1162,14 +1173,20 @@ static void node_initialized(void *data) spa_loop_add_source(impl->node.data_loop, &impl->node.data_source); pw_log_debug("client-node %p: transport fd %d %d", node, impl->fds[0], impl->fds[1]); + area_size = sizeof(struct spa_io_buffers) * MAX_AREAS; + size = area_size + sizeof(struct pw_client_node_position); + if (pw_memblock_alloc(PW_MEMBLOCK_FLAG_WITH_FD | PW_MEMBLOCK_FLAG_MAP_READWRITE | PW_MEMBLOCK_FLAG_SEAL, - sizeof(struct spa_io_buffers) * MAX_AREAS, + size, &impl->io_areas) < 0) return; - ensure_mem(impl, impl->io_areas->fd, t->data.MemFd, impl->io_areas->flags); + impl->position = SPA_MEMBER(impl->io_areas->ptr, + area_size, struct pw_client_node_position); + + m = ensure_mem(impl, impl->io_areas->fd, t->data.MemFd, impl->io_areas->flags); pw_log_debug("client-node %p: io areas %p", node, impl->io_areas->ptr); if ((global = pw_node_get_global(node)) != NULL) @@ -1181,6 +1198,11 @@ static void node_initialized(void *data) node_id, impl->other_fds[0], impl->other_fds[1]); + + pw_client_node_resource_set_position(this->resource, + m->id, + area_size, + sizeof(struct pw_client_node_position)); } static void node_free(void *data) diff --git a/src/modules/module-client-node/client-stream.c b/src/modules/module-client-node/client-stream.c index 90c95d547..7d13ec60e 100644 --- a/src/modules/module-client-node/client-stream.c +++ b/src/modules/module-client-node/client-stream.c @@ -98,6 +98,7 @@ struct impl { struct pw_port_mix client_port_mix; struct spa_io_buffers *io; + struct spa_io_control_range ctrl; struct spa_buffer **buffers; uint32_t n_buffers; @@ -249,16 +250,29 @@ impl_node_add_port(struct spa_node *node, enum spa_direction direction, uint32_t { struct node *this; struct impl *impl; + struct pw_type *t; + int res; spa_return_val_if_fail(node != NULL, -EINVAL); this = SPA_CONTAINER_OF(node, struct node, node); impl = this->impl; + t = impl->t; if (direction != impl->direction) return -EINVAL; - return spa_node_add_port(impl->adapter, direction, port_id); + if ((res = spa_node_add_port(impl->adapter, direction, port_id)) < 0) + return res; + + if ((res = spa_node_port_set_io(impl->adapter, + direction, port_id, + t->io.ControlRange, + &impl->ctrl, + sizeof(&impl->ctrl))) < 0) + return res; + + return res; } static int @@ -670,10 +684,16 @@ static int impl_node_process(struct spa_node *node) struct impl *impl = this->impl; int status; - spa_log_trace(this->log, "%p: process", this); + impl->client_node->node->driver_node = impl->this.node->driver_node; + impl->ctrl.min_size = impl->ctrl.max_size = + impl->this.node->driver_node->rt.quantum->size; - if (impl->use_converter) + + spa_log_trace(this->log, "%p: process %d", this, impl->ctrl.max_size); + + if (impl->use_converter) { status = spa_node_process(impl->adapter); + } else { status = SPA_STATUS_HAVE_BUFFER | SPA_STATUS_NEED_BUFFER; spa_log_trace(this->log, "%p: process %d/%d %d/%d", this, @@ -841,6 +861,7 @@ static void client_node_initialized(void *data) impl->client_port_mix.io, sizeof(impl->client_port_mix.io))) < 0) return; + } pw_node_register(impl->this.node, NULL, NULL, NULL); diff --git a/src/modules/module-client-node/protocol-native.c b/src/modules/module-client-node/protocol-native.c index 3929bc371..a2c3d2ebc 100644 --- a/src/modules/module-client-node/protocol-native.c +++ b/src/modules/module-client-node/protocol-native.c @@ -432,6 +432,25 @@ static int client_node_demarshal_port_set_io(void *object, void *data, size_t si return 0; } +static int client_node_demarshal_set_position(void *object, void *data, size_t size) +{ + struct pw_proxy *proxy = object; + struct spa_pod_parser prs; + uint32_t memid, off, sz; + + spa_pod_parser_init(&prs, data, size, 0); + if (spa_pod_parser_get(&prs, + "[" + "i", &memid, + "i", &off, + "i", &sz, NULL) < 0) + return -EINVAL; + + pw_proxy_notify(proxy, struct pw_client_node_proxy_events, set_position, + memid, off, sz); + return 0; +} + static void client_node_marshal_add_mem(void *object, uint32_t mem_id, @@ -671,6 +690,22 @@ client_node_marshal_port_set_io(void *object, pw_protocol_native_end_resource(resource, b); } +static void +client_node_marshal_set_position(void *object, + uint32_t memid, + uint32_t offset, + uint32_t size) +{ + struct pw_resource *resource = object; + struct spa_pod_builder *b; + + b = pw_protocol_native_begin_resource(resource, PW_CLIENT_NODE_PROXY_EVENT_SET_POSITION); + spa_pod_builder_struct(b, + "i", memid, + "i", offset, + "i", size); + pw_protocol_native_end_resource(resource, b); +} static int client_node_demarshal_done(void *object, void *data, size_t size) { @@ -857,6 +892,7 @@ static const struct pw_client_node_proxy_events pw_protocol_native_client_node_e &client_node_marshal_port_use_buffers, &client_node_marshal_port_command, &client_node_marshal_port_set_io, + &client_node_marshal_set_position, }; static const struct pw_protocol_native_demarshal pw_protocol_native_client_node_event_demarshal[] = { @@ -871,6 +907,7 @@ static const struct pw_protocol_native_demarshal pw_protocol_native_client_node_ { &client_node_demarshal_port_use_buffers, PW_PROTOCOL_NATIVE_REMAP }, { &client_node_demarshal_port_command, PW_PROTOCOL_NATIVE_REMAP }, { &client_node_demarshal_port_set_io, PW_PROTOCOL_NATIVE_REMAP }, + { &client_node_demarshal_set_position, PW_PROTOCOL_NATIVE_REMAP }, }; static const struct pw_protocol_marshal pw_protocol_native_client_node_marshal = { diff --git a/src/pipewire/link.c b/src/pipewire/link.c index 45e72c85f..27cc87b20 100644 --- a/src/pipewire/link.c +++ b/src/pipewire/link.c @@ -1194,14 +1194,16 @@ do_join_graphs(struct spa_loop *loop, out_driver = SPA_FLAG_CHECK(out_graph->flags, SPA_GRAPH_FLAG_DRIVER); if (out_driver) { - pw_log_debug("link %p: in_graph to out_graph", this); src = in_graph; dst = out_graph; + this->input->node->driver_node = this->output->node->driver_node; + pw_log_debug("link %p: in_graph to out_graph %p", this, this->output->node); } else { - pw_log_debug("link %p: out_graph to in_graph", this); src = out_graph; dst = in_graph; + this->output->node->driver_node = this->input->node->driver_node; + pw_log_debug("link %p: out_graph to in_graph %p", this, this->input->node); } move_graph(dst, src); } diff --git a/src/pipewire/node.c b/src/pipewire/node.c index df59333de..c500b3230 100644 --- a/src/pipewire/node.c +++ b/src/pipewire/node.c @@ -56,6 +56,8 @@ struct impl { struct pw_node_activation root_activation; struct pw_node_activation node_activation; + + struct pw_driver_quantum quantum; }; struct resource_data { @@ -421,6 +423,8 @@ static void check_properties(struct pw_node *node) else SPA_FLAG_UNSET(impl->driver_graph.flags, SPA_GRAPH_FLAG_DRIVER); + node->driver_node = node; + pw_log_debug("node %p: graph %p driver:%d", node, &impl->driver_graph, node->driver); } @@ -509,6 +513,8 @@ struct pw_node *pw_node_new(struct pw_core *core, spa_graph_node_init(&this->rt.node, &impl->node_activation.state); spa_graph_node_add(&impl->graph, &this->rt.node); + this->rt.quantum = &impl->quantum; + check_properties(this); return this; diff --git a/src/pipewire/private.h b/src/pipewire/private.h index 738da674a..6d813e949 100644 --- a/src/pipewire/private.h +++ b/src/pipewire/private.h @@ -223,6 +223,13 @@ struct pw_module { void *user_data; /**< module user_data */ }; +struct pw_driver_quantum { + uint64_t time; /**< time in nanoseconds */ + struct spa_fraction rate; /**< rate */ + uint32_t position; /**< current position expressed in rate */ + uint32_t size; /**< size of one period expressed in rate */ +}; + struct pw_node_activation { #define NOT_TRIGGERED 0 #define TRIGGERED 1 @@ -257,6 +264,8 @@ struct pw_node { bool master; /**< a master node is one of the driver nodes that * is selected to drive the graph */ + struct pw_node *driver_node; + struct spa_clock *clock; /**< handle to SPA clock if any */ struct spa_node *node; /**< SPA node implementation */ @@ -277,6 +286,7 @@ struct pw_node { struct pw_loop *data_loop; /**< the data loop for this node */ struct { + struct pw_driver_quantum *quantum; struct spa_graph *driver; struct spa_graph_node root; struct pw_node_activation *activation; diff --git a/src/pipewire/remote.c b/src/pipewire/remote.c index eea82cf3c..a0bc03609 100644 --- a/src/pipewire/remote.c +++ b/src/pipewire/remote.c @@ -104,6 +104,8 @@ struct node_data { struct pw_client_node_proxy *node_proxy; struct spa_hook node_proxy_listener; struct spa_hook proxy_listener; + + struct pw_client_node_position *position; }; /** \endcond */ @@ -525,7 +527,7 @@ on_rtsocket_condition(void *user_data, int fd, enum spa_io mask) if (read(fd, &cmd, sizeof(uint64_t)) != sizeof(uint64_t)) pw_log_warn("proxy %p: read failed %m", proxy); - pw_log_trace("remote %p: process", data->remote); + pw_log_trace("remote %p: process %d", data->remote, data->position->duration); spa_graph_run(node->graph->parent->graph); } } @@ -1196,6 +1198,37 @@ client_node_port_set_io(void *object, } } +static void +client_node_set_position(void *object, + uint32_t memid, + uint32_t offset, + uint32_t size) +{ + struct pw_proxy *proxy = object; + struct node_data *data = proxy->user_data; + struct mem *m; + void *ptr; + + if (memid == SPA_ID_INVALID) { + ptr = NULL; + size = 0; + } + else { + m = find_mem(data, memid); + if (m == NULL) { + pw_log_warn("unknown memory id %u", memid); + return; + } + ptr = mem_map(data, &m->map, m->fd, + PROT_READ|PROT_WRITE, offset, size); + if (ptr == NULL) + return; + m->ref++; + } + + pw_log_debug("node %p: set position %p", proxy, ptr); + data->position = ptr; +} static const struct pw_client_node_proxy_events client_node_events = { PW_VERSION_CLIENT_NODE_PROXY_EVENTS, @@ -1210,6 +1243,7 @@ static const struct pw_client_node_proxy_events client_node_events = { .port_use_buffers = client_node_port_use_buffers, .port_command = client_node_port_command, .port_set_io = client_node_port_set_io, + .set_position = client_node_set_position, }; static void do_node_init(struct pw_proxy *proxy)