virtual-source-common: Integrate uplink sink into virtual source library

This patch integrates the uplink sink feature of module-virtual-source into
the virtual source library, so that every virtual source can use it.

The patch also introduces latency handling and rewinding for the uplink sink.
Similar to the monitor source, the only useful definition of the latency
appears to be the negative of the master source latency. Rewinding will not
be possible in most situations, because the underlying memblockq is nearly
always empty.
module-combine-sink and module-suspend-on-idle required some changes to
deal correctly with this type of sink.
This commit is contained in:
Georg Chini 2021-05-01 14:47:43 +02:00
parent 0b26ed9b0c
commit 9f70ad3c95
10 changed files with 512 additions and 249 deletions

View file

@ -124,10 +124,10 @@ struct output {
pa_memblockq *memblockq;
/* For communication of the stream latencies to the main thread */
pa_usec_t total_latency;
int64_t total_latency;
struct {
pa_usec_t timestamp;
pa_usec_t sink_latency;
int64_t sink_latency;
size_t output_memblockq_size;
uint64_t receive_counter;
} latency_snapshot;
@ -249,8 +249,8 @@ static uint32_t rate_controller(
static void adjust_rates(struct userdata *u) {
struct output *o;
struct sink_snapshot rdata;
pa_usec_t avg_total_latency = 0;
pa_usec_t target_latency = 0;
int64_t avg_total_latency = 0;
int64_t target_latency = 0;
pa_usec_t max_sink_latency = 0;
pa_usec_t min_total_latency = (pa_usec_t)-1;
uint32_t base_rate;
@ -280,7 +280,7 @@ static void adjust_rates(struct userdata *u) {
return;
PA_IDXSET_FOREACH(o, u->outputs, idx) {
pa_usec_t snapshot_latency;
int64_t snapshot_latency;
int64_t time_difference;
if (!o->sink_input || !PA_SINK_IS_OPENED(o->sink->state))
@ -319,7 +319,7 @@ static void adjust_rates(struct userdata *u) {
/* Debug output */
pa_log_debug("[%s] Snapshot sink latency = %0.2fms, total snapshot latency = %0.2fms", o->sink->name, (double) o->latency_snapshot.sink_latency / PA_USEC_PER_MSEC, (double) snapshot_latency / PA_USEC_PER_MSEC);
if (o->total_latency > 10*PA_USEC_PER_SEC)
if (o->total_latency > (int64_t)(10*PA_USEC_PER_SEC))
pa_log_warn("[%s] Total latency of output is very high (%0.2fms), most likely the audio timing in one of your drivers is broken.", o->sink->name, (double) o->total_latency / PA_USEC_PER_MSEC);
n++;

View file

@ -71,7 +71,7 @@ static void timeout_cb(pa_mainloop_api*a, pa_time_event* e, const struct timeval
pa_core_maybe_vacuum(d->userdata->core);
}
if (d->source && pa_source_check_suspend(d->source, NULL) <= 0 && !(d->source->suspend_cause & PA_SUSPEND_IDLE)) {
if (d->source && pa_source_check_suspend(d->source, NULL, NULL) <= 0 && !(d->source->suspend_cause & PA_SUSPEND_IDLE)) {
pa_log_info("Source %s idle for too long, suspending ...", d->source->name);
pa_source_suspend(d->source, true, PA_SUSPEND_IDLE);
pa_core_maybe_vacuum(d->userdata->core);
@ -109,6 +109,29 @@ static void resume(struct device_info *d) {
}
}
/* If the monitor source of a sink becomes idle and the sink is
* idle as well, we have to check if it is an uplink sink because
* the underlying virtual source might also have become idle. */
static void restart_check_uplink(struct device_info *d, pa_source_output *ignore, struct userdata *u) {
struct device_info *d_master;
pa_assert(d);
pa_assert(u);
restart(d);
if (!d->sink || (d->sink && !d->sink->uplink_of))
return;
if (!d->sink->uplink_of->source)
return;
if ((d_master = pa_hashmap_get(u->device_infos, d->sink->uplink_of->source))) {
if (pa_source_check_suspend(d_master->source, NULL, ignore) <= 0)
restart(d_master);
}
}
static pa_hook_result_t sink_input_fixate_hook_cb(pa_core *c, pa_sink_input_new_data *data, struct userdata *u) {
struct device_info *d;
@ -145,7 +168,7 @@ static pa_hook_result_t source_output_fixate_hook_cb(pa_core *c, pa_source_outpu
if (d) {
resume(d);
if (d->source) {
if (pa_source_check_suspend(d->source, NULL) <= 0)
if (pa_source_check_suspend(d->source, NULL, NULL) <= 0)
restart(d);
} else {
/* The source output is connected to a monitor source. */
@ -170,6 +193,13 @@ static pa_hook_result_t sink_input_unlink_hook_cb(pa_core *c, pa_sink_input *s,
struct device_info *d;
if ((d = pa_hashmap_get(u->device_infos, s->sink)))
restart(d);
if (s->sink->uplink_of && s->sink->uplink_of->source) {
if (pa_source_check_suspend(s->sink->uplink_of->source, s, NULL) <= 0) {
if ((d = pa_hashmap_get(u->device_infos, s->sink->uplink_of->source)))
restart(d);
}
}
}
return PA_HOOK_OK;
@ -189,12 +219,12 @@ static pa_hook_result_t source_output_unlink_hook_cb(pa_core *c, pa_source_outpu
if (pa_sink_check_suspend(s->source->monitor_of, NULL, s) <= 0)
d = pa_hashmap_get(u->device_infos, s->source->monitor_of);
} else {
if (pa_source_check_suspend(s->source, s) <= 0)
if (pa_source_check_suspend(s->source, NULL, s) <= 0)
d = pa_hashmap_get(u->device_infos, s->source);
}
if (d)
restart(d);
restart_check_uplink(d, s, u);
return PA_HOOK_OK;
}
@ -206,10 +236,18 @@ static pa_hook_result_t sink_input_move_start_hook_cb(pa_core *c, pa_sink_input
pa_sink_input_assert_ref(s);
pa_assert(u);
if (pa_sink_check_suspend(s->sink, s, NULL) <= 0)
if (pa_sink_check_suspend(s->sink, s, NULL) <= 0) {
if ((d = pa_hashmap_get(u->device_infos, s->sink)))
restart(d);
if (s->sink->uplink_of && s->sink->uplink_of->source) {
if (pa_source_check_suspend(s->sink->uplink_of->source, s, NULL) <= 0) {
if ((d = pa_hashmap_get(u->device_infos, s->sink->uplink_of->source)))
restart(d);
}
}
}
return PA_HOOK_OK;
}
@ -240,12 +278,12 @@ static pa_hook_result_t source_output_move_start_hook_cb(pa_core *c, pa_source_o
if (pa_sink_check_suspend(s->source->monitor_of, NULL, s) <= 0)
d = pa_hashmap_get(u->device_infos, s->source->monitor_of);
} else {
if (pa_source_check_suspend(s->source, s) <= 0)
if (pa_source_check_suspend(s->source, NULL, s) <= 0)
d = pa_hashmap_get(u->device_infos, s->source);
}
if (d)
restart(d);
restart_check_uplink(d, s, u);
return PA_HOOK_OK;
}
@ -278,9 +316,14 @@ static pa_hook_result_t sink_input_state_changed_hook_cb(pa_core *c, pa_sink_inp
pa_sink_input_assert_ref(s);
pa_assert(u);
if (s->state == PA_SINK_INPUT_RUNNING && s->sink)
if ((d = pa_hashmap_get(u->device_infos, s->sink)))
resume(d);
if (s->sink) {
if ((d = pa_hashmap_get(u->device_infos, s->sink))) {
if (s->state == PA_SINK_INPUT_RUNNING)
resume(d);
else if (s->state == PA_SINK_INPUT_CORKED && pa_sink_check_suspend(s->sink, NULL, NULL) <= 0)
restart(d);
}
}
return PA_HOOK_OK;
}
@ -289,9 +332,9 @@ static pa_hook_result_t source_output_state_changed_hook_cb(pa_core *c, pa_sourc
pa_assert(c);
pa_source_output_assert_ref(s);
pa_assert(u);
struct device_info *d = NULL;
if (s->state == PA_SOURCE_OUTPUT_RUNNING && s->source) {
struct device_info *d;
if (s->source->monitor_of)
d = pa_hashmap_get(u->device_infos, s->source->monitor_of);
@ -300,6 +343,15 @@ static pa_hook_result_t source_output_state_changed_hook_cb(pa_core *c, pa_sourc
if (d)
resume(d);
} else if (s->state == PA_SOURCE_OUTPUT_CORKED && s->source) {
if (s->source->monitor_of && pa_sink_check_suspend(s->source->monitor_of, NULL, NULL) <= 0)
d = pa_hashmap_get(u->device_infos, s->source->monitor_of);
else if (pa_source_check_suspend(s->source, NULL, NULL) <= 0)
d = pa_hashmap_get(u->device_infos, s->source);
if (d)
restart_check_uplink(d, NULL, u);
}
return PA_HOOK_OK;
@ -349,7 +401,7 @@ static pa_hook_result_t device_new_hook_cb(pa_core *c, pa_object *o, struct user
pa_hashmap_put(u->device_infos, o, d);
if ((d->sink && pa_sink_check_suspend(d->sink, NULL, NULL) <= 0) ||
(d->source && pa_source_check_suspend(d->source, NULL) <= 0))
(d->source && pa_source_check_suspend(d->source, NULL, NULL) <= 0))
restart(d);
return PA_HOOK_OK;
@ -407,7 +459,7 @@ static pa_hook_result_t device_state_changed_hook_cb(pa_core *c, pa_object *o, s
} else if (pa_source_isinstance(o)) {
pa_source *s = PA_SOURCE(o);
if (pa_source_check_suspend(s, NULL) <= 0)
if (pa_source_check_suspend(s, NULL, NULL) <= 0)
if (PA_SOURCE_IS_OPENED(s->state))
restart(d);
}

View file

@ -67,14 +67,6 @@ struct userdata {
pa_vsource *vsource;
unsigned channels;
/* optional fields for uplink sink */
pa_sink *sink;
pa_usec_t block_usec;
pa_memblockq *sink_memblockq;
pa_rtpoll *rtpoll;
bool auto_desc;
};
static const char* const valid_modargs[] = {
@ -88,6 +80,7 @@ static const char* const valid_modargs[] = {
"channel_map",
"use_volume_sharing",
"force_flat_volume",
"autoloaded",
NULL
};
@ -100,141 +93,8 @@ static void filter_process_chunk(uint8_t *src, uint8_t *dst, unsigned in_count,
nbytes = in_count * pa_frame_size(&u->vsource->source->sample_spec);
/* if uplink sink exists, pull data from there; simplify by using
same length as chunk provided by source */
if (u->sink && (u->sink->thread_info.state == PA_SINK_RUNNING)) {
pa_memchunk tchunk;
pa_mix_info streams[2];
pa_memchunk chunk;
void *src_copy;
int ch;
pa_source_output *o;
/* Hmm, process any rewind request that might be queued up */
pa_sink_process_rewind(u->sink, 0);
/* get data from the sink */
while (pa_memblockq_peek(u->sink_memblockq, &tchunk) < 0) {
pa_memchunk nchunk;
/* make sure we get nbytes from the sink with render_full,
otherwise we cannot mix with the uplink */
pa_sink_render_full(u->sink, nbytes, &nchunk);
pa_memblockq_push(u->sink_memblockq, &nchunk);
pa_memblock_unref(nchunk.memblock);
}
pa_assert(tchunk.length == nbytes);
/* move the read pointer for sink memblockq */
pa_memblockq_drop(u->sink_memblockq, tchunk.length);
o = u->vsource->output_from_master;
/* allocate source chunk */
chunk.index = 0;
chunk.length = nbytes;
chunk.memblock = pa_memblock_new(o->source->core->mempool, nbytes);
pa_assert(chunk.memblock);
/* Copy source data to chunk */
src_copy = pa_memblock_acquire_chunk(&chunk);
memcpy(src_copy, src, nbytes);
/* set-up mixing structure
volume was taken care of in sink and source already */
streams[0].chunk = chunk;
for(ch=0;ch<o->sample_spec.channels;ch++)
streams[0].volume.values[ch] = PA_VOLUME_NORM; /* FIXME */
streams[0].volume.channels = o->sample_spec.channels;
streams[1].chunk = tchunk;
for(ch=0;ch<o->sample_spec.channels;ch++)
streams[1].volume.values[ch] = PA_VOLUME_NORM; /* FIXME */
streams[1].volume.channels = o->sample_spec.channels;
/* do mixing */
pa_mix(streams, /* 2 streams to be mixed */
2,
dst, /* put result in dst */
nbytes, /* same length as input */
(const pa_sample_spec *)&o->sample_spec, /* same sample spec for input and output */
NULL, /* no volume information */
false); /* no mute */
pa_memblock_release(chunk.memblock);
pa_memblock_unref(tchunk.memblock);
pa_memblock_unref(chunk.memblock);
} else
/* Copy input to output */
memcpy(dst, src, nbytes);
}
/* When the source output moves, the asyncmsgq of the uplink sink has
* to change as well */
static void source_output_moving_cb(pa_source_output *o, pa_source *dest) {
struct userdata *u;
pa_assert(u = o->userdata);
pa_virtual_source_output_moving(o, dest);
if (dest && u->sink) {
pa_sink_set_asyncmsgq(u->sink, dest->asyncmsgq);
}
}
/* Called from I/O thread context */
static int sink_process_msg_cb(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
switch (code) {
case PA_SINK_MESSAGE_GET_LATENCY:
/* there's no real latency here */
*((int64_t*) data) = 0;
return 0;
}
return pa_sink_process_msg(o, code, data, offset, chunk);
}
/* Called from main context */
static int sink_set_state_in_main_thread_cb(pa_sink *s, pa_sink_state_t state, pa_suspend_cause_t suspend_cause) {
struct userdata *u;
pa_sink_assert_ref(s);
pa_assert_se(u = s->userdata);
if (!PA_SINK_IS_LINKED(state)) {
return 0;
}
if (state == PA_SINK_RUNNING) {
/* need to wake-up source if it was suspended */
pa_log_debug("Resuming source %s, because its uplink sink became active.", u->vsource->source->name);
pa_source_suspend(u->vsource->source, false, PA_SUSPEND_ALL);
/* FIXME: if there's no client connected, the source will suspend
and playback will be stuck. You'd want to prevent the source from
sleeping when the uplink sink is active; even if the audio is
discarded at least the app isn't stuck */
} else {
/* nothing to do, if the sink becomes idle or suspended let
module-suspend-idle handle the sources later */
}
return 0;
}
static void sink_update_requested_latency_cb(pa_sink *s) {
struct userdata *u;
pa_sink_assert_ref(s);
pa_assert_se(u = s->userdata);
/* FIXME: there's no latency support */
/* Copy input to output */
memcpy(dst, src, nbytes);
}
int pa__init(pa_module*m) {
@ -245,10 +105,6 @@ int pa__init(pa_module*m) {
pa_source *master=NULL;
bool use_volume_sharing = true;
/* optional for uplink_sink */
pa_sink_new_data sink_data;
size_t nbytes;
pa_assert(m);
if (!(ma = pa_modargs_new(m->argument, valid_modargs))) {
@ -280,75 +136,13 @@ int pa__init(pa_module*m) {
m->userdata = u;
u->channels = ss.channels;
/* The rtpoll created here is never run. It is only necessary to avoid crashes
* when module-virtual-source is used together with module-loopback or
* module-combine-sink. Both modules base their asyncmsq on the rtpoll provided
* by the sink. module-loopback and combine-sink only work because they
* call pa_asyncmsq_process_one() themselves. */
u->rtpoll = pa_rtpoll_new();
/* Create virtual source */
/* Create virtual source */
if (!(u->vsource = pa_virtual_source_create(master, "vsource", "Virtual Source", &ss, &map,
&ss, &map, m, u, ma, use_volume_sharing, true)))
goto fail;
/* Set callback for virtual source */
u->vsource->process_chunk = filter_process_chunk;
u->vsource->output_from_master->moving = source_output_moving_cb;
/* Create optional uplink sink */
pa_sink_new_data_init(&sink_data);
sink_data.driver = __FILE__;
sink_data.module = m;
if ((sink_data.name = pa_xstrdup(pa_modargs_get_value(ma, "uplink_sink", NULL)))) {
pa_sink_new_data_set_sample_spec(&sink_data, &ss);
pa_sink_new_data_set_channel_map(&sink_data, &map);
pa_proplist_sets(sink_data.proplist, PA_PROP_DEVICE_MASTER_DEVICE, master->name);
pa_proplist_sets(sink_data.proplist, PA_PROP_DEVICE_CLASS, "uplink sink");
pa_proplist_sets(sink_data.proplist, "device.uplink_sink.name", sink_data.name);
if ((u->auto_desc = !pa_proplist_contains(sink_data.proplist, PA_PROP_DEVICE_DESCRIPTION))) {
const char *z;
z = pa_proplist_gets(master->proplist, PA_PROP_DEVICE_DESCRIPTION);
pa_proplist_setf(sink_data.proplist, PA_PROP_DEVICE_DESCRIPTION, "Uplink Sink %s on %s", sink_data.name, z ? z : master->name);
}
u->sink_memblockq = pa_memblockq_new("module-virtual-source sink_memblockq", 0, MEMBLOCKQ_MAXLENGTH, 0, &ss, 1, 1, 0, NULL);
if (!u->sink_memblockq) {
pa_sink_new_data_done(&sink_data);
pa_log("Failed to create sink memblockq.");
goto fail;
}
u->sink = pa_sink_new(m->core, &sink_data, 0); /* FIXME, sink has no capabilities */
pa_sink_new_data_done(&sink_data);
if (!u->sink) {
pa_log("Failed to create sink.");
goto fail;
}
u->sink->parent.process_msg = sink_process_msg_cb;
u->sink->update_requested_latency = sink_update_requested_latency_cb;
u->sink->set_state_in_main_thread = sink_set_state_in_main_thread_cb;
u->sink->userdata = u;
pa_sink_set_asyncmsgq(u->sink, master->asyncmsgq);
pa_sink_set_rtpoll(u->sink, u->rtpoll);
/* FIXME: no idea what I am doing here */
u->block_usec = BLOCK_USEC;
nbytes = pa_usec_to_bytes(u->block_usec, &u->sink->sample_spec);
pa_sink_set_max_rewind(u->sink, 0);
pa_sink_set_max_request(u->sink, nbytes);
pa_sink_put(u->sink);
} else {
pa_sink_new_data_done(&sink_data);
/* optional uplink sink not enabled */
u->sink = NULL;
}
if (pa_virtual_source_activate(u->vsource) < 0)
goto fail;
@ -386,16 +180,5 @@ void pa__done(pa_module*m) {
if (u->vsource)
pa_virtual_source_destroy(u->vsource);
if (u->sink) {
pa_sink_unlink(u->sink);
pa_sink_unref(u->sink);
}
if (u->sink_memblockq)
pa_memblockq_free(u->sink_memblockq);
if (u->rtpoll)
pa_rtpoll_free(u->rtpoll);
pa_xfree(u);
}

View file

@ -22,8 +22,10 @@
#include <modules/virtual-source-common.h>
#include <pulsecore/core-util.h>
#include <pulsecore/mix.h>
#include <pulse/timeval.h>
#include <pulse/rtclock.h>
PA_DEFINE_PRIVATE_CLASS(pa_vsource, pa_msgobject);
#define PA_VSOURCE(o) (pa_vsource_cast(o))
@ -42,6 +44,11 @@ enum {
VSOURCE_MESSAGE_OUTPUT_ATTACHED
};
struct uplink_data {
pa_vsource *vsource;
pa_memblockq *memblockq;
};
/* Helper functions */
static inline pa_source_output* get_output_from_source(pa_source *s) {
@ -121,6 +128,8 @@ static void set_latency_range_within_thread(pa_vsource *vsource) {
}
pa_source_set_latency_range_within_thread(s, min_latency, max_latency);
if (vsource->uplink_sink)
pa_sink_set_latency_range_within_thread(vsource->uplink_sink, min_latency, max_latency);
}
/* Called from I/O thread context */
@ -136,6 +145,134 @@ static void set_memblockq_rewind(pa_vsource *vsource) {
}
}
/* Uplink sink callbacks */
/* Called from I/O thread context */
static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
pa_sink *s;
struct uplink_data *uplink;
s = PA_SINK(o);
uplink = s->userdata;
pa_assert(uplink);
switch (code) {
case PA_SINK_MESSAGE_GET_LATENCY:
/* While the sink is not opened or if we have not received any data yet,
* simply return 0 as latency */
if (!PA_SINK_IS_OPENED(s->thread_info.state)) {
*((int64_t*) data) = 0;
return 0;
}
*((int64_t*) data) = pa_bytes_to_usec(pa_memblockq_get_length(uplink->memblockq), &s->sample_spec);
*((int64_t*) data) -= pa_source_get_latency_within_thread(uplink->vsource->source, true);
return 0;
}
return pa_sink_process_msg(o, code, data, offset, chunk);
}
/* Called from main context */
static int sink_set_state_in_main_thread(pa_sink *s, pa_sink_state_t state, pa_suspend_cause_t suspend_cause) {
pa_vsource *vsource;
struct uplink_data *uplink;
pa_sink_assert_ref(s);
uplink = s->userdata;
pa_assert(uplink);
vsource = uplink->vsource;
pa_assert(vsource);
if (!PA_SINK_IS_LINKED(state)) {
return 0;
}
/* need to wake-up source if it was suspended */
if (!PA_SINK_IS_OPENED(s->state) && PA_SINK_IS_OPENED(state) && !PA_SOURCE_IS_OPENED(vsource->source->state) && PA_SOURCE_IS_LINKED(vsource->source->state)) {
pa_log_debug("Resuming source %s, because its uplink sink became active.", vsource->source->name);
pa_source_suspend(vsource->source, false, PA_SUSPEND_IDLE);
}
return 0;
}
/* Called from the IO thread. */
static int sink_set_state_in_io_thread(pa_sink *s, pa_sink_state_t new_state, pa_suspend_cause_t new_suspend_cause) {
struct uplink_data *uplink;
pa_sink_assert_ref(s);
uplink = s->userdata;
pa_assert(uplink);
if (!PA_SINK_IS_OPENED(new_state) && PA_SINK_IS_OPENED(s->thread_info.state)) {
pa_memblockq_flush_write(uplink->memblockq, true);
pa_sink_set_max_request_within_thread(s, 0);
pa_sink_set_max_rewind_within_thread(s, 0);
}
return 0;
}
/* Called from I/O thread context */
static void sink_update_requested_latency(pa_sink *s) {
struct uplink_data *uplink;
pa_usec_t latency;
size_t rewind_size;
pa_sink_assert_ref(s);
uplink = s->userdata;
pa_assert(uplink);
if (!PA_SINK_IS_LINKED(s->thread_info.state))
return;
latency = pa_sink_get_requested_latency_within_thread(s);
if (latency == (pa_usec_t) -1)
latency = s->thread_info.max_latency;
rewind_size = pa_usec_to_bytes(latency, &s->sample_spec);
pa_memblockq_set_maxrewind(uplink->memblockq, rewind_size);
pa_sink_set_max_request_within_thread(s, rewind_size);
pa_sink_set_max_rewind_within_thread(s, rewind_size);
}
static void sink_process_rewind(pa_sink *s) {
struct uplink_data *uplink;
size_t rewind_nbytes, in_buffer;
uplink = s->userdata;
pa_assert(uplink);
rewind_nbytes = s->thread_info.rewind_nbytes;
if (!PA_SINK_IS_OPENED(s->thread_info.state) || rewind_nbytes <= 0)
goto finish;
pa_log_debug("Requested to rewind %lu bytes.", (unsigned long) rewind_nbytes);
in_buffer = pa_memblockq_get_length(uplink->memblockq);
if (in_buffer == 0) {
pa_log_debug("Memblockq empty, cannot rewind");
goto finish;
}
if (rewind_nbytes > in_buffer)
rewind_nbytes = in_buffer;
pa_memblockq_seek(uplink->memblockq, -rewind_nbytes, PA_SEEK_RELATIVE, true);
pa_sink_process_rewind(s, rewind_nbytes);
pa_log_debug("Rewound %lu bytes.", (unsigned long) rewind_nbytes);
return;
finish:
pa_sink_process_rewind(s, 0);
}
/* Source callbacks */
/* Called from I/O thread context */
@ -230,15 +367,34 @@ int pa_virtual_source_process_msg(pa_msgobject *obj, int code, void *data, int64
/* Called from main context */
int pa_virtual_source_set_state_in_main_thread(pa_source *s, pa_source_state_t state, pa_suspend_cause_t suspend_cause) {
pa_source_output *o;
pa_vsource *vsource;
bool suspend_cause_changed;
pa_source_assert_ref(s);
o = get_output_from_source(s);
pa_assert(o);
vsource = s->vsource;
pa_assert(vsource);
if (!PA_SOURCE_IS_LINKED(state) ||
!PA_SOURCE_OUTPUT_IS_LINKED(o->state))
return 0;
suspend_cause_changed = (suspend_cause != s->suspend_cause);
if (vsource->uplink_sink && PA_SINK_IS_LINKED(vsource->uplink_sink->state) && suspend_cause_changed) {
/* If the source is suspended for other reasons than being idle, the uplink sink
* should be suspended using the same reasons */
if (suspend_cause != PA_SUSPEND_IDLE && state == PA_SOURCE_SUSPENDED) {
suspend_cause = suspend_cause & ~PA_SUSPEND_IDLE;
pa_sink_suspend(vsource->uplink_sink, true, suspend_cause);
} else if (PA_SOURCE_IS_OPENED(state) && s->suspend_cause != PA_SUSPEND_IDLE) {
/* If the source is resuming, the old suspend cause of the source should
* be removed from the sink unless the old suspend cause was idle. */
suspend_cause = s->suspend_cause & ~PA_SUSPEND_IDLE;
pa_sink_suspend(vsource->uplink_sink, false, suspend_cause);
}
}
pa_source_output_cork(o, state == PA_SOURCE_SUSPENDED);
return 0;
}
@ -335,6 +491,86 @@ void pa_virtual_source_set_mute(pa_source *s) {
pa_source_output_set_mute(o, s->muted, s->save_muted);
}
/* Post data, mix in uplink sink */
void pa_virtual_source_post(pa_source *s, const pa_memchunk *chunk) {
pa_vsource *vsource;
vsource = s->vsource;
pa_assert(vsource);
/* if uplink sink exists, pull data from there; simplify by using
same length as chunk provided by source */
if (vsource->uplink_sink && PA_SINK_IS_OPENED(vsource->uplink_sink->thread_info.state)) {
pa_memchunk tchunk;
pa_mix_info streams[2];
int ch;
uint8_t *dst;
pa_memchunk dst_chunk;
size_t nbytes;
struct uplink_data *uplink;
uplink = vsource->uplink_sink->userdata;
pa_assert(uplink);
/* Hmm, process any rewind request that might be queued up */
if (PA_UNLIKELY(vsource->uplink_sink->thread_info.rewind_requested))
sink_process_rewind(vsource->uplink_sink);
nbytes = chunk->length;
/* get data from the sink */
while (pa_memblockq_get_length(uplink->memblockq) < nbytes) {
pa_memchunk nchunk;
size_t missing;
missing = nbytes - pa_memblockq_get_length(uplink->memblockq);
pa_sink_render(vsource->uplink_sink, missing, &nchunk);
pa_memblockq_push(uplink->memblockq, &nchunk);
pa_memblock_unref(nchunk.memblock);
}
pa_memblockq_peek_fixed_size(uplink->memblockq, nbytes, &tchunk);
pa_assert(tchunk.length == nbytes);
/* move the read pointer for sink memblockq */
pa_memblockq_drop(uplink->memblockq, tchunk.length);
/* Prepare output chunk */
dst_chunk.index = 0;
dst_chunk.length = nbytes;
dst_chunk.memblock = pa_memblock_new(vsource->core->mempool, dst_chunk.length);
dst = pa_memblock_acquire_chunk(&dst_chunk);
/* set-up mixing structure
volume was taken care of in sink and source already */
streams[0].chunk = *chunk;
for(ch=0; ch < s->sample_spec.channels; ch++)
streams[0].volume.values[ch] = PA_VOLUME_NORM;
streams[0].volume.channels = s->sample_spec.channels;
streams[1].chunk = tchunk;
for(ch=0; ch < s->sample_spec.channels;ch++)
streams[1].volume.values[ch] = PA_VOLUME_NORM;
streams[1].volume.channels = s->sample_spec.channels;
/* do mixing */
pa_mix(streams, /* 2 streams to be mixed */
2,
dst, /* put result in dst */
nbytes, /* same length as input */
(const pa_sample_spec *)&s->sample_spec, /* same sample spec for input and output */
NULL, /* no volume information */
false); /* no mute */
pa_memblock_release(dst_chunk.memblock);
pa_source_post(s, &dst_chunk);
pa_memblock_unref(tchunk.memblock);
pa_memblock_unref(dst_chunk.memblock);
} else
pa_source_post(s, chunk);
}
/* Source output callbacks */
/* Called from output thread context */
@ -355,7 +591,7 @@ void pa_virtual_source_output_push(pa_source_output *o, const pa_memchunk *chunk
return;
if (!vsource->process_chunk || !vsource->memblockq) {
pa_source_post(s, chunk);
pa_virtual_source_post(s, chunk);
return;
}
@ -427,7 +663,7 @@ void pa_virtual_source_output_push(pa_source_output *o, const pa_memchunk *chunk
pa_memblock_unref(schunk.memblock);
/* Post data */
pa_source_post(s, &tchunk);
pa_virtual_source_post(s, &tchunk);
pa_memblock_unref(tchunk.memblock);
length = pa_memblockq_get_length(vsource->memblockq);
@ -461,8 +697,16 @@ void pa_virtual_source_output_process_rewind(pa_source_output *o, size_t nbytes)
* pass the rewind on to the source */
if (vsource->memblockq)
pa_memblockq_seek(vsource->memblockq, - nbytes, PA_SEEK_RELATIVE, true);
else
else {
pa_source_process_rewind(s, nbytes * out_fs / in_fs);
if (vsource->uplink_sink && PA_SINK_IS_OPENED(vsource->uplink_sink->thread_info.state)) {
struct uplink_data *uplink;
uplink = vsource->uplink_sink->userdata;
pa_assert(uplink);
pa_memblockq_rewind(uplink->memblockq, nbytes * out_fs / in_fs);
}
}
}
/* Called from source I/O thread context. */
@ -543,6 +787,8 @@ void pa_virtual_source_output_attach(pa_source_output *o) {
master_fs = pa_frame_size(&o->source->sample_spec);
pa_source_set_rtpoll(s, o->source->thread_info.rtpoll);
if (vsource->uplink_sink)
pa_sink_set_rtpoll(vsource->uplink_sink, o->source->thread_info.rtpoll);
set_latency_range_within_thread(vsource);
@ -574,16 +820,21 @@ void pa_virtual_source_output_attach(pa_source_output *o) {
/* Called from output thread context */
void pa_virtual_source_output_detach(pa_source_output *o) {
pa_source *s;
pa_vsource *vsource;
pa_source_output_assert_ref(o);
pa_source_output_assert_io_context(o);
s = o->destination_source;
pa_assert(s);
vsource = s->vsource;
pa_assert(vsource);
if (PA_SOURCE_IS_LINKED(s->thread_info.state))
pa_source_detach_within_thread(s);
pa_source_set_rtpoll(s, NULL);
if (vsource->uplink_sink)
pa_sink_set_rtpoll(vsource->uplink_sink, NULL);
}
/* Called from main thread */
@ -614,6 +865,23 @@ void pa_virtual_source_output_kill(pa_source_output *o) {
if (vsource->memblockq)
pa_memblockq_free(vsource->memblockq);
/* Destroy uplink sink if present */
if (vsource->uplink_sink) {
struct uplink_data *uplink;
uplink = vsource->uplink_sink->userdata;
pa_sink_unlink(vsource->uplink_sink);
pa_sink_unref(vsource->uplink_sink);
if (uplink) {
if (uplink->memblockq)
pa_memblockq_free(uplink->memblockq);
pa_xfree(uplink);
}
vsource->uplink_sink = NULL;
}
/* Virtual sources must set the module */
m = s->module;
pa_assert(m);
@ -640,7 +908,21 @@ bool pa_virtual_source_output_may_move_to(pa_source_output *o, pa_source *dest)
if (vsource->autoloaded)
return false;
return s != dest;
if (s == dest)
return false;
if (vsource->uplink_sink) {
pa_source *chain_master;
chain_master = dest;
while (chain_master->vsource && chain_master->vsource->output_from_master)
chain_master = chain_master->vsource->output_from_master->source;
if (chain_master == vsource->uplink_sink->monitor_source)
return false;
}
return true;
}
/* Called from main thread */
@ -649,6 +931,7 @@ void pa_virtual_source_output_moving(pa_source_output *o, pa_source *dest) {
pa_vsource *vsource;
uint32_t idx;
pa_source_output *output;
pa_sink_input *input;
pa_source_output_assert_ref(o);
pa_assert_ctl_context();
@ -662,8 +945,22 @@ void pa_virtual_source_output_moving(pa_source_output *o, pa_source *dest) {
pa_source_update_flags(s, PA_SOURCE_LATENCY|PA_SOURCE_DYNAMIC_LATENCY, dest->flags);
pa_proplist_sets(s->proplist, PA_PROP_DEVICE_MASTER_DEVICE, dest->name);
vsource->source_moving = true;
} else
if (vsource->uplink_sink) {
pa_sink_flags_t flags = 0;
if (dest->flags & PA_SOURCE_LATENCY)
flags |= PA_SINK_LATENCY;
if (dest->flags & PA_SOURCE_DYNAMIC_LATENCY)
flags |= PA_SINK_DYNAMIC_LATENCY;
pa_sink_set_asyncmsgq(vsource->uplink_sink, dest->asyncmsgq);
pa_sink_update_flags(vsource->uplink_sink, PA_SINK_LATENCY|PA_SINK_DYNAMIC_LATENCY, flags);
pa_proplist_sets(vsource->uplink_sink->proplist, PA_PROP_DEVICE_MASTER_DEVICE, dest->name);
}
} else {
pa_source_set_asyncmsgq(s, NULL);
if (vsource->uplink_sink)
pa_sink_set_asyncmsgq(vsource->uplink_sink, NULL);
}
if (dest && vsource->set_description)
vsource->set_description(o, dest);
@ -689,12 +986,33 @@ void pa_virtual_source_output_moving(pa_source_output *o, pa_source *dest) {
pa_proplist_setf(o->proplist, PA_PROP_MEDIA_NAME, "%s Stream from %s", vsource->desc_head, pa_proplist_gets(s->proplist, PA_PROP_DEVICE_DESCRIPTION));
}
if (vsource->uplink_sink && dest) {
const char *z;
pa_proplist *pl;
pl = pa_proplist_new();
z = pa_proplist_gets(dest->proplist, PA_PROP_DEVICE_DESCRIPTION);
pa_proplist_setf(pl, PA_PROP_DEVICE_DESCRIPTION, "Uplink sink %s on %s",
pa_proplist_gets(vsource->uplink_sink->proplist, "device.uplink_sink.name"), z ? z : dest->name);
pa_sink_update_proplist(vsource->uplink_sink, PA_UPDATE_REPLACE, pl);
pa_proplist_free(pl);
}
/* Propagate asyncmsq change to attached virtual sources */
PA_IDXSET_FOREACH(output, s->outputs, idx) {
if (output->destination_source && output->moving)
output->moving(output, s);
}
/* Propagate asyncmsq change to virtual sinks attached to the uplink sink */
if (vsource->uplink_sink) {
PA_IDXSET_FOREACH(input, vsource->uplink_sink->inputs, idx) {
if (input->origin_sink && input->moving)
input->moving(input, vsource->uplink_sink);
}
}
}
/* Called from main context */
@ -834,6 +1152,10 @@ int pa_virtual_source_activate(pa_vsource *vs) {
return -1;
}
/* Activate uplink sink */
if (vs->uplink_sink)
pa_sink_put(vs->uplink_sink);
/* Set source output latency at startup to max_latency if specified. */
if (vs->max_latency)
pa_source_output_set_requested_latency(vs->output_from_master, vs->max_latency);
@ -884,6 +1206,22 @@ void pa_virtual_source_destroy(pa_vsource *vs) {
vs->source = NULL;
}
/* Destroy uplink sink if present */
if (vs->uplink_sink) {
struct uplink_data *uplink;
uplink = vs->uplink_sink->userdata;
pa_sink_unlink(vs->uplink_sink);
pa_sink_unref(vs->uplink_sink);
if (uplink) {
if (uplink->memblockq)
pa_memblockq_free(uplink->memblockq);
pa_xfree(uplink);
}
}
/* We have to use pa_msgobject_unref() here because there may still be pending
* VSOURCE_MESSAGE_OUTPUT_ATTACHED messages. */
pa_msgobject_unref(PA_MSGOBJECT(vs));
@ -922,6 +1260,7 @@ pa_vsource* pa_virtual_source_vsource_new(pa_source *s) {
vsource->update_filter_parameters = NULL;
vsource->update_block_sizes = NULL;
vsource->free_filter_parameters = NULL;
vsource->uplink_sink = NULL;
return vsource;
}
@ -942,6 +1281,8 @@ pa_vsource *pa_virtual_source_create(pa_source *master, const char *source_type,
pa_vsource *vsource;
pa_source *s;
pa_source_output *o;
const char *uplink_sink;
pa_sink_new_data sink_data;
/* Make sure all necessary values are set. Only userdata and source description
* are allowed to be NULL. */
@ -1093,6 +1434,70 @@ pa_vsource *pa_virtual_source_create(pa_source *master, const char *source_type,
vsource->memblockq = pa_memblockq_new(tmp, 0, MEMBLOCKQ_MAXLENGTH, 0, source_output_ss, 1, 1, 0, &silence);
pa_memblock_unref(silence.memblock);
pa_xfree(tmp);
if (!vsource->memblockq) {
pa_log("Failed to create memblockq");
pa_virtual_source_destroy(vsource);
return NULL;
}
}
/* Set up uplink sink */
uplink_sink = pa_modargs_get_value(ma, "uplink_sink", NULL);
if (uplink_sink) {
const char *z;
char *tmp;
pa_memchunk silence;
pa_sink_flags_t flags;
struct uplink_data *uplink;
pa_sink_new_data_init(&sink_data);
sink_data.driver = m->name;
sink_data.module = m;
sink_data.name = pa_xstrdup(uplink_sink);
pa_sink_new_data_set_sample_spec(&sink_data, source_ss);
pa_sink_new_data_set_channel_map(&sink_data, source_map);
pa_proplist_sets(sink_data.proplist, PA_PROP_DEVICE_MASTER_DEVICE, master->name);
pa_proplist_sets(sink_data.proplist, PA_PROP_DEVICE_CLASS, "uplink sink");
pa_proplist_sets(sink_data.proplist, "device.uplink_sink.name", sink_data.name);
z = pa_proplist_gets(master->proplist, PA_PROP_DEVICE_DESCRIPTION);
pa_proplist_setf(sink_data.proplist, PA_PROP_DEVICE_DESCRIPTION, "Uplink Sink %s on %s", sink_data.name, z ? z : master->name);
flags = 0;
if (master->flags & PA_SOURCE_LATENCY)
flags = PA_SINK_LATENCY;
if (master->flags & PA_SOURCE_DYNAMIC_LATENCY)
flags |= PA_SINK_DYNAMIC_LATENCY;
vsource->uplink_sink = pa_sink_new(m->core, &sink_data, flags);
pa_sink_new_data_done(&sink_data);
if (!vsource->uplink_sink) {
pa_log("Failed to create uplink sink");
pa_virtual_source_destroy(vsource);
return NULL;
}
uplink = pa_xnew0(struct uplink_data, 1);
vsource->uplink_sink->userdata = uplink;
tmp = pa_sprintf_malloc("%s uplink sink memblockq", desc_prefix);
pa_silence_memchunk_get(&s->core->silence_cache, s->core->mempool, &silence, &s->sample_spec, 0);
uplink->memblockq = pa_memblockq_new(tmp, 0, MEMBLOCKQ_MAXLENGTH, 0, source_ss, 1, 1, 0, &silence);
pa_memblock_unref(silence.memblock);
pa_xfree(tmp);
if (!uplink->memblockq) {
pa_log("Failed to create sink memblockq");
pa_virtual_source_destroy(vsource);
return NULL;
}
vsource->uplink_sink->parent.process_msg = sink_process_msg;
vsource->uplink_sink->update_requested_latency = sink_update_requested_latency;
vsource->uplink_sink->set_state_in_main_thread = sink_set_state_in_main_thread;
vsource->uplink_sink->set_state_in_io_thread = sink_set_state_in_io_thread;
vsource->uplink_sink->uplink_of = vsource;
uplink->vsource = vsource;
pa_sink_set_asyncmsgq(vsource->uplink_sink, master->asyncmsgq);
}
return vsource;

View file

@ -69,3 +69,6 @@ pa_vsource* pa_virtual_source_vsource_new(pa_source *s);
/* Update filter parameters */
void pa_virtual_source_request_parameter_update(pa_vsource *vs, void *parameters);
/* Post data, mix in uplink sink */
void pa_virtual_source_post(pa_source *s, const pa_memchunk *chunk);

View file

@ -287,6 +287,7 @@ pa_sink* pa_sink_new(
s->inputs = pa_idxset_new(NULL, NULL);
s->n_corked = 0;
s->vsink = NULL;
s->uplink_of = NULL;
s->reference_volume = s->real_volume = data->volume;
pa_cvolume_reset(&s->soft_volume, s->sample_spec.channels);
@ -2552,7 +2553,7 @@ unsigned pa_sink_check_suspend(pa_sink *s, pa_sink_input *ignore_input, pa_sourc
}
if (s->monitor_source)
ret += pa_source_check_suspend(s->monitor_source, ignore_output);
ret += pa_source_check_suspend(s->monitor_source, ignore_input, ignore_output);
return ret;
}
@ -3258,6 +3259,9 @@ void pa_sink_invalidate_requested_latency(pa_sink *s, bool dynamic) {
if (PA_SINK_IS_LINKED(s->thread_info.state)) {
if (s->uplink_of && s->uplink_of->source)
pa_source_invalidate_requested_latency(s->uplink_of->source, dynamic);
if (s->update_requested_latency)
s->update_requested_latency(s);

View file

@ -173,6 +173,7 @@ struct pa_sink {
unsigned n_corked;
pa_source *monitor_source;
pa_vsink *vsink; /* non-NULL only for filter sinks */
pa_vsource *uplink_of; /* non-NULL only for uplink sinks */
pa_volume_t base_volume; /* shall be constant */
unsigned n_volume_steps; /* shall be constant */

View file

@ -2027,7 +2027,7 @@ unsigned pa_source_used_by(pa_source *s) {
}
/* Called from main thread */
unsigned pa_source_check_suspend(pa_source *s, pa_source_output *ignore) {
unsigned pa_source_check_suspend(pa_source *s, pa_sink_input *ignore_input, pa_source_output *ignore_output) {
unsigned ret;
pa_source_output *o;
uint32_t idx;
@ -2041,7 +2041,7 @@ unsigned pa_source_check_suspend(pa_source *s, pa_source_output *ignore) {
ret = 0;
PA_IDXSET_FOREACH(o, s->outputs, idx) {
if (o == ignore)
if (o == ignore_output)
continue;
/* We do not assert here. It is perfectly valid for a source output to
@ -2061,6 +2061,9 @@ unsigned pa_source_check_suspend(pa_source *s, pa_source_output *ignore) {
ret ++;
}
if (s->vsource && s->vsource->uplink_sink)
ret += pa_sink_check_suspend(s->vsource->uplink_sink, ignore_input, ignore_output);
return ret;
}
@ -2408,6 +2411,16 @@ pa_usec_t pa_source_get_requested_latency_within_thread(pa_source *s) {
(result == (pa_usec_t) -1 || result > o->thread_info.requested_source_latency))
result = o->thread_info.requested_source_latency;
if (s->vsource && s->vsource->uplink_sink) {
pa_usec_t uplink_sink_latency;
uplink_sink_latency = pa_sink_get_requested_latency_within_thread(s->vsource->uplink_sink);
if (uplink_sink_latency != (pa_usec_t) -1 &&
(result == (pa_usec_t) -1 || result > uplink_sink_latency))
result = uplink_sink_latency;
}
if (result != (pa_usec_t) -1)
result = PA_CLAMP(result, s->thread_info.min_latency, s->thread_info.max_latency);

View file

@ -89,6 +89,7 @@ typedef struct pa_vsource {
* In this case, overlap_frames contains the maximum
* number of history frames. */
pa_usec_t max_latency; /* Maximum latency allowed for the source, 0 if unused */
pa_sink *uplink_sink; /* Uplink sink if present, otherwise NULL */
/* Callback to process a chunk of data by the filter. Called from I/O thread
* context. May be NULL */
@ -499,7 +500,7 @@ unsigned pa_source_used_by(pa_source *s); /* Number of connected streams that ar
/* Returns how many streams are active that don't allow suspensions. If
* "ignore" is non-NULL, that stream is not included in the count. */
unsigned pa_source_check_suspend(pa_source *s, pa_source_output *ignore);
unsigned pa_source_check_suspend(pa_source *s, pa_sink_input *ignore_input, pa_source_output *ignore_output);
const char *pa_source_state_to_string(pa_source_state_t state);

View file

@ -32,6 +32,7 @@ typedef struct pa_sink_input pa_sink_input;
typedef struct pa_source pa_source;
typedef struct pa_source_volume_change pa_source_volume_change;
typedef struct pa_source_output pa_source_output;
typedef struct pa_vsource pa_vsource;
#endif