mirror of
https://gitlab.freedesktop.org/pipewire/pipewire.git
synced 2025-10-29 05:40:27 -04:00
stream: handle Latency params better
LOCK the Latency param we get from the peer so that we don't remove it when we update our own port latency. Also don't remove our port latency when we get an update from the peer. This essentially keeps the update/clear of the upstream and downstrem latencies separate and makes it easier to implement the latency logic in the pw-stream. When a filter receives a Latency event on a port, it can simply update the other port latency and none of the peer latencies are removed.
This commit is contained in:
parent
2393be4543
commit
92243038c1
1 changed files with 49 additions and 25 deletions
|
|
@ -228,7 +228,7 @@ static int add_param(struct stream *impl,
|
|||
uint32_t id, uint32_t flags, const struct spa_pod *param)
|
||||
{
|
||||
struct param *p;
|
||||
int idx;
|
||||
int idx, res;
|
||||
|
||||
if (param != NULL && !spa_pod_is_object(param))
|
||||
return -EINVAL;
|
||||
|
|
@ -240,6 +240,21 @@ static int add_param(struct stream *impl,
|
|||
if (id == SPA_ID_INVALID)
|
||||
id = SPA_POD_OBJECT_ID(param);
|
||||
|
||||
switch (id) {
|
||||
case SPA_PARAM_Latency:
|
||||
struct spa_latency_info info;
|
||||
if ((res = spa_latency_parse(param, &info)) < 0)
|
||||
return res;
|
||||
if (impl->this.node_id != SPA_ID_INVALID &&
|
||||
info.direction != impl->direction &&
|
||||
!SPA_FLAG_IS_SET(flags, PARAM_FLAG_LOCKED)) {
|
||||
pw_log_warn("not adding locked Latency param %s %s",
|
||||
pw_direction_as_string(info.direction),
|
||||
pw_direction_as_string(impl->direction));
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
p = malloc(sizeof(struct param) + SPA_POD_SIZE(param));
|
||||
if (p == NULL)
|
||||
return -errno;
|
||||
|
|
@ -273,7 +288,7 @@ static int add_param(struct stream *impl,
|
|||
return 0;
|
||||
}
|
||||
|
||||
static void clear_params(struct stream *impl, uint32_t id)
|
||||
static void clear_params(struct stream *impl, uint32_t id, uint32_t flags)
|
||||
{
|
||||
struct param *p, *t;
|
||||
bool found = false;
|
||||
|
|
@ -281,7 +296,7 @@ static void clear_params(struct stream *impl, uint32_t id)
|
|||
|
||||
spa_list_for_each_safe(p, t, &impl->param_list, link) {
|
||||
if (id == SPA_ID_INVALID ||
|
||||
(p->id == id && !(p->flags & PARAM_FLAG_LOCKED))) {
|
||||
(p->id == id && (p->flags & PARAM_FLAG_LOCKED) == flags)) {
|
||||
found = true;
|
||||
spa_list_remove(&p->link);
|
||||
free(p);
|
||||
|
|
@ -321,12 +336,12 @@ static int update_params(struct stream *impl, uint32_t id, uint32_t flags,
|
|||
int res = 0;
|
||||
|
||||
if (id != SPA_ID_INVALID) {
|
||||
clear_params(impl, id);
|
||||
clear_params(impl, id, flags);
|
||||
} else {
|
||||
for (i = 0; i < n_params; i++) {
|
||||
if (params[i] == NULL || !spa_pod_is_object(params[i]))
|
||||
continue;
|
||||
clear_params(impl, SPA_POD_OBJECT_ID(params[i]));
|
||||
clear_params(impl, SPA_POD_OBJECT_ID(params[i]), flags);
|
||||
}
|
||||
}
|
||||
for (i = 0; i < n_params; i++) {
|
||||
|
|
@ -336,7 +351,6 @@ static int update_params(struct stream *impl, uint32_t id, uint32_t flags,
|
|||
return res;
|
||||
}
|
||||
|
||||
|
||||
static inline int queue_push(struct stream *stream, struct queue *queue, struct buffer *buffer)
|
||||
{
|
||||
uint32_t index;
|
||||
|
|
@ -863,20 +877,20 @@ static void clear_buffers(struct pw_stream *stream)
|
|||
clear_queue(impl, &impl->queued);
|
||||
}
|
||||
|
||||
static int parse_latency(struct pw_stream *stream, const struct spa_pod *param)
|
||||
static int parse_latency(struct pw_stream *stream, const struct spa_pod *param, uint32_t *flags)
|
||||
{
|
||||
struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this);
|
||||
struct spa_latency_info info;
|
||||
int res;
|
||||
struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this);
|
||||
struct spa_latency_info info;
|
||||
int res;
|
||||
|
||||
if (param == NULL)
|
||||
return 0;
|
||||
|
||||
if ((res = spa_latency_parse(param, &info)) < 0)
|
||||
if (param == NULL)
|
||||
info = SPA_LATENCY_INFO(SPA_DIRECTION_REVERSE(impl->direction));
|
||||
else if ((res = spa_latency_parse(param, &info)) < 0)
|
||||
return res;
|
||||
|
||||
pw_log_info("stream %p: set %s latency %f-%f %d-%d %"PRIu64"-%"PRIu64, stream,
|
||||
pw_log_info("stream %p: set %s/%s latency %f-%f %d-%d %"PRIu64"-%"PRIu64, stream,
|
||||
info.direction == SPA_DIRECTION_INPUT ? "input" : "output",
|
||||
impl->direction == SPA_DIRECTION_INPUT ? "input" : "output",
|
||||
info.min_quantum, info.max_quantum,
|
||||
info.min_rate, info.max_rate,
|
||||
info.min_ns, info.max_ns);
|
||||
|
|
@ -885,6 +899,7 @@ static int parse_latency(struct pw_stream *stream, const struct spa_pod *param)
|
|||
return 0;
|
||||
|
||||
impl->latency = info;
|
||||
*flags = PARAM_FLAG_LOCKED;
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
|
@ -895,8 +910,10 @@ static int impl_port_set_param(void *object,
|
|||
{
|
||||
struct stream *impl = object;
|
||||
struct pw_stream *stream = &impl->this;
|
||||
uint32_t user;
|
||||
uint32_t user, fl = 0;
|
||||
int res;
|
||||
const struct spa_pod *params[1];
|
||||
uint32_t n_params = 0;
|
||||
|
||||
pw_log_debug("%p: port:%d.%d id:%d (%s) param:%p disconnecting:%d", impl,
|
||||
direction, port_id, id,
|
||||
|
|
@ -909,7 +926,16 @@ static int impl_port_set_param(void *object,
|
|||
if (param)
|
||||
pw_log_pod(SPA_LOG_LEVEL_DEBUG, param);
|
||||
|
||||
if ((res = update_params(impl, id, 0, ¶m, param ? 1 : 0)) < 0)
|
||||
params[0] = param;
|
||||
n_params = param ? 1 : 0;
|
||||
|
||||
switch (id) {
|
||||
case SPA_PARAM_Latency:
|
||||
parse_latency(stream, param, &fl);
|
||||
break;
|
||||
}
|
||||
|
||||
if ((res = update_params(impl, id, fl, params, n_params)) < 0)
|
||||
return res;
|
||||
|
||||
switch (id) {
|
||||
|
|
@ -917,9 +943,6 @@ static int impl_port_set_param(void *object,
|
|||
clear_buffers(stream);
|
||||
user = impl->params[NODE_Format].user;
|
||||
break;
|
||||
case SPA_PARAM_Latency:
|
||||
parse_latency(stream, param);
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
|
|
@ -1712,7 +1735,7 @@ void pw_stream_destroy(struct pw_stream *stream)
|
|||
stream->core = NULL;
|
||||
}
|
||||
|
||||
clear_params(impl, SPA_ID_INVALID);
|
||||
clear_params(impl, SPA_ID_INVALID, 0);
|
||||
|
||||
pw_log_debug("%p: free", stream);
|
||||
free(stream->error);
|
||||
|
|
@ -1990,7 +2013,7 @@ pw_stream_connect(struct pw_stream *stream,
|
|||
impl->port_info.params = impl->port_params;
|
||||
impl->port_info.n_params = N_PORT_PARAMS;
|
||||
|
||||
clear_params(impl, SPA_ID_INVALID);
|
||||
clear_params(impl, SPA_ID_INVALID, 0);
|
||||
for (i = 0; i < n_params; i++)
|
||||
add_param(impl, SPA_ID_INVALID, 0, params[i]);
|
||||
|
||||
|
|
@ -2390,6 +2413,7 @@ int pw_stream_get_time_n(struct pw_stream *stream, struct pw_time *time, size_t
|
|||
uintptr_t seq1, seq2;
|
||||
uint32_t buffered, quantum, index, rate_size;
|
||||
int32_t avail_buffers;
|
||||
struct spa_latency_info *latency = &impl->latency;
|
||||
|
||||
do {
|
||||
seq1 = SPA_SEQ_READ(impl->seq);
|
||||
|
|
@ -2405,9 +2429,9 @@ int pw_stream_get_time_n(struct pw_stream *stream, struct pw_time *time, size_t
|
|||
else
|
||||
time->queued = (int64_t)(impl->queued.incount - time->queued);
|
||||
|
||||
time->delay += (int64_t)(((impl->latency.min_quantum + impl->latency.max_quantum) / 2.0f) * quantum);
|
||||
time->delay += (impl->latency.min_rate + impl->latency.max_rate) / 2;
|
||||
time->delay += ((impl->latency.min_ns + impl->latency.max_ns) / 2) *
|
||||
time->delay += (int64_t)(((latency->min_quantum + latency->max_quantum) / 2.0f) * quantum);
|
||||
time->delay += (latency->min_rate + latency->max_rate) / 2;
|
||||
time->delay += ((latency->min_ns + latency->max_ns) / 2) *
|
||||
(int64_t)time->rate.denom / (int64_t)SPA_NSEC_PER_SEC;
|
||||
|
||||
avail_buffers = spa_ringbuffer_get_read_index(&impl->dequeued.ring, &index);
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue