diff --git a/src/modules/module-combine-sink.c b/src/modules/module-combine-sink.c index 2ccd9eb13..df1e25996 100644 --- a/src/modules/module-combine-sink.c +++ b/src/modules/module-combine-sink.c @@ -124,10 +124,10 @@ struct output { pa_memblockq *memblockq; /* For communication of the stream latencies to the main thread */ - pa_usec_t total_latency; + int64_t total_latency; struct { pa_usec_t timestamp; - pa_usec_t sink_latency; + int64_t sink_latency; size_t output_memblockq_size; uint64_t receive_counter; } latency_snapshot; @@ -249,8 +249,8 @@ static uint32_t rate_controller( static void adjust_rates(struct userdata *u) { struct output *o; struct sink_snapshot rdata; - pa_usec_t avg_total_latency = 0; - pa_usec_t target_latency = 0; + int64_t avg_total_latency = 0; + int64_t target_latency = 0; pa_usec_t max_sink_latency = 0; pa_usec_t min_total_latency = (pa_usec_t)-1; uint32_t base_rate; @@ -280,7 +280,7 @@ static void adjust_rates(struct userdata *u) { return; PA_IDXSET_FOREACH(o, u->outputs, idx) { - pa_usec_t snapshot_latency; + int64_t snapshot_latency; int64_t time_difference; if (!o->sink_input || !PA_SINK_IS_OPENED(o->sink->state)) @@ -319,7 +319,7 @@ static void adjust_rates(struct userdata *u) { /* Debug output */ pa_log_debug("[%s] Snapshot sink latency = %0.2fms, total snapshot latency = %0.2fms", o->sink->name, (double) o->latency_snapshot.sink_latency / PA_USEC_PER_MSEC, (double) snapshot_latency / PA_USEC_PER_MSEC); - if (o->total_latency > 10*PA_USEC_PER_SEC) + if (o->total_latency > (int64_t)(10*PA_USEC_PER_SEC)) pa_log_warn("[%s] Total latency of output is very high (%0.2fms), most likely the audio timing in one of your drivers is broken.", o->sink->name, (double) o->total_latency / PA_USEC_PER_MSEC); n++; diff --git a/src/modules/module-suspend-on-idle.c b/src/modules/module-suspend-on-idle.c index 498b09449..b793d7e6e 100644 --- a/src/modules/module-suspend-on-idle.c +++ b/src/modules/module-suspend-on-idle.c @@ -71,7 +71,7 @@ static void timeout_cb(pa_mainloop_api*a, pa_time_event* e, const struct timeval pa_core_maybe_vacuum(d->userdata->core); } - if (d->source && pa_source_check_suspend(d->source, NULL) <= 0 && !(d->source->suspend_cause & PA_SUSPEND_IDLE)) { + if (d->source && pa_source_check_suspend(d->source, NULL, NULL) <= 0 && !(d->source->suspend_cause & PA_SUSPEND_IDLE)) { pa_log_info("Source %s idle for too long, suspending ...", d->source->name); pa_source_suspend(d->source, true, PA_SUSPEND_IDLE); pa_core_maybe_vacuum(d->userdata->core); @@ -109,6 +109,29 @@ static void resume(struct device_info *d) { } } +/* If the monitor source of a sink becomes idle and the sink is + * idle as well, we have to check if it is an uplink sink because + * the underlying virtual source might also have become idle. */ +static void restart_check_uplink(struct device_info *d, pa_source_output *ignore, struct userdata *u) { + struct device_info *d_master; + + pa_assert(d); + pa_assert(u); + + restart(d); + + if (!d->sink || (d->sink && !d->sink->uplink_of)) + return; + + if (!d->sink->uplink_of->source) + return; + + if ((d_master = pa_hashmap_get(u->device_infos, d->sink->uplink_of->source))) { + if (pa_source_check_suspend(d_master->source, NULL, ignore) <= 0) + restart(d_master); + } +} + static pa_hook_result_t sink_input_fixate_hook_cb(pa_core *c, pa_sink_input_new_data *data, struct userdata *u) { struct device_info *d; @@ -145,7 +168,7 @@ static pa_hook_result_t source_output_fixate_hook_cb(pa_core *c, pa_source_outpu if (d) { resume(d); if (d->source) { - if (pa_source_check_suspend(d->source, NULL) <= 0) + if (pa_source_check_suspend(d->source, NULL, NULL) <= 0) restart(d); } else { /* The source output is connected to a monitor source. */ @@ -170,6 +193,13 @@ static pa_hook_result_t sink_input_unlink_hook_cb(pa_core *c, pa_sink_input *s, struct device_info *d; if ((d = pa_hashmap_get(u->device_infos, s->sink))) restart(d); + + if (s->sink->uplink_of && s->sink->uplink_of->source) { + if (pa_source_check_suspend(s->sink->uplink_of->source, s, NULL) <= 0) { + if ((d = pa_hashmap_get(u->device_infos, s->sink->uplink_of->source))) + restart(d); + } + } } return PA_HOOK_OK; @@ -189,12 +219,12 @@ static pa_hook_result_t source_output_unlink_hook_cb(pa_core *c, pa_source_outpu if (pa_sink_check_suspend(s->source->monitor_of, NULL, s) <= 0) d = pa_hashmap_get(u->device_infos, s->source->monitor_of); } else { - if (pa_source_check_suspend(s->source, s) <= 0) + if (pa_source_check_suspend(s->source, NULL, s) <= 0) d = pa_hashmap_get(u->device_infos, s->source); } if (d) - restart(d); + restart_check_uplink(d, s, u); return PA_HOOK_OK; } @@ -206,10 +236,18 @@ static pa_hook_result_t sink_input_move_start_hook_cb(pa_core *c, pa_sink_input pa_sink_input_assert_ref(s); pa_assert(u); - if (pa_sink_check_suspend(s->sink, s, NULL) <= 0) + if (pa_sink_check_suspend(s->sink, s, NULL) <= 0) { if ((d = pa_hashmap_get(u->device_infos, s->sink))) restart(d); + if (s->sink->uplink_of && s->sink->uplink_of->source) { + if (pa_source_check_suspend(s->sink->uplink_of->source, s, NULL) <= 0) { + if ((d = pa_hashmap_get(u->device_infos, s->sink->uplink_of->source))) + restart(d); + } + } + } + return PA_HOOK_OK; } @@ -240,12 +278,12 @@ static pa_hook_result_t source_output_move_start_hook_cb(pa_core *c, pa_source_o if (pa_sink_check_suspend(s->source->monitor_of, NULL, s) <= 0) d = pa_hashmap_get(u->device_infos, s->source->monitor_of); } else { - if (pa_source_check_suspend(s->source, s) <= 0) + if (pa_source_check_suspend(s->source, NULL, s) <= 0) d = pa_hashmap_get(u->device_infos, s->source); } if (d) - restart(d); + restart_check_uplink(d, s, u); return PA_HOOK_OK; } @@ -278,9 +316,14 @@ static pa_hook_result_t sink_input_state_changed_hook_cb(pa_core *c, pa_sink_inp pa_sink_input_assert_ref(s); pa_assert(u); - if (s->state == PA_SINK_INPUT_RUNNING && s->sink) - if ((d = pa_hashmap_get(u->device_infos, s->sink))) - resume(d); + if (s->sink) { + if ((d = pa_hashmap_get(u->device_infos, s->sink))) { + if (s->state == PA_SINK_INPUT_RUNNING) + resume(d); + else if (s->state == PA_SINK_INPUT_CORKED && pa_sink_check_suspend(s->sink, NULL, NULL) <= 0) + restart(d); + } + } return PA_HOOK_OK; } @@ -289,9 +332,9 @@ static pa_hook_result_t source_output_state_changed_hook_cb(pa_core *c, pa_sourc pa_assert(c); pa_source_output_assert_ref(s); pa_assert(u); + struct device_info *d = NULL; if (s->state == PA_SOURCE_OUTPUT_RUNNING && s->source) { - struct device_info *d; if (s->source->monitor_of) d = pa_hashmap_get(u->device_infos, s->source->monitor_of); @@ -300,6 +343,15 @@ static pa_hook_result_t source_output_state_changed_hook_cb(pa_core *c, pa_sourc if (d) resume(d); + } else if (s->state == PA_SOURCE_OUTPUT_CORKED && s->source) { + + if (s->source->monitor_of && pa_sink_check_suspend(s->source->monitor_of, NULL, NULL) <= 0) + d = pa_hashmap_get(u->device_infos, s->source->monitor_of); + else if (pa_source_check_suspend(s->source, NULL, NULL) <= 0) + d = pa_hashmap_get(u->device_infos, s->source); + + if (d) + restart_check_uplink(d, NULL, u); } return PA_HOOK_OK; @@ -349,7 +401,7 @@ static pa_hook_result_t device_new_hook_cb(pa_core *c, pa_object *o, struct user pa_hashmap_put(u->device_infos, o, d); if ((d->sink && pa_sink_check_suspend(d->sink, NULL, NULL) <= 0) || - (d->source && pa_source_check_suspend(d->source, NULL) <= 0)) + (d->source && pa_source_check_suspend(d->source, NULL, NULL) <= 0)) restart(d); return PA_HOOK_OK; @@ -407,7 +459,7 @@ static pa_hook_result_t device_state_changed_hook_cb(pa_core *c, pa_object *o, s } else if (pa_source_isinstance(o)) { pa_source *s = PA_SOURCE(o); - if (pa_source_check_suspend(s, NULL) <= 0) + if (pa_source_check_suspend(s, NULL, NULL) <= 0) if (PA_SOURCE_IS_OPENED(s->state)) restart(d); } diff --git a/src/modules/module-virtual-source.c b/src/modules/module-virtual-source.c index 4e39a53ea..c7a39650a 100644 --- a/src/modules/module-virtual-source.c +++ b/src/modules/module-virtual-source.c @@ -67,14 +67,6 @@ struct userdata { pa_vsource *vsource; unsigned channels; - - /* optional fields for uplink sink */ - pa_sink *sink; - pa_usec_t block_usec; - pa_memblockq *sink_memblockq; - pa_rtpoll *rtpoll; - bool auto_desc; - }; static const char* const valid_modargs[] = { @@ -88,6 +80,7 @@ static const char* const valid_modargs[] = { "channel_map", "use_volume_sharing", "force_flat_volume", + "autoloaded", NULL }; @@ -100,141 +93,8 @@ static void filter_process_chunk(uint8_t *src, uint8_t *dst, unsigned in_count, nbytes = in_count * pa_frame_size(&u->vsource->source->sample_spec); - /* if uplink sink exists, pull data from there; simplify by using - same length as chunk provided by source */ - if (u->sink && (u->sink->thread_info.state == PA_SINK_RUNNING)) { - pa_memchunk tchunk; - pa_mix_info streams[2]; - pa_memchunk chunk; - void *src_copy; - int ch; - pa_source_output *o; - - /* Hmm, process any rewind request that might be queued up */ - pa_sink_process_rewind(u->sink, 0); - - /* get data from the sink */ - while (pa_memblockq_peek(u->sink_memblockq, &tchunk) < 0) { - pa_memchunk nchunk; - - /* make sure we get nbytes from the sink with render_full, - otherwise we cannot mix with the uplink */ - pa_sink_render_full(u->sink, nbytes, &nchunk); - pa_memblockq_push(u->sink_memblockq, &nchunk); - pa_memblock_unref(nchunk.memblock); - } - pa_assert(tchunk.length == nbytes); - - /* move the read pointer for sink memblockq */ - pa_memblockq_drop(u->sink_memblockq, tchunk.length); - - o = u->vsource->output_from_master; - - /* allocate source chunk */ - chunk.index = 0; - chunk.length = nbytes; - chunk.memblock = pa_memblock_new(o->source->core->mempool, nbytes); - pa_assert(chunk.memblock); - - /* Copy source data to chunk */ - src_copy = pa_memblock_acquire_chunk(&chunk); - memcpy(src_copy, src, nbytes); - - /* set-up mixing structure - volume was taken care of in sink and source already */ - streams[0].chunk = chunk; - for(ch=0;chsample_spec.channels;ch++) - streams[0].volume.values[ch] = PA_VOLUME_NORM; /* FIXME */ - streams[0].volume.channels = o->sample_spec.channels; - - streams[1].chunk = tchunk; - for(ch=0;chsample_spec.channels;ch++) - streams[1].volume.values[ch] = PA_VOLUME_NORM; /* FIXME */ - streams[1].volume.channels = o->sample_spec.channels; - - /* do mixing */ - pa_mix(streams, /* 2 streams to be mixed */ - 2, - dst, /* put result in dst */ - nbytes, /* same length as input */ - (const pa_sample_spec *)&o->sample_spec, /* same sample spec for input and output */ - NULL, /* no volume information */ - false); /* no mute */ - - pa_memblock_release(chunk.memblock); - pa_memblock_unref(tchunk.memblock); - pa_memblock_unref(chunk.memblock); - } else - /* Copy input to output */ - memcpy(dst, src, nbytes); -} - -/* When the source output moves, the asyncmsgq of the uplink sink has - * to change as well */ -static void source_output_moving_cb(pa_source_output *o, pa_source *dest) { - struct userdata *u; - - pa_assert(u = o->userdata); - - pa_virtual_source_output_moving(o, dest); - if (dest && u->sink) { - pa_sink_set_asyncmsgq(u->sink, dest->asyncmsgq); - } -} - -/* Called from I/O thread context */ -static int sink_process_msg_cb(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) { - - switch (code) { - - case PA_SINK_MESSAGE_GET_LATENCY: - - /* there's no real latency here */ - *((int64_t*) data) = 0; - - return 0; - } - - return pa_sink_process_msg(o, code, data, offset, chunk); -} - -/* Called from main context */ -static int sink_set_state_in_main_thread_cb(pa_sink *s, pa_sink_state_t state, pa_suspend_cause_t suspend_cause) { - struct userdata *u; - - pa_sink_assert_ref(s); - pa_assert_se(u = s->userdata); - - if (!PA_SINK_IS_LINKED(state)) { - return 0; - } - - if (state == PA_SINK_RUNNING) { - /* need to wake-up source if it was suspended */ - pa_log_debug("Resuming source %s, because its uplink sink became active.", u->vsource->source->name); - pa_source_suspend(u->vsource->source, false, PA_SUSPEND_ALL); - - /* FIXME: if there's no client connected, the source will suspend - and playback will be stuck. You'd want to prevent the source from - sleeping when the uplink sink is active; even if the audio is - discarded at least the app isn't stuck */ - - } else { - /* nothing to do, if the sink becomes idle or suspended let - module-suspend-idle handle the sources later */ - } - - return 0; -} - -static void sink_update_requested_latency_cb(pa_sink *s) { - struct userdata *u; - - pa_sink_assert_ref(s); - pa_assert_se(u = s->userdata); - - /* FIXME: there's no latency support */ - + /* Copy input to output */ + memcpy(dst, src, nbytes); } int pa__init(pa_module*m) { @@ -245,10 +105,6 @@ int pa__init(pa_module*m) { pa_source *master=NULL; bool use_volume_sharing = true; - /* optional for uplink_sink */ - pa_sink_new_data sink_data; - size_t nbytes; - pa_assert(m); if (!(ma = pa_modargs_new(m->argument, valid_modargs))) { @@ -280,75 +136,13 @@ int pa__init(pa_module*m) { m->userdata = u; u->channels = ss.channels; - /* The rtpoll created here is never run. It is only necessary to avoid crashes - * when module-virtual-source is used together with module-loopback or - * module-combine-sink. Both modules base their asyncmsq on the rtpoll provided - * by the sink. module-loopback and combine-sink only work because they - * call pa_asyncmsq_process_one() themselves. */ - u->rtpoll = pa_rtpoll_new(); - - /* Create virtual source */ + /* Create virtual source */ if (!(u->vsource = pa_virtual_source_create(master, "vsource", "Virtual Source", &ss, &map, &ss, &map, m, u, ma, use_volume_sharing, true))) goto fail; /* Set callback for virtual source */ u->vsource->process_chunk = filter_process_chunk; - u->vsource->output_from_master->moving = source_output_moving_cb; - - /* Create optional uplink sink */ - pa_sink_new_data_init(&sink_data); - sink_data.driver = __FILE__; - sink_data.module = m; - if ((sink_data.name = pa_xstrdup(pa_modargs_get_value(ma, "uplink_sink", NULL)))) { - pa_sink_new_data_set_sample_spec(&sink_data, &ss); - pa_sink_new_data_set_channel_map(&sink_data, &map); - pa_proplist_sets(sink_data.proplist, PA_PROP_DEVICE_MASTER_DEVICE, master->name); - pa_proplist_sets(sink_data.proplist, PA_PROP_DEVICE_CLASS, "uplink sink"); - pa_proplist_sets(sink_data.proplist, "device.uplink_sink.name", sink_data.name); - - if ((u->auto_desc = !pa_proplist_contains(sink_data.proplist, PA_PROP_DEVICE_DESCRIPTION))) { - const char *z; - - z = pa_proplist_gets(master->proplist, PA_PROP_DEVICE_DESCRIPTION); - pa_proplist_setf(sink_data.proplist, PA_PROP_DEVICE_DESCRIPTION, "Uplink Sink %s on %s", sink_data.name, z ? z : master->name); - } - - u->sink_memblockq = pa_memblockq_new("module-virtual-source sink_memblockq", 0, MEMBLOCKQ_MAXLENGTH, 0, &ss, 1, 1, 0, NULL); - if (!u->sink_memblockq) { - pa_sink_new_data_done(&sink_data); - pa_log("Failed to create sink memblockq."); - goto fail; - } - - u->sink = pa_sink_new(m->core, &sink_data, 0); /* FIXME, sink has no capabilities */ - pa_sink_new_data_done(&sink_data); - - if (!u->sink) { - pa_log("Failed to create sink."); - goto fail; - } - - u->sink->parent.process_msg = sink_process_msg_cb; - u->sink->update_requested_latency = sink_update_requested_latency_cb; - u->sink->set_state_in_main_thread = sink_set_state_in_main_thread_cb; - u->sink->userdata = u; - - pa_sink_set_asyncmsgq(u->sink, master->asyncmsgq); - pa_sink_set_rtpoll(u->sink, u->rtpoll); - - /* FIXME: no idea what I am doing here */ - u->block_usec = BLOCK_USEC; - nbytes = pa_usec_to_bytes(u->block_usec, &u->sink->sample_spec); - pa_sink_set_max_rewind(u->sink, 0); - pa_sink_set_max_request(u->sink, nbytes); - - pa_sink_put(u->sink); - } else { - pa_sink_new_data_done(&sink_data); - /* optional uplink sink not enabled */ - u->sink = NULL; - } if (pa_virtual_source_activate(u->vsource) < 0) goto fail; @@ -386,16 +180,5 @@ void pa__done(pa_module*m) { if (u->vsource) pa_virtual_source_destroy(u->vsource); - if (u->sink) { - pa_sink_unlink(u->sink); - pa_sink_unref(u->sink); - } - - if (u->sink_memblockq) - pa_memblockq_free(u->sink_memblockq); - - if (u->rtpoll) - pa_rtpoll_free(u->rtpoll); - pa_xfree(u); } diff --git a/src/modules/virtual-source-common.c b/src/modules/virtual-source-common.c index cbbbadd56..3f9cf61d5 100644 --- a/src/modules/virtual-source-common.c +++ b/src/modules/virtual-source-common.c @@ -22,8 +22,10 @@ #include #include +#include #include +#include PA_DEFINE_PRIVATE_CLASS(pa_vsource, pa_msgobject); #define PA_VSOURCE(o) (pa_vsource_cast(o)) @@ -42,6 +44,11 @@ enum { VSOURCE_MESSAGE_OUTPUT_ATTACHED }; +struct uplink_data { + pa_vsource *vsource; + pa_memblockq *memblockq; +}; + /* Helper functions */ static inline pa_source_output* get_output_from_source(pa_source *s) { @@ -121,6 +128,8 @@ static void set_latency_range_within_thread(pa_vsource *vsource) { } pa_source_set_latency_range_within_thread(s, min_latency, max_latency); + if (vsource->uplink_sink) + pa_sink_set_latency_range_within_thread(vsource->uplink_sink, min_latency, max_latency); } /* Called from I/O thread context */ @@ -136,6 +145,134 @@ static void set_memblockq_rewind(pa_vsource *vsource) { } } +/* Uplink sink callbacks */ + +/* Called from I/O thread context */ +static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) { + pa_sink *s; + struct uplink_data *uplink; + + s = PA_SINK(o); + uplink = s->userdata; + pa_assert(uplink); + + switch (code) { + + case PA_SINK_MESSAGE_GET_LATENCY: + + /* While the sink is not opened or if we have not received any data yet, + * simply return 0 as latency */ + if (!PA_SINK_IS_OPENED(s->thread_info.state)) { + *((int64_t*) data) = 0; + return 0; + } + + *((int64_t*) data) = pa_bytes_to_usec(pa_memblockq_get_length(uplink->memblockq), &s->sample_spec); + *((int64_t*) data) -= pa_source_get_latency_within_thread(uplink->vsource->source, true); + + return 0; + } + + return pa_sink_process_msg(o, code, data, offset, chunk); +} + +/* Called from main context */ +static int sink_set_state_in_main_thread(pa_sink *s, pa_sink_state_t state, pa_suspend_cause_t suspend_cause) { + pa_vsource *vsource; + struct uplink_data *uplink; + + pa_sink_assert_ref(s); + uplink = s->userdata; + pa_assert(uplink); + vsource = uplink->vsource; + pa_assert(vsource); + + if (!PA_SINK_IS_LINKED(state)) { + return 0; + } + + /* need to wake-up source if it was suspended */ + if (!PA_SINK_IS_OPENED(s->state) && PA_SINK_IS_OPENED(state) && !PA_SOURCE_IS_OPENED(vsource->source->state) && PA_SOURCE_IS_LINKED(vsource->source->state)) { + pa_log_debug("Resuming source %s, because its uplink sink became active.", vsource->source->name); + pa_source_suspend(vsource->source, false, PA_SUSPEND_IDLE); + } + + return 0; +} + +/* Called from the IO thread. */ +static int sink_set_state_in_io_thread(pa_sink *s, pa_sink_state_t new_state, pa_suspend_cause_t new_suspend_cause) { + struct uplink_data *uplink; + + pa_sink_assert_ref(s); + uplink = s->userdata; + pa_assert(uplink); + + if (!PA_SINK_IS_OPENED(new_state) && PA_SINK_IS_OPENED(s->thread_info.state)) { + pa_memblockq_flush_write(uplink->memblockq, true); + pa_sink_set_max_request_within_thread(s, 0); + pa_sink_set_max_rewind_within_thread(s, 0); + } + + return 0; +} + +/* Called from I/O thread context */ +static void sink_update_requested_latency(pa_sink *s) { + struct uplink_data *uplink; + pa_usec_t latency; + size_t rewind_size; + + pa_sink_assert_ref(s); + uplink = s->userdata; + pa_assert(uplink); + + if (!PA_SINK_IS_LINKED(s->thread_info.state)) + return; + + latency = pa_sink_get_requested_latency_within_thread(s); + if (latency == (pa_usec_t) -1) + latency = s->thread_info.max_latency; + rewind_size = pa_usec_to_bytes(latency, &s->sample_spec); + pa_memblockq_set_maxrewind(uplink->memblockq, rewind_size); + + pa_sink_set_max_request_within_thread(s, rewind_size); + pa_sink_set_max_rewind_within_thread(s, rewind_size); +} + +static void sink_process_rewind(pa_sink *s) { + struct uplink_data *uplink; + size_t rewind_nbytes, in_buffer; + + uplink = s->userdata; + pa_assert(uplink); + + rewind_nbytes = s->thread_info.rewind_nbytes; + + if (!PA_SINK_IS_OPENED(s->thread_info.state) || rewind_nbytes <= 0) + goto finish; + + pa_log_debug("Requested to rewind %lu bytes.", (unsigned long) rewind_nbytes); + + in_buffer = pa_memblockq_get_length(uplink->memblockq); + if (in_buffer == 0) { + pa_log_debug("Memblockq empty, cannot rewind"); + goto finish; + } + + if (rewind_nbytes > in_buffer) + rewind_nbytes = in_buffer; + + pa_memblockq_seek(uplink->memblockq, -rewind_nbytes, PA_SEEK_RELATIVE, true); + pa_sink_process_rewind(s, rewind_nbytes); + + pa_log_debug("Rewound %lu bytes.", (unsigned long) rewind_nbytes); + return; + +finish: + pa_sink_process_rewind(s, 0); +} + /* Source callbacks */ /* Called from I/O thread context */ @@ -230,15 +367,34 @@ int pa_virtual_source_process_msg(pa_msgobject *obj, int code, void *data, int64 /* Called from main context */ int pa_virtual_source_set_state_in_main_thread(pa_source *s, pa_source_state_t state, pa_suspend_cause_t suspend_cause) { pa_source_output *o; + pa_vsource *vsource; + bool suspend_cause_changed; pa_source_assert_ref(s); o = get_output_from_source(s); pa_assert(o); + vsource = s->vsource; + pa_assert(vsource); if (!PA_SOURCE_IS_LINKED(state) || !PA_SOURCE_OUTPUT_IS_LINKED(o->state)) return 0; + suspend_cause_changed = (suspend_cause != s->suspend_cause); + if (vsource->uplink_sink && PA_SINK_IS_LINKED(vsource->uplink_sink->state) && suspend_cause_changed) { + /* If the source is suspended for other reasons than being idle, the uplink sink + * should be suspended using the same reasons */ + if (suspend_cause != PA_SUSPEND_IDLE && state == PA_SOURCE_SUSPENDED) { + suspend_cause = suspend_cause & ~PA_SUSPEND_IDLE; + pa_sink_suspend(vsource->uplink_sink, true, suspend_cause); + } else if (PA_SOURCE_IS_OPENED(state) && s->suspend_cause != PA_SUSPEND_IDLE) { + /* If the source is resuming, the old suspend cause of the source should + * be removed from the sink unless the old suspend cause was idle. */ + suspend_cause = s->suspend_cause & ~PA_SUSPEND_IDLE; + pa_sink_suspend(vsource->uplink_sink, false, suspend_cause); + } + } + pa_source_output_cork(o, state == PA_SOURCE_SUSPENDED); return 0; } @@ -335,6 +491,86 @@ void pa_virtual_source_set_mute(pa_source *s) { pa_source_output_set_mute(o, s->muted, s->save_muted); } +/* Post data, mix in uplink sink */ +void pa_virtual_source_post(pa_source *s, const pa_memchunk *chunk) { + pa_vsource *vsource; + + vsource = s->vsource; + pa_assert(vsource); + + /* if uplink sink exists, pull data from there; simplify by using + same length as chunk provided by source */ + if (vsource->uplink_sink && PA_SINK_IS_OPENED(vsource->uplink_sink->thread_info.state)) { + pa_memchunk tchunk; + pa_mix_info streams[2]; + int ch; + uint8_t *dst; + pa_memchunk dst_chunk; + size_t nbytes; + struct uplink_data *uplink; + + uplink = vsource->uplink_sink->userdata; + pa_assert(uplink); + + /* Hmm, process any rewind request that might be queued up */ + if (PA_UNLIKELY(vsource->uplink_sink->thread_info.rewind_requested)) + sink_process_rewind(vsource->uplink_sink); + + nbytes = chunk->length; + + /* get data from the sink */ + while (pa_memblockq_get_length(uplink->memblockq) < nbytes) { + pa_memchunk nchunk; + size_t missing; + + missing = nbytes - pa_memblockq_get_length(uplink->memblockq); + pa_sink_render(vsource->uplink_sink, missing, &nchunk); + pa_memblockq_push(uplink->memblockq, &nchunk); + pa_memblock_unref(nchunk.memblock); + } + pa_memblockq_peek_fixed_size(uplink->memblockq, nbytes, &tchunk); + pa_assert(tchunk.length == nbytes); + + /* move the read pointer for sink memblockq */ + pa_memblockq_drop(uplink->memblockq, tchunk.length); + + /* Prepare output chunk */ + dst_chunk.index = 0; + dst_chunk.length = nbytes; + dst_chunk.memblock = pa_memblock_new(vsource->core->mempool, dst_chunk.length); + dst = pa_memblock_acquire_chunk(&dst_chunk); + + /* set-up mixing structure + volume was taken care of in sink and source already */ + streams[0].chunk = *chunk; + for(ch=0; ch < s->sample_spec.channels; ch++) + streams[0].volume.values[ch] = PA_VOLUME_NORM; + streams[0].volume.channels = s->sample_spec.channels; + + streams[1].chunk = tchunk; + for(ch=0; ch < s->sample_spec.channels;ch++) + streams[1].volume.values[ch] = PA_VOLUME_NORM; + streams[1].volume.channels = s->sample_spec.channels; + + /* do mixing */ + pa_mix(streams, /* 2 streams to be mixed */ + 2, + dst, /* put result in dst */ + nbytes, /* same length as input */ + (const pa_sample_spec *)&s->sample_spec, /* same sample spec for input and output */ + NULL, /* no volume information */ + false); /* no mute */ + + pa_memblock_release(dst_chunk.memblock); + + pa_source_post(s, &dst_chunk); + + pa_memblock_unref(tchunk.memblock); + pa_memblock_unref(dst_chunk.memblock); + } else + pa_source_post(s, chunk); +} + /* Source output callbacks */ /* Called from output thread context */ @@ -355,7 +591,7 @@ void pa_virtual_source_output_push(pa_source_output *o, const pa_memchunk *chunk return; if (!vsource->process_chunk || !vsource->memblockq) { - pa_source_post(s, chunk); + pa_virtual_source_post(s, chunk); return; } @@ -427,7 +663,7 @@ void pa_virtual_source_output_push(pa_source_output *o, const pa_memchunk *chunk pa_memblock_unref(schunk.memblock); /* Post data */ - pa_source_post(s, &tchunk); + pa_virtual_source_post(s, &tchunk); pa_memblock_unref(tchunk.memblock); length = pa_memblockq_get_length(vsource->memblockq); @@ -461,8 +697,16 @@ void pa_virtual_source_output_process_rewind(pa_source_output *o, size_t nbytes) * pass the rewind on to the source */ if (vsource->memblockq) pa_memblockq_seek(vsource->memblockq, - nbytes, PA_SEEK_RELATIVE, true); - else + else { pa_source_process_rewind(s, nbytes * out_fs / in_fs); + if (vsource->uplink_sink && PA_SINK_IS_OPENED(vsource->uplink_sink->thread_info.state)) { + struct uplink_data *uplink; + + uplink = vsource->uplink_sink->userdata; + pa_assert(uplink); + pa_memblockq_rewind(uplink->memblockq, nbytes * out_fs / in_fs); + } + } } /* Called from source I/O thread context. */ @@ -543,6 +787,8 @@ void pa_virtual_source_output_attach(pa_source_output *o) { master_fs = pa_frame_size(&o->source->sample_spec); pa_source_set_rtpoll(s, o->source->thread_info.rtpoll); + if (vsource->uplink_sink) + pa_sink_set_rtpoll(vsource->uplink_sink, o->source->thread_info.rtpoll); set_latency_range_within_thread(vsource); @@ -574,16 +820,21 @@ void pa_virtual_source_output_attach(pa_source_output *o) { /* Called from output thread context */ void pa_virtual_source_output_detach(pa_source_output *o) { pa_source *s; + pa_vsource *vsource; pa_source_output_assert_ref(o); pa_source_output_assert_io_context(o); s = o->destination_source; pa_assert(s); + vsource = s->vsource; + pa_assert(vsource); if (PA_SOURCE_IS_LINKED(s->thread_info.state)) pa_source_detach_within_thread(s); pa_source_set_rtpoll(s, NULL); + if (vsource->uplink_sink) + pa_sink_set_rtpoll(vsource->uplink_sink, NULL); } /* Called from main thread */ @@ -614,6 +865,23 @@ void pa_virtual_source_output_kill(pa_source_output *o) { if (vsource->memblockq) pa_memblockq_free(vsource->memblockq); + /* Destroy uplink sink if present */ + if (vsource->uplink_sink) { + struct uplink_data *uplink; + + uplink = vsource->uplink_sink->userdata; + pa_sink_unlink(vsource->uplink_sink); + pa_sink_unref(vsource->uplink_sink); + + if (uplink) { + if (uplink->memblockq) + pa_memblockq_free(uplink->memblockq); + + pa_xfree(uplink); + } + vsource->uplink_sink = NULL; + } + /* Virtual sources must set the module */ m = s->module; pa_assert(m); @@ -640,7 +908,21 @@ bool pa_virtual_source_output_may_move_to(pa_source_output *o, pa_source *dest) if (vsource->autoloaded) return false; - return s != dest; + if (s == dest) + return false; + + if (vsource->uplink_sink) { + pa_source *chain_master; + + chain_master = dest; + while (chain_master->vsource && chain_master->vsource->output_from_master) + chain_master = chain_master->vsource->output_from_master->source; + + if (chain_master == vsource->uplink_sink->monitor_source) + return false; + } + + return true; } /* Called from main thread */ @@ -649,6 +931,7 @@ void pa_virtual_source_output_moving(pa_source_output *o, pa_source *dest) { pa_vsource *vsource; uint32_t idx; pa_source_output *output; + pa_sink_input *input; pa_source_output_assert_ref(o); pa_assert_ctl_context(); @@ -662,8 +945,22 @@ void pa_virtual_source_output_moving(pa_source_output *o, pa_source *dest) { pa_source_update_flags(s, PA_SOURCE_LATENCY|PA_SOURCE_DYNAMIC_LATENCY, dest->flags); pa_proplist_sets(s->proplist, PA_PROP_DEVICE_MASTER_DEVICE, dest->name); vsource->source_moving = true; - } else + if (vsource->uplink_sink) { + pa_sink_flags_t flags = 0; + + if (dest->flags & PA_SOURCE_LATENCY) + flags |= PA_SINK_LATENCY; + if (dest->flags & PA_SOURCE_DYNAMIC_LATENCY) + flags |= PA_SINK_DYNAMIC_LATENCY; + pa_sink_set_asyncmsgq(vsource->uplink_sink, dest->asyncmsgq); + pa_sink_update_flags(vsource->uplink_sink, PA_SINK_LATENCY|PA_SINK_DYNAMIC_LATENCY, flags); + pa_proplist_sets(vsource->uplink_sink->proplist, PA_PROP_DEVICE_MASTER_DEVICE, dest->name); + } + } else { pa_source_set_asyncmsgq(s, NULL); + if (vsource->uplink_sink) + pa_sink_set_asyncmsgq(vsource->uplink_sink, NULL); + } if (dest && vsource->set_description) vsource->set_description(o, dest); @@ -689,12 +986,33 @@ void pa_virtual_source_output_moving(pa_source_output *o, pa_source *dest) { pa_proplist_setf(o->proplist, PA_PROP_MEDIA_NAME, "%s Stream from %s", vsource->desc_head, pa_proplist_gets(s->proplist, PA_PROP_DEVICE_DESCRIPTION)); } + if (vsource->uplink_sink && dest) { + const char *z; + pa_proplist *pl; + + pl = pa_proplist_new(); + z = pa_proplist_gets(dest->proplist, PA_PROP_DEVICE_DESCRIPTION); + pa_proplist_setf(pl, PA_PROP_DEVICE_DESCRIPTION, "Uplink sink %s on %s", + pa_proplist_gets(vsource->uplink_sink->proplist, "device.uplink_sink.name"), z ? z : dest->name); + + pa_sink_update_proplist(vsource->uplink_sink, PA_UPDATE_REPLACE, pl); + pa_proplist_free(pl); + } + /* Propagate asyncmsq change to attached virtual sources */ PA_IDXSET_FOREACH(output, s->outputs, idx) { if (output->destination_source && output->moving) output->moving(output, s); } + /* Propagate asyncmsq change to virtual sinks attached to the uplink sink */ + if (vsource->uplink_sink) { + PA_IDXSET_FOREACH(input, vsource->uplink_sink->inputs, idx) { + if (input->origin_sink && input->moving) + input->moving(input, vsource->uplink_sink); + } + } + } /* Called from main context */ @@ -834,6 +1152,10 @@ int pa_virtual_source_activate(pa_vsource *vs) { return -1; } + /* Activate uplink sink */ + if (vs->uplink_sink) + pa_sink_put(vs->uplink_sink); + /* Set source output latency at startup to max_latency if specified. */ if (vs->max_latency) pa_source_output_set_requested_latency(vs->output_from_master, vs->max_latency); @@ -884,6 +1206,22 @@ void pa_virtual_source_destroy(pa_vsource *vs) { vs->source = NULL; } + /* Destroy uplink sink if present */ + if (vs->uplink_sink) { + struct uplink_data *uplink; + + uplink = vs->uplink_sink->userdata; + pa_sink_unlink(vs->uplink_sink); + pa_sink_unref(vs->uplink_sink); + + if (uplink) { + if (uplink->memblockq) + pa_memblockq_free(uplink->memblockq); + + pa_xfree(uplink); + } + } + /* We have to use pa_msgobject_unref() here because there may still be pending * VSOURCE_MESSAGE_OUTPUT_ATTACHED messages. */ pa_msgobject_unref(PA_MSGOBJECT(vs)); @@ -922,6 +1260,7 @@ pa_vsource* pa_virtual_source_vsource_new(pa_source *s) { vsource->update_filter_parameters = NULL; vsource->update_block_sizes = NULL; vsource->free_filter_parameters = NULL; + vsource->uplink_sink = NULL; return vsource; } @@ -942,6 +1281,8 @@ pa_vsource *pa_virtual_source_create(pa_source *master, const char *source_type, pa_vsource *vsource; pa_source *s; pa_source_output *o; + const char *uplink_sink; + pa_sink_new_data sink_data; /* Make sure all necessary values are set. Only userdata and source description * are allowed to be NULL. */ @@ -1093,6 +1434,70 @@ pa_vsource *pa_virtual_source_create(pa_source *master, const char *source_type, vsource->memblockq = pa_memblockq_new(tmp, 0, MEMBLOCKQ_MAXLENGTH, 0, source_output_ss, 1, 1, 0, &silence); pa_memblock_unref(silence.memblock); pa_xfree(tmp); + if (!vsource->memblockq) { + pa_log("Failed to create memblockq"); + pa_virtual_source_destroy(vsource); + return NULL; + } + } + + /* Set up uplink sink */ + uplink_sink = pa_modargs_get_value(ma, "uplink_sink", NULL); + if (uplink_sink) { + const char *z; + char *tmp; + pa_memchunk silence; + pa_sink_flags_t flags; + struct uplink_data *uplink; + + pa_sink_new_data_init(&sink_data); + sink_data.driver = m->name; + sink_data.module = m; + sink_data.name = pa_xstrdup(uplink_sink); + pa_sink_new_data_set_sample_spec(&sink_data, source_ss); + pa_sink_new_data_set_channel_map(&sink_data, source_map); + pa_proplist_sets(sink_data.proplist, PA_PROP_DEVICE_MASTER_DEVICE, master->name); + pa_proplist_sets(sink_data.proplist, PA_PROP_DEVICE_CLASS, "uplink sink"); + pa_proplist_sets(sink_data.proplist, "device.uplink_sink.name", sink_data.name); + z = pa_proplist_gets(master->proplist, PA_PROP_DEVICE_DESCRIPTION); + pa_proplist_setf(sink_data.proplist, PA_PROP_DEVICE_DESCRIPTION, "Uplink Sink %s on %s", sink_data.name, z ? z : master->name); + + flags = 0; + if (master->flags & PA_SOURCE_LATENCY) + flags = PA_SINK_LATENCY; + if (master->flags & PA_SOURCE_DYNAMIC_LATENCY) + flags |= PA_SINK_DYNAMIC_LATENCY; + vsource->uplink_sink = pa_sink_new(m->core, &sink_data, flags); + pa_sink_new_data_done(&sink_data); + + if (!vsource->uplink_sink) { + pa_log("Failed to create uplink sink"); + pa_virtual_source_destroy(vsource); + return NULL; + } + + uplink = pa_xnew0(struct uplink_data, 1); + vsource->uplink_sink->userdata = uplink; + + tmp = pa_sprintf_malloc("%s uplink sink memblockq", desc_prefix); + pa_silence_memchunk_get(&s->core->silence_cache, s->core->mempool, &silence, &s->sample_spec, 0); + uplink->memblockq = pa_memblockq_new(tmp, 0, MEMBLOCKQ_MAXLENGTH, 0, source_ss, 1, 1, 0, &silence); + pa_memblock_unref(silence.memblock); + pa_xfree(tmp); + if (!uplink->memblockq) { + pa_log("Failed to create sink memblockq"); + pa_virtual_source_destroy(vsource); + return NULL; + } + + vsource->uplink_sink->parent.process_msg = sink_process_msg; + vsource->uplink_sink->update_requested_latency = sink_update_requested_latency; + vsource->uplink_sink->set_state_in_main_thread = sink_set_state_in_main_thread; + vsource->uplink_sink->set_state_in_io_thread = sink_set_state_in_io_thread; + vsource->uplink_sink->uplink_of = vsource; + uplink->vsource = vsource; + + pa_sink_set_asyncmsgq(vsource->uplink_sink, master->asyncmsgq); } return vsource; diff --git a/src/modules/virtual-source-common.h b/src/modules/virtual-source-common.h index ff07a2ee4..f48a60018 100644 --- a/src/modules/virtual-source-common.h +++ b/src/modules/virtual-source-common.h @@ -69,3 +69,6 @@ pa_vsource* pa_virtual_source_vsource_new(pa_source *s); /* Update filter parameters */ void pa_virtual_source_request_parameter_update(pa_vsource *vs, void *parameters); + +/* Post data, mix in uplink sink */ +void pa_virtual_source_post(pa_source *s, const pa_memchunk *chunk); diff --git a/src/pulsecore/sink.c b/src/pulsecore/sink.c index 21d7dae68..1ed24b786 100644 --- a/src/pulsecore/sink.c +++ b/src/pulsecore/sink.c @@ -287,6 +287,7 @@ pa_sink* pa_sink_new( s->inputs = pa_idxset_new(NULL, NULL); s->n_corked = 0; s->vsink = NULL; + s->uplink_of = NULL; s->reference_volume = s->real_volume = data->volume; pa_cvolume_reset(&s->soft_volume, s->sample_spec.channels); @@ -2552,7 +2553,7 @@ unsigned pa_sink_check_suspend(pa_sink *s, pa_sink_input *ignore_input, pa_sourc } if (s->monitor_source) - ret += pa_source_check_suspend(s->monitor_source, ignore_output); + ret += pa_source_check_suspend(s->monitor_source, ignore_input, ignore_output); return ret; } @@ -3258,6 +3259,9 @@ void pa_sink_invalidate_requested_latency(pa_sink *s, bool dynamic) { if (PA_SINK_IS_LINKED(s->thread_info.state)) { + if (s->uplink_of && s->uplink_of->source) + pa_source_invalidate_requested_latency(s->uplink_of->source, dynamic); + if (s->update_requested_latency) s->update_requested_latency(s); diff --git a/src/pulsecore/sink.h b/src/pulsecore/sink.h index 1cc44fefc..9e9c9ee9c 100644 --- a/src/pulsecore/sink.h +++ b/src/pulsecore/sink.h @@ -173,6 +173,7 @@ struct pa_sink { unsigned n_corked; pa_source *monitor_source; pa_vsink *vsink; /* non-NULL only for filter sinks */ + pa_vsource *uplink_of; /* non-NULL only for uplink sinks */ pa_volume_t base_volume; /* shall be constant */ unsigned n_volume_steps; /* shall be constant */ diff --git a/src/pulsecore/source.c b/src/pulsecore/source.c index cbffc9b93..969142984 100644 --- a/src/pulsecore/source.c +++ b/src/pulsecore/source.c @@ -2027,7 +2027,7 @@ unsigned pa_source_used_by(pa_source *s) { } /* Called from main thread */ -unsigned pa_source_check_suspend(pa_source *s, pa_source_output *ignore) { +unsigned pa_source_check_suspend(pa_source *s, pa_sink_input *ignore_input, pa_source_output *ignore_output) { unsigned ret; pa_source_output *o; uint32_t idx; @@ -2041,7 +2041,7 @@ unsigned pa_source_check_suspend(pa_source *s, pa_source_output *ignore) { ret = 0; PA_IDXSET_FOREACH(o, s->outputs, idx) { - if (o == ignore) + if (o == ignore_output) continue; /* We do not assert here. It is perfectly valid for a source output to @@ -2061,6 +2061,9 @@ unsigned pa_source_check_suspend(pa_source *s, pa_source_output *ignore) { ret ++; } + if (s->vsource && s->vsource->uplink_sink) + ret += pa_sink_check_suspend(s->vsource->uplink_sink, ignore_input, ignore_output); + return ret; } @@ -2408,6 +2411,16 @@ pa_usec_t pa_source_get_requested_latency_within_thread(pa_source *s) { (result == (pa_usec_t) -1 || result > o->thread_info.requested_source_latency)) result = o->thread_info.requested_source_latency; + if (s->vsource && s->vsource->uplink_sink) { + pa_usec_t uplink_sink_latency; + + uplink_sink_latency = pa_sink_get_requested_latency_within_thread(s->vsource->uplink_sink); + + if (uplink_sink_latency != (pa_usec_t) -1 && + (result == (pa_usec_t) -1 || result > uplink_sink_latency)) + result = uplink_sink_latency; + } + if (result != (pa_usec_t) -1) result = PA_CLAMP(result, s->thread_info.min_latency, s->thread_info.max_latency); diff --git a/src/pulsecore/source.h b/src/pulsecore/source.h index 41dab0046..52464ee16 100644 --- a/src/pulsecore/source.h +++ b/src/pulsecore/source.h @@ -89,6 +89,7 @@ typedef struct pa_vsource { * In this case, overlap_frames contains the maximum * number of history frames. */ pa_usec_t max_latency; /* Maximum latency allowed for the source, 0 if unused */ + pa_sink *uplink_sink; /* Uplink sink if present, otherwise NULL */ /* Callback to process a chunk of data by the filter. Called from I/O thread * context. May be NULL */ @@ -499,7 +500,7 @@ unsigned pa_source_used_by(pa_source *s); /* Number of connected streams that ar /* Returns how many streams are active that don't allow suspensions. If * "ignore" is non-NULL, that stream is not included in the count. */ -unsigned pa_source_check_suspend(pa_source *s, pa_source_output *ignore); +unsigned pa_source_check_suspend(pa_source *s, pa_sink_input *ignore_input, pa_source_output *ignore_output); const char *pa_source_state_to_string(pa_source_state_t state); diff --git a/src/pulsecore/typedefs.h b/src/pulsecore/typedefs.h index 3652f8f76..a80117c8d 100644 --- a/src/pulsecore/typedefs.h +++ b/src/pulsecore/typedefs.h @@ -32,6 +32,7 @@ typedef struct pa_sink_input pa_sink_input; typedef struct pa_source pa_source; typedef struct pa_source_volume_change pa_source_volume_change; typedef struct pa_source_output pa_source_output; +typedef struct pa_vsource pa_vsource; #endif