diff --git a/src/pipewire/filter.c b/src/pipewire/filter.c index eb7593e4f..8281955bd 100644 --- a/src/pipewire/filter.c +++ b/src/pipewire/filter.c @@ -117,6 +117,8 @@ struct port { struct queue dequeued; struct queue queued; + struct spa_latency_info latency[2]; + /* from here is what the caller gets as user_data */ uint8_t user_data[0]; }; @@ -300,6 +302,8 @@ static struct port *alloc_port(struct filter *filter, p->filter = filter; p->direction = direction; p->id = i; + p->latency[SPA_DIRECTION_INPUT] = SPA_LATENCY_INFO(SPA_DIRECTION_INPUT); + p->latency[SPA_DIRECTION_OUTPUT] = SPA_LATENCY_INFO(SPA_DIRECTION_OUTPUT); spa_list_init(&p->param_list); spa_ringbuffer_init(&p->dequeued.ring); @@ -596,6 +600,7 @@ static int update_params(struct filter *impl, struct port *port, uint32_t id, { uint32_t i; int res = 0; + bool update_latency = false; if (id != SPA_ID_INVALID) { clear_params(impl, port, id); @@ -607,11 +612,39 @@ static int update_params(struct filter *impl, struct port *port, uint32_t id, } } for (i = 0; i < n_params; i++) { + if (params[i] == NULL) + continue; + + if (port != NULL && + spa_pod_is_object(params[i]) && + SPA_POD_OBJECT_ID(params[i]) == SPA_PARAM_Latency) { + struct spa_latency_info info; + if (spa_latency_parse(params[i], &info) >= 0) { + port->latency[info.direction] = info; + pw_log_debug("port %p: set %s latency %f-%f %d-%d %"PRIu64"-%"PRIu64, port, + info.direction == SPA_DIRECTION_INPUT ? "input" : "output", + info.min_quantum, info.max_quantum, + info.min_rate, info.max_rate, + info.min_ns, info.max_ns); + update_latency = true; + } + continue; + } if (add_param(impl, port, id, 0, params[i]) == NULL) { res = -errno; break; } } + if (port != NULL && update_latency) { + uint8_t buffer[4096]; + struct spa_pod_builder b; + + spa_pod_builder_init(&b, buffer, sizeof(buffer)); + add_param(impl, port, SPA_PARAM_Latency, 0, + spa_latency_build(&b, SPA_PARAM_Latency, &port->latency[0])); + add_param(impl, port, SPA_PARAM_Latency, 0, + spa_latency_build(&b, SPA_PARAM_Latency, &port->latency[1])); + } return res; } @@ -685,6 +718,62 @@ static void clear_buffers(struct port *port) clear_queue(port, &port->queued); } + +static int default_latency(struct filter *impl, struct port *port, enum spa_direction direction) +{ + struct pw_filter *filter = &impl->this; + struct spa_latency_info info; + struct port *p; + + info = SPA_LATENCY_INFO(direction); + + spa_list_for_each(p, &impl->port_list, link) { + if (p->direction == direction) + continue; + spa_latency_info_combine(&info, &p->latency[direction]); + } + spa_list_for_each(p, &impl->port_list, link) { + uint8_t buffer[4096]; + struct spa_pod_builder b; + const struct spa_pod *params[1]; + + if (p->direction != direction) + continue; + + spa_pod_builder_init(&b, buffer, sizeof(buffer)); + params[0] = spa_latency_build(&b, SPA_PARAM_Latency, &info); + pw_filter_update_params(filter, p->user_data, params, 1); + } + return 0; +} + +static int handle_latency(struct filter *impl, struct port *port, const struct spa_pod *param) +{ + struct pw_filter *filter = &impl->this; + struct spa_latency_info info; + int res; + + if ((res = spa_latency_parse(param, &info)) < 0) + return res; + + pw_log_info("port %p: set %s latency %f-%f %d-%d %"PRIu64"-%"PRIu64, port, + info.direction == SPA_DIRECTION_INPUT ? "input" : "output", + info.min_quantum, info.max_quantum, + info.min_rate, info.max_rate, + info.min_ns, info.max_ns); + + if (info.direction == port->direction) + return 0; + + if (SPA_FLAG_IS_SET(impl->flags, PW_FILTER_FLAG_CUSTOM_LATENCY)) { + pw_filter_emit_param_changed(filter, port->user_data, + SPA_PARAM_Latency, param); + } else { + default_latency(impl, port, info.direction); + } + return 0; +} + static int impl_port_set_param(void *object, enum spa_direction direction, uint32_t port_id, uint32_t id, uint32_t flags, @@ -694,6 +783,9 @@ static int impl_port_set_param(void *object, struct pw_filter *filter = &impl->this; struct port *port; int res; + bool emit = true; + const struct spa_pod *params[1]; + uint32_t n_params = 0; if (impl->disconnecting && param != NULL) return -EIO; @@ -706,13 +798,24 @@ static int impl_port_set_param(void *object, if (param) pw_log_pod(SPA_LOG_LEVEL_DEBUG, param); - if ((res = update_params(impl, port, id, ¶m, param ? 1 : 0)) < 0) + params[0] = param; + n_params = param ? 1 : 0; + + if ((res = update_params(impl, port, id, params, n_params)) < 0) return res; - if (id == SPA_PARAM_Format) + switch (id) { + case SPA_PARAM_Format: clear_buffers(port); + break; + case SPA_PARAM_Latency: + handle_latency(impl, port, param); + emit = false; + break; + } - pw_filter_emit_param_changed(filter, port->user_data, id, param); + if (emit) + pw_filter_emit_param_changed(filter, port->user_data, id, param); if (filter->state == PW_FILTER_STATE_ERROR) return -EIO; diff --git a/src/pipewire/filter.h b/src/pipewire/filter.h index 81d717d90..59bbc79c7 100644 --- a/src/pipewire/filter.h +++ b/src/pipewire/filter.h @@ -117,6 +117,9 @@ enum pw_filter_flags { PW_FILTER_FLAG_DRIVER = (1 << 1), /**< be a driver */ PW_FILTER_FLAG_RT_PROCESS = (1 << 2), /**< call process from the realtime * thread */ + PW_FILTER_FLAG_CUSTOM_LATENCY = (1 << 3), /**< don't call the default latency algorithm + * but emit the param_changed event for the + * ports when Latency params are received. */ }; enum pw_filter_port_flags {