diff --git a/src/extensions/client-node.h b/src/extensions/client-node.h index 97f482aaa..3bcda7c4c 100644 --- a/src/extensions/client-node.h +++ b/src/extensions/client-node.h @@ -44,15 +44,6 @@ struct pw_client_node_buffer { struct spa_buffer *buffer; /**< buffer describing metadata and buffer memory */ }; -#define PW_TYPE_CLIENT_NODE_IO__Position SPA_TYPE_IO_BASE "ClientNode:Position" - -struct pw_client_node_position { -#define PW_VERSION_CLIENT_NODE_POSITION 0 - uint32_t version; - uint32_t seq1, seq2; /**< valid contents when equal */ - /* position info follows */ -}; - #define PW_CLIENT_NODE_PROXY_METHOD_DONE 0 #define PW_CLIENT_NODE_PROXY_METHOD_UPDATE 1 #define PW_CLIENT_NODE_PROXY_METHOD_PORT_UPDATE 2 diff --git a/src/modules/module-client-node/client-node.c b/src/modules/module-client-node/client-node.c index 20a5ba8f8..13c22b8f6 100644 --- a/src/modules/module-client-node/client-node.c +++ b/src/modules/module-client-node/client-node.c @@ -166,7 +166,7 @@ struct impl { int fds[2]; int other_fds[2]; - struct pw_client_node_position *position; + struct spa_io_position *position; uint64_t start; }; @@ -975,19 +975,18 @@ static int impl_node_process(struct spa_node *node) { struct node *this = SPA_CONTAINER_OF(node, struct node, node); struct impl *impl = this->impl; - struct pw_driver_quantum *q, *rq; + struct spa_io_position *q, *rq; uint64_t cmd = 1; spa_log_trace(this->log, "%p: send process %p", this, impl->this.node->driver_node); - q = impl->this.node->driver_node->rt.quantum; - rq = SPA_MEMBER(impl->position, sizeof(struct pw_client_node_position), - struct pw_driver_quantum); + q = impl->this.node->driver_node->rt.position; + rq = impl->position; if (impl->start == -1) - impl->start = q->position; + impl->start = q->clock.position; *rq = *q; - rq->position -= impl->start; + rq->clock.position -= impl->start; if (write(this->writefd, &cmd, 8) != 8) spa_log_warn(this->log, "node %p: error %s", this, strerror(errno)); @@ -1279,7 +1278,7 @@ static void node_initialized(void *data) pw_log_debug("client-node %p: transport fd %d %d", node, impl->fds[0], impl->fds[1]); area_size = sizeof(struct spa_io_buffers) * MAX_AREAS; - size = area_size + sizeof(struct pw_client_node_position) + sizeof(struct pw_driver_quantum); + size = area_size + sizeof(struct spa_io_position); if (pw_memblock_alloc(PW_MEMBLOCK_FLAG_WITH_FD | PW_MEMBLOCK_FLAG_MAP_READWRITE | @@ -1289,16 +1288,16 @@ static void node_initialized(void *data) return; impl->position = SPA_MEMBER(impl->io_areas->ptr, - area_size, struct pw_client_node_position); + area_size, struct spa_io_position); m = ensure_mem(impl, impl->io_areas->fd, SPA_DATA_MemFd, impl->io_areas->flags); pw_log_debug("client-node %p: io areas %p", node, impl->io_areas->ptr); pw_client_node_resource_set_io(this->resource, - PW_IO_ClientNodePosition, + SPA_IO_Position, m->id, area_size, - sizeof(struct pw_client_node_position)); + sizeof(struct spa_io_position)); if ((global = pw_node_get_global(node)) != NULL) pw_client_node_registered(this, pw_global_get_id(global)); diff --git a/src/modules/module-client-node/client-stream.c b/src/modules/module-client-node/client-stream.c index 86e1c4ff8..a671bf0b9 100644 --- a/src/modules/module-client-node/client-stream.c +++ b/src/modules/module-client-node/client-stream.c @@ -847,7 +847,7 @@ static int impl_node_process(struct spa_node *node) { struct node *this = SPA_CONTAINER_OF(node, struct node, node); struct impl *impl = this->impl; - struct pw_driver_quantum *q = impl->this.node->driver_node->rt.quantum; + struct spa_io_position *q = impl->this.node->driver_node->rt.position; int status, trigger; if (!impl->active) diff --git a/src/pipewire/node.c b/src/pipewire/node.c index 045ffa15b..148e5229a 100644 --- a/src/pipewire/node.c +++ b/src/pipewire/node.c @@ -41,6 +41,7 @@ #include #define DEFAULT_QUANTUM 1024 +#define MIN_QUANTUM 64 /** \cond */ struct impl { @@ -60,7 +61,7 @@ struct impl { struct pw_node_activation root_activation; struct pw_node_activation node_activation; - struct pw_driver_quantum quantum; + struct spa_io_position position; uint32_t next_position; }; @@ -521,11 +522,13 @@ static int recalc_quantum(struct pw_node *driver) struct pw_node *n; spa_list_for_each(n, &driver->driver_list, driver_link) { - if (n->rt.quantum_size > 0 && n->rt.quantum_size < quantum) - quantum = n->rt.quantum_size; + if (n->quantum_size > 0 && n->quantum_size < quantum) + quantum = n->quantum_size; + } + if (driver->rt.position) { + driver->rt.position->size = SPA_MAX(quantum, MIN_QUANTUM); + pw_log_info("node %p: driver quantum %d", driver, driver->rt.position->size); } - driver->rt.quantum->size = quantum; - pw_log_info("node %p: driver quantum %d", driver, driver->rt.quantum->size); return 0; } @@ -599,11 +602,11 @@ static void check_properties(struct pw_node *node) uint32_t num, denom; pw_log_info("node %p: latency '%s'", node, str); if (sscanf(str, "%u/%u", &num, &denom) == 2 && denom != 0) { - node->rt.quantum_size = flp2((num * 48000 / denom)); - pw_log_info("node %p: quantum %d", node, node->rt.quantum_size); + node->quantum_size = flp2((num * 48000 / denom)); + pw_log_info("node %p: quantum %d", node, node->quantum_size); } } else - node->rt.quantum_size = DEFAULT_QUANTUM; + node->quantum_size = DEFAULT_QUANTUM; pw_log_debug("node %p: graph %p driver:%d", node, &impl->driver_graph, node->driver); @@ -677,9 +680,8 @@ struct pw_node *pw_node_new(struct pw_core *core, spa_graph_node_init(&this->rt.node, &impl->node_activation.state); spa_graph_node_add(&impl->graph, &this->rt.node); - impl->quantum.rate = SPA_FRACTION(1, 48000); - impl->quantum.size = DEFAULT_QUANTUM; - this->rt.quantum = &impl->quantum; + impl->position.clock.rate = SPA_FRACTION(1, 48000); + impl->position.size = DEFAULT_QUANTUM; check_properties(this); @@ -776,37 +778,38 @@ static void node_process(void *data, int status) { struct pw_node *node = data, *driver; struct impl *impl = SPA_CONTAINER_OF(node, struct impl, this); + bool is_driving = false; driver = node->driver_node; pw_log_trace("node %p: process driver:%d exported:%d %p", node, node->driver, node->exported, driver->rt.driver); - if (node->driver && driver == node && - (driver->rt.driver->state->pending == 0 || !node->remote)) { + if (node->driver) { + if (driver == node) + is_driving = true; + else if (node->rt.position && driver->rt.position) + *node->rt.position = *driver->rt.position; + } + + if (is_driving && (driver->rt.driver->state->pending == 0 || !node->remote)) { struct timespec ts; - struct pw_driver_quantum *q = node->rt.quantum; + struct spa_io_position *q = node->rt.position; if (driver->rt.driver->state->pending != 0) { pw_log_warn("node %p: graph not finished", node); } - if (node->rt.clock) { - q->nsec = node->rt.clock->nsec; - q->rate = node->rt.clock->rate; - q->position = node->rt.clock->position; - q->delay = node->rt.clock->delay; - } - else { + if (!node->rt.clock) { clock_gettime(CLOCK_MONOTONIC, &ts); - q->nsec = SPA_TIMESPEC_TO_TIME(&ts); - q->position = impl->next_position; - q->delay = 0; + q->clock.nsec = SPA_TIMESPEC_TO_TIME(&ts); + q->clock.position = impl->next_position; + q->clock.delay = 0; } pw_log_trace("node %p: run %"PRIu64" %d/%d %"PRIu64" %"PRIi64" %d", node, - q->nsec, q->rate.num, q->rate.denom, - q->position, q->delay, q->size); + q->clock.nsec, q->clock.rate.num, q->clock.rate.denom, + q->clock.position, q->clock.delay, q->size); spa_graph_run(driver->rt.driver); @@ -842,10 +845,25 @@ static const struct spa_node_callbacks node_callbacks = { void pw_node_set_implementation(struct pw_node *node, struct spa_node *spa_node) { + struct impl *impl = SPA_CONTAINER_OF(node, struct impl, this); + node->node = spa_node; spa_node_set_callbacks(node->node, &node_callbacks, node); spa_graph_node_set_callbacks(&node->rt.node, &spa_graph_node_impl_default, spa_node); + if (spa_node_set_io(node->node, + SPA_IO_Position, + &impl->position, sizeof(struct spa_io_position)) >= 0) { + pw_log_debug("node %p: set position %p", node, &impl->position); + node->rt.position = &impl->position; + } + if (spa_node_set_io(node->node, + SPA_IO_Clock, + &impl->position.clock, sizeof(struct spa_io_position)) >= 0) { + pw_log_debug("node %p: set clock %p", node, &impl->position.clock); + node->rt.clock = &impl->position.clock; + } + if (spa_node->info) pw_node_update_properties(node, spa_node->info); } diff --git a/src/pipewire/port.c b/src/pipewire/port.c index c7df0f083..94ee32adb 100644 --- a/src/pipewire/port.c +++ b/src/pipewire/port.c @@ -579,14 +579,6 @@ int pw_port_add(struct pw_port *port, struct pw_node *node) } } - if (spa_node_port_set_io(node->node, - port->direction, port_id, - SPA_IO_Clock, - &port->rt.clock, sizeof(port->rt.clock)) >= 0) { - node->rt.clock = &port->rt.clock; - pw_log_debug("port %p: set node clock %p", port, node->rt.clock); - } - if (node->global) pw_port_register(port, node->global->owner, node->global, pw_properties_copy(port->properties)); diff --git a/src/pipewire/private.h b/src/pipewire/private.h index 74f52dce0..602d187f3 100644 --- a/src/pipewire/private.h +++ b/src/pipewire/private.h @@ -255,14 +255,6 @@ struct pw_module { void *user_data; /**< module user_data */ }; -struct pw_driver_quantum { - uint64_t nsec; /**< time in nanoseconds */ - struct spa_fraction rate; /**< rate */ - uint64_t position; /**< current position expressed in rate */ - uint64_t delay; /**< delay to hardware */ - uint32_t size; /**< size of one period expressed in rate */ -}; - struct pw_node_activation { #define NOT_TRIGGERED 0 #define TRIGGERED 1 @@ -341,16 +333,16 @@ struct pw_node { struct pw_loop *data_loop; /**< the data loop for this node */ + uint32_t quantum_size; /**< desired quantum */ struct { struct spa_io_clock *clock; /**< io area of the clock or NULL */ - struct pw_driver_quantum *quantum; + struct spa_io_position *position; struct spa_graph *driver; struct spa_graph_node root; struct pw_node_activation *activation; struct spa_graph_node node; struct spa_graph_node subnode; struct spa_graph_link sublink; - uint32_t quantum_size; } rt; void *user_data; /**< extra user data */ diff --git a/src/pipewire/remote.c b/src/pipewire/remote.c index aca193f86..20783fc12 100644 --- a/src/pipewire/remote.c +++ b/src/pipewire/remote.c @@ -103,7 +103,7 @@ struct node_data { struct spa_hook node_proxy_listener; struct spa_hook proxy_listener; - struct pw_client_node_position *position; + struct spa_io_position *position; struct spa_graph_node_callbacks callbacks; void *callbacks_data; @@ -868,11 +868,7 @@ client_node_set_io(void *object, clear_mem(data, m); } data->position = ptr; - if (ptr) - data->node->rt.quantum = SPA_MEMBER(ptr, - sizeof(struct pw_client_node_position), void); - else - data->node->rt.quantum = NULL; + data->node->rt.position = ptr; } else { pw_log_warn("unknown io id %u", id); diff --git a/src/pipewire/stream.c b/src/pipewire/stream.c index 9cfbac7d3..124c6f7b0 100644 --- a/src/pipewire/stream.c +++ b/src/pipewire/stream.c @@ -245,6 +245,11 @@ static void call_process(struct stream *impl) } } +static int impl_set_io(struct spa_node *node, uint32_t id, void *data, size_t size) +{ + return 0; +} + static int impl_send_command(struct spa_node *node, const struct spa_command *command) { struct stream *impl = SPA_CONTAINER_OF(node, struct stream, impl_node); @@ -646,14 +651,14 @@ static int process_notify(struct stream *impl, struct spa_pod_sequence *sequence return 0; } -static inline void copy_quantum(struct stream *impl, int64_t queued) +static inline void copy_position(struct stream *impl, int64_t queued) { - struct pw_driver_quantum *q = impl->node->rt.quantum; + struct spa_io_position *p = impl->node->rt.position; __atomic_add_fetch(&impl->seq, 1, __ATOMIC_SEQ_CST); - impl->time.now = q->nsec; - impl->time.rate = q->rate; - impl->time.ticks = q->position; - impl->time.delay = q->delay; + impl->time.now = p->clock.nsec; + impl->time.rate = p->clock.rate; + impl->time.ticks = p->clock.position; + impl->time.delay = p->clock.delay; impl->time.queued = queued; __atomic_add_fetch(&impl->seq, 1, __ATOMIC_SEQ_CST); @@ -689,7 +694,7 @@ static int impl_node_process_input(struct spa_node *node) call_process(impl); done: - copy_quantum(impl, impl->dequeued.incount); + copy_position(impl, impl->dequeued.incount); /* pop buffer to recycle */ if ((b = pop_queue(impl, &impl->queued))) { @@ -741,7 +746,7 @@ static int impl_node_process_output(struct spa_node *node) io->status == SPA_STATUS_NEED_BUFFER) goto again; } - copy_quantum(impl, impl->queued.outcount); + copy_position(impl, impl->queued.outcount); res = io->status; pw_log_trace("stream %p: res %d", stream, res); @@ -751,6 +756,7 @@ static int impl_node_process_output(struct spa_node *node) static const struct spa_node impl_node = { SPA_VERSION_NODE, + .set_io = impl_set_io, .send_command = impl_send_command, .set_callbacks = impl_set_callbacks, .get_n_ports = impl_get_n_ports,