diff --git a/src/modules/module-combine-sink.c b/src/modules/module-combine-sink.c index 2eca2728a..e429ad7fc 100644 --- a/src/modules/module-combine-sink.c +++ b/src/modules/module-combine-sink.c @@ -116,6 +116,14 @@ struct output { /* For communication of the stream latencies to the main thread */ pa_usec_t total_latency; + struct { + pa_usec_t timestamp; /* pa_rtclock_now() when the snapshot was taken */ + pa_usec_t sink_latency; /* slave sink latency plus render_memblockq content */ + size_t output_memblockq_size; /* length of the output memblockq in bytes */ + uint64_t receive_counter; /* copy of the output's receive_counter */ + } latency_snapshot; + + uint64_t receive_counter; /* advanced by the length of every chunk this output receives */ /* For communication of the stream parameters to the sink thread */ pa_atomic_t max_request; @@ -159,21 +167,33 @@ struct userdata { bool in_null_mode; pa_smoother *smoother; uint64_t counter; + + uint64_t snapshot_counter; /* counter as captured by SINK_MESSAGE_GET_SNAPSHOT */ + pa_usec_t snapshot_time; /* render_timestamp as captured by SINK_MESSAGE_GET_SNAPSHOT */ + + pa_usec_t render_timestamp; /* time of the last render_memblock() call, 0 if none yet */ } thread_info; }; +struct sink_snapshot { + pa_usec_t timestamp; /* thread_info.render_timestamp at snapshot time */ + uint64_t send_counter; /* thread_info.counter at snapshot time */ +}; + enum { SINK_MESSAGE_ADD_OUTPUT = PA_SINK_MESSAGE_MAX, SINK_MESSAGE_REMOVE_OUTPUT, SINK_MESSAGE_NEED, SINK_MESSAGE_UPDATE_LATENCY, SINK_MESSAGE_UPDATE_MAX_REQUEST, - SINK_MESSAGE_UPDATE_LATENCY_RANGE + SINK_MESSAGE_UPDATE_LATENCY_RANGE, + SINK_MESSAGE_GET_SNAPSHOT }; enum { SINK_INPUT_MESSAGE_POST = PA_SINK_INPUT_MESSAGE_MAX, - SINK_INPUT_MESSAGE_SET_REQUESTED_LATENCY + SINK_INPUT_MESSAGE_SET_REQUESTED_LATENCY, + SINK_INPUT_MESSAGE_LATENCY_SNAPSHOT }; static void output_disable(struct output *o); @@ -183,10 +203,16 @@ static int output_create_sink_input(struct output *o); static void adjust_rates(struct userdata *u) { struct output *o; - pa_usec_t max_sink_latency = 0, min_total_latency = (pa_usec_t) -1, target_latency, avg_total_latency = 0; + struct sink_snapshot rdata; + pa_usec_t avg_total_latency = 0; + pa_usec_t target_latency = 0; + pa_usec_t max_sink_latency = 0; + pa_usec_t min_total_latency = (pa_usec_t)-1; uint32_t base_rate; uint32_t idx; unsigned n = 0; + pa_usec_t now; + struct output *o_max = NULL; /* output with the largest snapshot sink latency */ pa_assert(u); pa_sink_assert_ref(u->sink); @@ -194,42 +220,82 @@ static void adjust_rates(struct 
userdata *u) { if (pa_idxset_size(u->outputs) <= 0) return; - if (!PA_SINK_IS_OPENED(u->sink->state)) + if (u->sink->state != PA_SINK_RUNNING) + return; + + /* Get sink snapshot */ + pa_asyncmsgq_send(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_GET_SNAPSHOT, &rdata, 0, NULL); + + /* The sink snapshot time is the time when the last data was rendered. + * Latency is calculated for that point in time. */ + now = rdata.timestamp; + + /* Sink snapshot is not yet valid. */ + if (!now) return; PA_IDXSET_FOREACH(o, u->outputs, idx) { - pa_usec_t sink_latency; + pa_usec_t snapshot_latency; + int64_t time_difference; if (!o->sink_input || !PA_SINK_IS_OPENED(o->sink->state)) continue; - o->total_latency = pa_sink_input_get_latency(o->sink_input, &sink_latency); - o->total_latency += sink_latency; + /* The difference may become negative, because the last render time was + * probably before the sink input snapshot. In this case, the sink + * had some more latency at the render time, so subtracting the value still + * gives the right result. */ + time_difference = (int64_t)now - (int64_t)o->latency_snapshot.timestamp; - if (sink_latency > max_sink_latency) - max_sink_latency = sink_latency; + /* Latency at sink snapshot time is sink input snapshot latency minus time + * passed between the two snapshots. */ + snapshot_latency = o->latency_snapshot.sink_latency + + pa_bytes_to_usec(o->latency_snapshot.output_memblockq_size, &o->sink_input->sample_spec) + - time_difference; - if (min_total_latency == (pa_usec_t) -1 || o->total_latency < min_total_latency) + /* Add the data that was sent between taking the sink input snapshot + * and the sink snapshot. */ + snapshot_latency += pa_bytes_to_usec(rdata.send_counter - o->latency_snapshot.receive_counter, &o->sink_input->sample_spec); + + /* This is the current combined latency of the slave sink and the related + * memblockq at the time of the sink snapshot. 
*/ + o->total_latency = snapshot_latency; + avg_total_latency += snapshot_latency; + + /* Get max_sink_latency and min_total_latency for target selection. */ + if (min_total_latency == (pa_usec_t)-1 || o->total_latency < min_total_latency) min_total_latency = o->total_latency; - avg_total_latency += o->total_latency; - n++; + if (o->latency_snapshot.sink_latency > max_sink_latency) { + max_sink_latency = o->latency_snapshot.sink_latency; + o_max = o; + } - pa_log_debug("[%s] total=%0.2fms sink=%0.2fms ", o->sink->name, (double) o->total_latency / PA_USEC_PER_MSEC, (double) sink_latency / PA_USEC_PER_MSEC); + /* Debug output */ + pa_log_debug("[%s] Snapshot sink latency = %0.2fms, total snapshot latency = %0.2fms", o->sink->name, (double) o->latency_snapshot.sink_latency / PA_USEC_PER_MSEC, (double) snapshot_latency / PA_USEC_PER_MSEC); if (o->total_latency > 10*PA_USEC_PER_SEC) pa_log_warn("[%s] Total latency of output is very high (%0.2fms), most likely the audio timing in one of your drivers is broken.", o->sink->name, (double) o->total_latency / PA_USEC_PER_MSEC); + + n++; } + /* If there is no valid output there is nothing to do. */ if (min_total_latency == (pa_usec_t) -1) return; avg_total_latency /= n; - target_latency = PA_MAX(max_sink_latency, min_total_latency); + /* The target selection ensures that at least one of the + * sinks will use the base rate and all other sinks are set + * relative to it. 
*/ + if (max_sink_latency > min_total_latency) + target_latency = o_max->total_latency; + else + target_latency = min_total_latency; pa_log_info("[%s] avg total latency is %0.2f msec.", u->sink->name, (double) avg_total_latency / PA_USEC_PER_MSEC); - pa_log_info("[%s] target latency is %0.2f msec.", u->sink->name, (double) target_latency / PA_USEC_PER_MSEC); + pa_log_info("[%s] target latency for all slaves is %0.2f msec.", u->sink->name, (double) target_latency / PA_USEC_PER_MSEC); base_rate = u->sink->sample_spec.rate; @@ -249,14 +315,12 @@ static void adjust_rates(struct userdata *u) { pa_log_warn("[%s] sample rates too different, not adjusting (%u vs. %u).", o->sink_input->sink->name, base_rate, new_rate); new_rate = base_rate; } else { - if (base_rate < new_rate + 20 && new_rate < base_rate + 20) - new_rate = base_rate; /* Do the adjustment in small steps; 2‰ can be considered inaudible */ if (new_rate < (uint32_t) (current_rate*0.998) || new_rate > (uint32_t) (current_rate*1.002)) { pa_log_info("[%s] new rate of %u Hz not within 2‰ of %u Hz, forcing smaller adjustment", o->sink_input->sink->name, new_rate, current_rate); new_rate = PA_CLAMP(new_rate, (uint32_t) (current_rate*0.998), (uint32_t) (current_rate*1.002)); } - pa_log_info("[%s] new rate is %u Hz; ratio is %0.3f; latency is %0.2f msec.", o->sink_input->sink->name, new_rate, (double) new_rate / base_rate, (double) o->total_latency / PA_USEC_PER_MSEC); + pa_log_info("[%s] new rate is %u Hz; ratio is %0.3f.", o->sink_input->sink->name, new_rate, (double) new_rate / base_rate); } pa_sink_input_set_rate(o->sink_input, new_rate); } @@ -271,13 +335,22 @@ static void time_callback(pa_mainloop_api *a, pa_time_event *e, const struct tim pa_assert(a); pa_assert(u->time_event == e); - adjust_rates(u); - if (u->sink->state == PA_SINK_SUSPENDED) { u->core->mainloop->time_free(e); u->time_event = NULL; - } else + } else { + struct output *o; + uint32_t idx; + pa_core_rttime_restart(u->core, e, pa_rtclock_now() + 
u->adjust_time); + + /* Get latency snapshots */ + PA_IDXSET_FOREACH(o, u->outputs, idx) { + pa_asyncmsgq_send(o->control_inq, PA_MSGOBJECT(o->sink_input), SINK_INPUT_MESSAGE_LATENCY_SNAPSHOT, NULL, 0, NULL); + } + + } + adjust_rates(u); } static void process_render_null(struct userdata *u, pa_usec_t now) { @@ -387,7 +460,10 @@ static void render_memblock(struct userdata *u, struct output *o, size_t length) while (pa_asyncmsgq_process_one(o->audio_inq) > 0) ; - /* Ok, now let's prepare some data if we really have to */ + /* Ok, now let's prepare some data if we really have to. Save the + * time for latency calculations. */ + u->thread_info.render_timestamp = pa_rtclock_now(); + while (!pa_memblockq_is_readable(o->memblockq)) { struct output *j; pa_memchunk chunk; @@ -396,6 +472,7 @@ static void render_memblock(struct userdata *u, struct output *o, size_t length) pa_sink_render(u->sink, length, &chunk); u->thread_info.counter += chunk.length; + o->receive_counter += chunk.length; /* OK, let's send this data to the other threads */ PA_LLIST_FOREACH(j, u->thread_info.active_outputs) { @@ -630,9 +707,10 @@ static int sink_input_process_msg(pa_msgobject *obj, int code, void *data, int64 case SINK_INPUT_MESSAGE_POST: - if (PA_SINK_IS_OPENED(o->sink_input->sink->thread_info.state)) + if (o->sink_input->sink->thread_info.state == PA_SINK_RUNNING) { pa_memblockq_push_align(o->memblockq, chunk); - else + o->receive_counter += chunk->length; + } else pa_memblockq_flush_write(o->memblockq, true); return 0; @@ -644,6 +722,24 @@ static int sink_input_process_msg(pa_msgobject *obj, int code, void *data, int64 return 0; } + + case SINK_INPUT_MESSAGE_LATENCY_SNAPSHOT: { + size_t length; + + length = pa_memblockq_get_length(o->sink_input->thread_info.render_memblockq); + + o->latency_snapshot.output_memblockq_size = pa_memblockq_get_length(o->memblockq); + + /* Add the content of the render memblockq to the sink latency */ + o->latency_snapshot.sink_latency = 
pa_sink_get_latency_within_thread(o->sink, true) + + pa_bytes_to_usec(length, &o->sink->sample_spec); + + o->latency_snapshot.timestamp = pa_rtclock_now(); + + o->latency_snapshot.receive_counter = o->receive_counter; + + return 0; + } } return pa_sink_input_process_msg(obj, code, data, offset, chunk); @@ -735,9 +831,10 @@ static int sink_set_state_in_io_thread_cb(pa_sink *s, pa_sink_state_t new_state, running = new_state == PA_SINK_RUNNING; pa_atomic_store(&u->thread_info.running, running); - if (running) + if (running) { + u->thread_info.render_timestamp = 0; /* 0 = nothing rendered yet; adjust_rates() bails out on a zero snapshot timestamp */ pa_smoother_resume(u->thread_info.smoother, pa_rtclock_now(), true); - else + } else pa_smoother_pause(u->thread_info.smoother, pa_rtclock_now()); return 0; @@ -830,6 +927,7 @@ static void output_add_within_thread(struct output *o) { o->userdata->rtpoll, PA_RTPOLL_NORMAL, o->control_inq); + o->receive_counter = o->userdata->thread_info.counter; /* start from the sink's current byte counter */ } /* Called from thread context of the io thread */ @@ -917,8 +1015,11 @@ static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offse case SINK_MESSAGE_UPDATE_LATENCY: { pa_usec_t x, y, latency = (pa_usec_t) offset; - x = pa_rtclock_now(); - y = pa_bytes_to_usec(u->thread_info.counter, &u->sink->sample_spec); + /* It may be possible that thread_info.counter has been increased + * since we took the snapshot. Therefore we have to use the snapshot + * time and counter instead of the current values. 
*/ + x = u->thread_info.snapshot_time; + y = pa_bytes_to_usec(u->thread_info.snapshot_counter, &u->sink->sample_spec); if (y > latency) y -= latency; @@ -929,6 +1030,17 @@ static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offse return 0; } + case SINK_MESSAGE_GET_SNAPSHOT: { + struct sink_snapshot *rdata = data; + + rdata->timestamp = u->thread_info.render_timestamp; + rdata->send_counter = u->thread_info.counter; + u->thread_info.snapshot_counter = u->thread_info.counter; /* kept for SINK_MESSAGE_UPDATE_LATENCY */ + u->thread_info.snapshot_time = u->thread_info.render_timestamp; /* kept for SINK_MESSAGE_UPDATE_LATENCY */ + + return 0; + } + case SINK_MESSAGE_UPDATE_MAX_REQUEST: update_max_request(u); break; @@ -1506,6 +1618,8 @@ int pa__init(pa_module*m) { u->sink_unlink_slot = pa_hook_connect(&m->core->hooks[PA_CORE_HOOK_SINK_UNLINK], PA_HOOK_EARLY, (pa_hook_cb_t) sink_unlink_hook_cb, u); u->sink_state_changed_slot = pa_hook_connect(&m->core->hooks[PA_CORE_HOOK_SINK_STATE_CHANGED], PA_HOOK_NORMAL, (pa_hook_cb_t) sink_state_changed_hook_cb, u); + u->thread_info.render_timestamp = 0; /* no valid render time before the first render */ + if (!(u->thread = pa_thread_new("combine", thread_func, u))) { pa_log("Failed to create thread."); goto fail;