Big pile of dependant changes:

* Change pa_memblockq to carry silence memchunk instead of memblock and adapt all users
* Add new call pa_sink_input_get_silence() to get the suitable silence block for a sink input
* Implement monitoring sources properly by adding a delay queue to even out rewinds
* Remove pa_{sink|source}_ping() becaused unnecessary these days and not used
* Fix naming of various rewind related functions. Downstream is now _request_rewind(), upstream is _process_rewind()
* Fix volume adjustments for a single stream in pa_sink_render()
* Properly handle prebuf-style buffer underruns in pa_sink_input
* Don't allow rewinding to more than the last underrun
* Rework default buffering metrics selection for native protocol
* New functions pa_memblockq_prebuf_active(), pa_memblockq_silence()
* add option "mixer_reset=" to module-alsa-sink
* Other cleanups


git-svn-id: file:///home/lennart/svn/public/pulseaudio/branches/glitch-free@2283 fefdeb5f-60dc-0310-8127-8f9354f1896f
This commit is contained in:
Lennart Poettering 2008-04-20 20:16:55 +00:00
parent 7556ef5bfc
commit 62e7bc17c4
20 changed files with 950 additions and 531 deletions

View file

@ -70,11 +70,30 @@ PA_MODULE_USAGE(
"mmap=<enable memory mapping?> " "mmap=<enable memory mapping?> "
"tsched=<enable system timer based scheduling mode?> " "tsched=<enable system timer based scheduling mode?> "
"tsched_buffer_size=<buffer size when using timer based scheduling> " "tsched_buffer_size=<buffer size when using timer based scheduling> "
"tsched_buffer_watermark=<lower fill watermark>"); "tsched_buffer_watermark=<lower fill watermark> "
"mixer_reset=<reset hw volume and mute settings to sane defaults when falling back to software?>");
static const char* const valid_modargs[] = {
"sink_name",
"device",
"device_id",
"format",
"rate",
"channels",
"channel_map",
"fragments",
"fragment_size",
"mmap",
"tsched",
"tsched_buffer_size",
"tsched_buffer_watermark",
"mixer_reset",
NULL
};
#define DEFAULT_DEVICE "default" #define DEFAULT_DEVICE "default"
#define DEFAULT_TSCHED_BUFFER_USEC (2*PA_USEC_PER_SEC) #define DEFAULT_TSCHED_BUFFER_USEC (10*PA_USEC_PER_SEC) /* 10s */
#define DEFAULT_TSCHED_WATERMARK_USEC (20*PA_USEC_PER_MSEC) #define DEFAULT_TSCHED_WATERMARK_USEC (200*PA_USEC_PER_MSEC) /* 20ms */
struct userdata { struct userdata {
pa_core *core; pa_core *core;
@ -112,23 +131,7 @@ struct userdata {
int64_t frame_index; int64_t frame_index;
snd_pcm_sframes_t hwbuf_unused_frames; snd_pcm_sframes_t hwbuf_unused_frames;
}; snd_pcm_sframes_t avail_min_frames;
static const char* const valid_modargs[] = {
"sink_name",
"device",
"device_id",
"format",
"rate",
"channels",
"channel_map",
"fragments",
"fragment_size",
"mmap",
"tsched",
"tsched_buffer_size",
"tsched_buffer_watermark",
NULL
}; };
static int mmap_write(struct userdata *u) { static int mmap_write(struct userdata *u) {
@ -144,6 +147,7 @@ static int mmap_write(struct userdata *u) {
int err; int err;
const snd_pcm_channel_area_t *areas; const snd_pcm_channel_area_t *areas;
snd_pcm_uframes_t offset, frames; snd_pcm_uframes_t offset, frames;
size_t left_to_play;
snd_pcm_hwsync(u->pcm_handle); snd_pcm_hwsync(u->pcm_handle);
@ -177,8 +181,26 @@ static int mmap_write(struct userdata *u) {
if (PA_UNLIKELY(n <= u->hwbuf_unused_frames)) if (PA_UNLIKELY(n <= u->hwbuf_unused_frames))
return work_done; return work_done;
if (n*u->frame_size < u->hwbuf_size)
left_to_play = u->hwbuf_size - (n*u->frame_size);
else
left_to_play = 0;
pa_log_debug("%0.2f ms left to play", (double) pa_bytes_to_usec(left_to_play, &u->sink->sample_spec) / PA_USEC_PER_MSEC);
if (left_to_play <= 0 && !u->first) {
u->tsched_watermark *=2;
if (u->tsched_watermark >= u->hwbuf_size)
u->tsched_watermark = u->hwbuf_size-u->frame_size;
pa_log_notice("Underrun! Increasing wakeup watermark to %0.2f", (double) pa_bytes_to_usec(u->tsched_watermark, &u->sink->sample_spec) / PA_USEC_PER_MSEC);
}
frames = n = n - u->hwbuf_unused_frames; frames = n = n - u->hwbuf_unused_frames;
pa_log_debug("%llu frames to write", (unsigned long long) frames);
if (PA_UNLIKELY((err = snd_pcm_mmap_begin(u->pcm_handle, &areas, &offset, &frames)) < 0)) { if (PA_UNLIKELY((err = snd_pcm_mmap_begin(u->pcm_handle, &areas, &offset, &frames)) < 0)) {
pa_log_debug("snd_pcm_mmap_begin: %s", snd_strerror(err)); pa_log_debug("snd_pcm_mmap_begin: %s", snd_strerror(err));
@ -357,14 +379,21 @@ static void update_smoother(struct userdata *u) {
snd_pcm_hwsync(u->pcm_handle); snd_pcm_hwsync(u->pcm_handle);
snd_pcm_avail_update(u->pcm_handle); snd_pcm_avail_update(u->pcm_handle);
if (PA_UNLIKELY((err = snd_pcm_status(u->pcm_handle, status)) < 0)) { /* if (PA_UNLIKELY((err = snd_pcm_status(u->pcm_handle, status)) < 0)) { */
/* pa_log("Failed to query DSP status data: %s", snd_strerror(err)); */
/* return; */
/* } */
/* delay = snd_pcm_status_get_delay(status); */
if (PA_UNLIKELY((err = snd_pcm_delay(u->pcm_handle, &delay)) < 0)) {
pa_log("Failed to query DSP status data: %s", snd_strerror(err)); pa_log("Failed to query DSP status data: %s", snd_strerror(err));
return; return;
} }
delay = snd_pcm_status_get_delay(status);
frames = u->frame_index - delay; frames = u->frame_index - delay;
pa_log_debug("frame_index = %llu, delay = %llu, p = %llu", (unsigned long long) u->frame_index, (unsigned long long) delay, (unsigned long long) frames); /* pa_log_debug("frame_index = %llu, delay = %llu, p = %llu", (unsigned long long) u->frame_index, (unsigned long long) delay, (unsigned long long) frames); */
/* snd_pcm_status_get_tstamp(status, &timestamp); */ /* snd_pcm_status_get_tstamp(status, &timestamp); */
/* pa_rtclock_from_wallclock(&timestamp); */ /* pa_rtclock_from_wallclock(&timestamp); */
@ -450,7 +479,7 @@ static pa_usec_t hw_sleep_time(struct userdata *u) {
if (usec <= 0) if (usec <= 0)
usec = pa_bytes_to_usec(u->hwbuf_size, &u->sink->sample_spec); usec = pa_bytes_to_usec(u->hwbuf_size, &u->sink->sample_spec);
pa_log_debug("hw buffer time: %u ms", (unsigned) (usec / PA_USEC_PER_MSEC)); /* pa_log_debug("hw buffer time: %u ms", (unsigned) (usec / PA_USEC_PER_MSEC)); */
wm = pa_bytes_to_usec(u->tsched_watermark, &u->sink->sample_spec); wm = pa_bytes_to_usec(u->tsched_watermark, &u->sink->sample_spec);
@ -459,25 +488,27 @@ static pa_usec_t hw_sleep_time(struct userdata *u) {
else else
usec /= 2; usec /= 2;
pa_log_debug("after watermark: %u ms", (unsigned) (usec / PA_USEC_PER_MSEC)); /* pa_log_debug("after watermark: %u ms", (unsigned) (usec / PA_USEC_PER_MSEC)); */
return usec; return usec;
} }
static void update_hwbuf_unused_frames(struct userdata *u) { static int update_sw_params(struct userdata *u) {
pa_usec_t usec; int err;
size_t b; pa_usec_t latency;
pa_assert(u); pa_assert(u);
if ((usec = pa_sink_get_requested_latency_within_thread(u->sink)) <= 0) { /* Use the full buffer if noone asked us for anything specific */
/* Use the full buffer if noone asked us for anything
* specific */
u->hwbuf_unused_frames = 0; u->hwbuf_unused_frames = 0;
return;
}
b = pa_usec_to_bytes(usec, &u->sink->sample_spec); if (u->use_tsched)
if ((latency = pa_sink_get_requested_latency_within_thread(u->sink)) > 0) {
size_t b;
pa_log("latency set to %llu", (unsigned long long) latency);
b = pa_usec_to_bytes(latency, &u->sink->sample_spec);
/* We need at least one sample in our buffer */ /* We need at least one sample in our buffer */
@ -487,38 +518,28 @@ static void update_hwbuf_unused_frames(struct userdata *u) {
u->hwbuf_unused_frames = u->hwbuf_unused_frames =
PA_LIKELY(b < u->hwbuf_size) ? PA_LIKELY(b < u->hwbuf_size) ?
((u->hwbuf_size - b) / u->frame_size) : 0; ((u->hwbuf_size - b) / u->frame_size) : 0;
} }
static int update_sw_params(struct userdata *u) { pa_log("hwbuf_unused_frames=%lu", (unsigned long) u->hwbuf_unused_frames);
snd_pcm_uframes_t avail_min;
int err;
pa_assert(u); /* We need at last one frame in the used part of the buffer */
u->avail_min_frames = u->hwbuf_unused_frames + 1;
if (u->use_tsched) { if (u->use_tsched) {
pa_usec_t usec; pa_usec_t usec;
usec = hw_sleep_time(u); usec = hw_sleep_time(u);
avail_min = pa_usec_to_bytes(usec, &u->sink->sample_spec) / u->frame_size; u->avail_min_frames += (pa_usec_to_bytes(usec, &u->sink->sample_spec) / u->frame_size);
}
if (avail_min <= 0) pa_log("setting avail_min=%lu", (unsigned long) u->avail_min_frames);
avail_min = 1;
} else if ((err = pa_alsa_set_sw_params(u->pcm_handle, u->avail_min_frames)) < 0) {
avail_min = 1;
pa_log("setting avail_min=%lu", (unsigned long) avail_min);
if ((err = pa_alsa_set_sw_params(u->pcm_handle, avail_min)) < 0) {
pa_log("Failed to set software parameters: %s", snd_strerror(err)); pa_log("Failed to set software parameters: %s", snd_strerror(err));
return err; return err;
} }
update_hwbuf_unused_frames(u);
pa_log("hwbuf_unused_frames=%lu", (unsigned long) u->hwbuf_unused_frames);
return 0; return 0;
} }
@ -808,23 +829,23 @@ static void thread_func(void *userdata) {
pa_thread_mq_install(&u->thread_mq); pa_thread_mq_install(&u->thread_mq);
pa_rtpoll_install(u->rtpoll); pa_rtpoll_install(u->rtpoll);
/* update_hwbuf_unused_frames(u); */
for (;;) { for (;;) {
int ret; int ret;
pa_log_debug("loop"); /* pa_log_debug("loop"); */
/* Render some data and write it to the dsp */ /* Render some data and write it to the dsp */
if (PA_SINK_OPENED(u->sink->thread_info.state)) { if (PA_SINK_OPENED(u->sink->thread_info.state)) {
int work_done = 0; int work_done = 0;
if (u->sink->thread_info.rewind_nbytes > 0 && u->use_tsched) { if (u->sink->thread_info.rewind_nbytes > 0) {
snd_pcm_sframes_t frames, limit, unused; snd_pcm_sframes_t unused;
size_t rewind_nbytes, unused_nbytes, limit_nbytes;
pa_log_debug("Requested to rewind %lu bytes.", (unsigned long) u->sink->thread_info.rewind_nbytes); rewind_nbytes = u->sink->thread_info.rewind_nbytes;
u->sink->thread_info.rewind_nbytes = 0;
frames = u->sink->thread_info.rewind_nbytes / u->frame_size; pa_log_debug("Requested to rewind %lu bytes.", (unsigned long) rewind_nbytes);
snd_pcm_hwsync(u->pcm_handle); snd_pcm_hwsync(u->pcm_handle);
if ((unused = snd_pcm_avail_update(u->pcm_handle)) < 0) { if ((unused = snd_pcm_avail_update(u->pcm_handle)) < 0) {
@ -832,32 +853,54 @@ static void thread_func(void *userdata) {
goto fail; goto fail;
} }
limit = (u->hwbuf_size / u->frame_size) - unused; unused_nbytes = u->tsched_watermark + (size_t) unused * u->frame_size;
if (frames > limit) if (u->hwbuf_size > unused_nbytes)
frames = limit; limit_nbytes = u->hwbuf_size - unused_nbytes;
else
limit_nbytes = 0;
frames = 0; if (rewind_nbytes > limit_nbytes)
rewind_nbytes = limit_nbytes;
if (frames > 0) { if (rewind_nbytes > 0) {
snd_pcm_sframes_t in_frames, out_frames;
pa_log_debug("Limited to %lu bytes.", (unsigned long) frames * u->frame_size); pa_log_debug("Limited to %lu bytes.", (unsigned long) rewind_nbytes);
if ((frames = snd_pcm_rewind(u->pcm_handle, frames)) < 0) { in_frames = (snd_pcm_sframes_t) rewind_nbytes / u->frame_size;
pa_log("snd_pcm_rewind() failed: %s", snd_strerror(frames)); pa_log_debug("before: %lu", (unsigned long) in_frames);
if ((out_frames = snd_pcm_rewind(u->pcm_handle, in_frames)) < 0) {
pa_log("snd_pcm_rewind() failed: %s", snd_strerror(out_frames));
goto fail;
}
pa_log_debug("after: %lu", (unsigned long) out_frames);
if (out_frames > in_frames) {
snd_pcm_sframes_t sfix;
pa_log("FUCK, device rewound %lu frames more than we wanted. What a mess!", (unsigned long) (out_frames-in_frames));
if ((sfix = snd_pcm_forward(u->pcm_handle, out_frames-in_frames)) < 0) {
pa_log("snd_pcm_forward() failed: %s", snd_strerror(sfix));
goto fail; goto fail;
} }
if ((u->sink->thread_info.rewind_nbytes = frames * u->frame_size) <= 0) pa_log("Could fix by %lu", (unsigned long) sfix);
out_frames -= sfix;
}
rewind_nbytes = out_frames * u->frame_size;
if (rewind_nbytes <= 0)
pa_log_info("Tried rewind, but was apparently not possible."); pa_log_info("Tried rewind, but was apparently not possible.");
else { else {
u->frame_index -= frames; u->frame_index -= out_frames;
pa_log_debug("Rewound %lu bytes.", (unsigned long) u->sink->thread_info.rewind_nbytes); pa_log_debug("Rewound %lu bytes.", (unsigned long) rewind_nbytes);
pa_sink_process_rewind(u->sink); pa_sink_process_rewind(u->sink, rewind_nbytes);
} }
} else { } else
pa_log_debug("Mhmm, actually there is nothing to rewind."); pa_log_debug("Mhmm, actually there is nothing to rewind.");
}
} }
if (u->use_mmap) { if (u->use_mmap) {
@ -868,7 +911,7 @@ static void thread_func(void *userdata) {
goto fail; goto fail;
} }
pa_log_debug("work_done = %i", work_done); /* pa_log_debug("work_done = %i", work_done); */
if (work_done) { if (work_done) {
@ -891,13 +934,13 @@ static void thread_func(void *userdata) {
usec = hw_sleep_time(u); usec = hw_sleep_time(u);
pa_log_debug("Waking up in %0.2fms (sound card clock).", (double) usec / PA_USEC_PER_MSEC); /* pa_log_debug("Waking up in %0.2fms (sound card clock).", (double) usec / PA_USEC_PER_MSEC); */
/* Convert from the sound card time domain to the /* Convert from the sound card time domain to the
* system time domain */ * system time domain */
cusec = pa_smoother_translate(u->smoother, pa_rtclock_usec(), usec); cusec = pa_smoother_translate(u->smoother, pa_rtclock_usec(), usec);
pa_log_debug("Waking up in %0.2fms (system clock).", (double) cusec / PA_USEC_PER_MSEC); /* pa_log_debug("Waking up in %0.2fms (system clock).", (double) cusec / PA_USEC_PER_MSEC); */
/* We don't trust the conversion, so we wake up whatever comes first */ /* We don't trust the conversion, so we wake up whatever comes first */
pa_rtpoll_set_timer_relative(u->rtpoll, PA_MIN(usec, cusec)); pa_rtpoll_set_timer_relative(u->rtpoll, PA_MIN(usec, cusec));
@ -974,7 +1017,8 @@ static void thread_func(void *userdata) {
u->first = TRUE; u->first = TRUE;
} }
pa_log_debug("alsa revents = %i", revents); if (revents)
pa_log_info("Wakeup from ALSA! (%i)", revents);
} }
} }
@ -1003,7 +1047,7 @@ int pa__init(pa_module*m) {
const char *name; const char *name;
char *name_buf = NULL; char *name_buf = NULL;
pa_bool_t namereg_fail; pa_bool_t namereg_fail;
pa_bool_t use_mmap = TRUE, b, use_tsched = TRUE, d; pa_bool_t use_mmap = TRUE, b, use_tsched = TRUE, d, mixer_reset = TRUE;
pa_usec_t usec; pa_usec_t usec;
pa_sink_new_data data; pa_sink_new_data data;
static const char * const class_table[SND_PCM_CLASS_LAST+1] = { static const char * const class_table[SND_PCM_CLASS_LAST+1] = {
@ -1066,6 +1110,11 @@ int pa__init(pa_module*m) {
use_tsched = FALSE; use_tsched = FALSE;
} }
if (pa_modargs_get_value_boolean(ma, "mixer_reset", &mixer_reset) < 0) {
pa_log("Failed to parse mixer_reset argument.");
goto fail;
}
u = pa_xnew0(struct userdata, 1); u = pa_xnew0(struct userdata, 1);
u->core = m->core; u->core = m->core;
u->module = m; u->module = m;
@ -1202,7 +1251,7 @@ int pa__init(pa_module*m) {
if (class_table[snd_pcm_info_get_class(pcm_info)]) if (class_table[snd_pcm_info_get_class(pcm_info)])
pa_proplist_sets(data.proplist, PA_PROP_DEVICE_CLASS, class_table[snd_pcm_info_get_class(pcm_info)]); pa_proplist_sets(data.proplist, PA_PROP_DEVICE_CLASS, class_table[snd_pcm_info_get_class(pcm_info)]);
pa_proplist_sets(data.proplist, PA_PROP_DEVICE_ACCESS_MODE, u->use_tsched ? "mmap_rewrite" : (u->use_mmap ? "mmap" : "serial")); pa_proplist_sets(data.proplist, PA_PROP_DEVICE_ACCESS_MODE, u->use_tsched ? "mmap+timer" : (u->use_mmap ? "mmap" : "serial"));
u->sink = pa_sink_new(m->core, &data, PA_SINK_HARDWARE|PA_SINK_LATENCY); u->sink = pa_sink_new(m->core, &data, PA_SINK_HARDWARE|PA_SINK_LATENCY);
pa_sink_new_data_done(&data); pa_sink_new_data_done(&data);
@ -1225,14 +1274,18 @@ int pa__init(pa_module*m) {
u->nfragments = nfrags; u->nfragments = nfrags;
u->hwbuf_size = u->fragment_size * nfrags; u->hwbuf_size = u->fragment_size * nfrags;
u->hwbuf_unused_frames = 0; u->hwbuf_unused_frames = 0;
u->avail_min_frames = 0;
u->tsched_watermark = tsched_watermark; u->tsched_watermark = tsched_watermark;
u->frame_index = 0; u->frame_index = 0;
u->hw_dB_supported = FALSE; u->hw_dB_supported = FALSE;
u->hw_dB_min = u->hw_dB_max = 0; u->hw_dB_min = u->hw_dB_max = 0;
u->hw_volume_min = u->hw_volume_max = 0; u->hw_volume_min = u->hw_volume_max = 0;
u->sink->thread_info.max_rewind = use_tsched ? u->hwbuf_size : 0; if (use_tsched)
if (u->tsched_watermark >= u->hwbuf_size/2)
u->tsched_watermark = pa_frame_align(u->hwbuf_size/2, &ss);
u->sink->thread_info.max_rewind = use_tsched ? u->hwbuf_size : 0;
u->sink->max_latency = pa_bytes_to_usec(u->hwbuf_size, &ss); u->sink->max_latency = pa_bytes_to_usec(u->hwbuf_size, &ss);
if (!use_tsched) if (!use_tsched)
@ -1310,10 +1363,11 @@ int pa__init(pa_module*m) {
u->sink->flags |= PA_SINK_HW_VOLUME_CTRL | (u->hw_dB_supported ? PA_SINK_DECIBEL_VOLUME : 0); u->sink->flags |= PA_SINK_HW_VOLUME_CTRL | (u->hw_dB_supported ? PA_SINK_DECIBEL_VOLUME : 0);
pa_log_info("Using hardware volume control. %s dB scale.", u->hw_dB_supported ? "Using" : "Not using"); pa_log_info("Using hardware volume control. %s dB scale.", u->hw_dB_supported ? "Using" : "Not using");
} else { } else if (mixer_reset) {
pa_log_info("Using software volume control. Trying to reset sound card to 0 dB."); pa_log_info("Using software volume control. Trying to reset sound card to 0 dB.");
pa_alsa_0dB_playback(u->mixer_elem); pa_alsa_0dB_playback(u->mixer_elem);
} } else
pa_log_info("Using software volume control. Leaving hw mixer controls untouched.");
} }
if (snd_mixer_selem_has_playback_switch(u->mixer_elem)) { if (snd_mixer_selem_has_playback_switch(u->mixer_elem)) {

View file

@ -71,7 +71,26 @@ PA_MODULE_USAGE(
"mmap=<enable memory mapping?> " "mmap=<enable memory mapping?> "
"tsched=<enable system timer based scheduling mode?> " "tsched=<enable system timer based scheduling mode?> "
"tsched_buffer_size=<buffer size when using timer based scheduling> " "tsched_buffer_size=<buffer size when using timer based scheduling> "
"tsched_buffer_watermark=<upper fill watermark>"); "tsched_buffer_watermark=<upper fill watermark> "
"mixer_reset=<reset hw volume and mute settings to sane defaults when falling back to software?>");
static const char* const valid_modargs[] = {
"source_name",
"device",
"device_id",
"format",
"rate",
"channels",
"channel_map",
"fragments",
"fragment_size",
"mmap",
"tsched",
"tsched_buffer_size",
"tsched_buffer_watermark",
"mixer_reset",
NULL
};
#define DEFAULT_DEVICE "default" #define DEFAULT_DEVICE "default"
#define DEFAULT_TSCHED_BUFFER_USEC (2*PA_USEC_PER_SEC) #define DEFAULT_TSCHED_BUFFER_USEC (2*PA_USEC_PER_SEC)
@ -110,23 +129,6 @@ struct userdata {
int64_t frame_index; int64_t frame_index;
}; };
static const char* const valid_modargs[] = {
"source_name",
"device",
"device_id",
"format",
"rate",
"channels",
"channel_map",
"fragments",
"fragment_size",
"mmap",
"tsched",
"tsched_buffer_size",
"tsched_buffer_watermark",
NULL
};
static int mmap_read(struct userdata *u) { static int mmap_read(struct userdata *u) {
int work_done = 0; int work_done = 0;
@ -876,7 +878,7 @@ int pa__init(pa_module*m) {
const char *name; const char *name;
char *name_buf = NULL; char *name_buf = NULL;
pa_bool_t namereg_fail; pa_bool_t namereg_fail;
pa_bool_t use_mmap = TRUE, b, use_tsched = TRUE, d; pa_bool_t use_mmap = TRUE, b, use_tsched = TRUE, d, mixer_reset = TRUE;
pa_source_new_data data; pa_source_new_data data;
static const char * const class_table[SND_PCM_CLASS_LAST+1] = { static const char * const class_table[SND_PCM_CLASS_LAST+1] = {
[SND_PCM_CLASS_GENERIC] = "sound", [SND_PCM_CLASS_GENERIC] = "sound",
@ -938,6 +940,11 @@ int pa__init(pa_module*m) {
use_tsched = FALSE; use_tsched = FALSE;
} }
if (pa_modargs_get_value_boolean(ma, "mixer_reset", &mixer_reset) < 0) {
pa_log("Failed to parse mixer_reset argument.");
goto fail;
}
u = pa_xnew0(struct userdata, 1); u = pa_xnew0(struct userdata, 1);
u->core = m->core; u->core = m->core;
u->module = m; u->module = m;
@ -1163,10 +1170,12 @@ int pa__init(pa_module*m) {
u->source->flags |= PA_SOURCE_HW_VOLUME_CTRL | (u->hw_dB_supported ? PA_SOURCE_DECIBEL_VOLUME : 0); u->source->flags |= PA_SOURCE_HW_VOLUME_CTRL | (u->hw_dB_supported ? PA_SOURCE_DECIBEL_VOLUME : 0);
pa_log_info("Using hardware volume control. %s dB scale.", u->hw_dB_supported ? "Using" : "Not using"); pa_log_info("Using hardware volume control. %s dB scale.", u->hw_dB_supported ? "Using" : "Not using");
} else { } else if (mixer_reset) {
pa_log_info("Using software volume control. Trying to reset sound card to 0 dB."); pa_log_info("Using software volume control. Trying to reset sound card to 0 dB.");
pa_alsa_0dB_capture(u->mixer_elem); pa_alsa_0dB_capture(u->mixer_elem);
} } else
pa_log_info("Using software volume control. Leaving hw mixer controls untouched.");
} }

View file

@ -252,7 +252,8 @@ static void thread_func(void *userdata) {
/* Just rewind if necessary, since we are in NULL mode, we /* Just rewind if necessary, since we are in NULL mode, we
* don't have to pass this on */ * don't have to pass this on */
pa_sink_process_rewind(u->sink); pa_sink_process_rewind(u->sink, u->sink->thread_info.rewind_nbytes);
u->sink->thread_info.rewind_nbytes = 0;
pa_rtclock_get(&now); pa_rtclock_get(&now);

View file

@ -134,7 +134,7 @@ static void sink_request_rewind(pa_sink *s) {
pa_assert_se(u = s->userdata); pa_assert_se(u = s->userdata);
/* Just hand this one over to the master sink */ /* Just hand this one over to the master sink */
pa_sink_input_request_rewrite(u->sink_input, s->thread_info.rewind_nbytes); pa_sink_input_request_rewind(u->sink_input, s->thread_info.rewind_nbytes, FALSE);
} }
/* Called from I/O thread context */ /* Called from I/O thread context */
@ -224,8 +224,7 @@ static void sink_input_rewind_cb(pa_sink_input *i, size_t nbytes) {
pa_assert_se(u = i->userdata); pa_assert_se(u = i->userdata);
pa_assert(nbytes > 0); pa_assert(nbytes > 0);
u->sink->thread_info.rewind_nbytes = nbytes; pa_sink_process_rewind(u->sink, nbytes);
pa_sink_process_rewind(u->sink);
} }
/* Called from I/O thread context */ /* Called from I/O thread context */

View file

@ -112,7 +112,7 @@ static void sink_request_rewind(pa_sink *s) {
pa_sink_assert_ref(s); pa_sink_assert_ref(s);
pa_assert_se(u = s->userdata); pa_assert_se(u = s->userdata);
pa_sink_input_request_rewrite(u->sink_input, s->thread_info.rewind_nbytes); pa_sink_input_request_rewind(u->sink_input, s->thread_info.rewind_nbytes, FALSE);
} }
/* Called from I/O thread context */ /* Called from I/O thread context */
@ -166,8 +166,7 @@ static void sink_input_rewind_cb(pa_sink_input *i, size_t nbytes) {
pa_assert_se(u = i->userdata); pa_assert_se(u = i->userdata);
pa_assert(nbytes > 0); pa_assert(nbytes > 0);
u->sink->thread_info.rewind_nbytes = nbytes; pa_sink_process_rewind(u->sink, nbytes);
pa_sink_process_rewind(u->sink);
} }
/* Called from I/O thread context */ /* Called from I/O thread context */

View file

@ -313,7 +313,7 @@ static struct session *session_new(struct userdata *u, const pa_sdp_info *sdp_in
char *c; char *c;
pa_sink *sink; pa_sink *sink;
int fd = -1; int fd = -1;
pa_memblock *silence; pa_memchunk silence;
pa_sink_input_new_data data; pa_sink_input_new_data data;
struct timeval now; struct timeval now;
@ -371,10 +371,7 @@ static struct session *session_new(struct userdata *u, const pa_sdp_info *sdp_in
s->sink_input->attach = sink_input_attach; s->sink_input->attach = sink_input_attach;
s->sink_input->detach = sink_input_detach; s->sink_input->detach = sink_input_detach;
silence = pa_silence_memblock_new( pa_sink_input_get_silence(s->sink_input, &silence);
s->userdata->module->core->mempool,
&s->sink_input->sample_spec,
pa_frame_align(pa_bytes_per_second(&s->sink_input->sample_spec)/128, &s->sink_input->sample_spec));
s->memblockq = pa_memblockq_new( s->memblockq = pa_memblockq_new(
0, 0,
@ -384,9 +381,9 @@ static struct session *session_new(struct userdata *u, const pa_sdp_info *sdp_in
pa_bytes_per_second(&s->sink_input->sample_spec)/10+1, pa_bytes_per_second(&s->sink_input->sample_spec)/10+1,
0, 0,
0, 0,
silence); &silence);
pa_memblock_unref(silence); pa_memblock_unref(silence.memblock);
pa_rtp_context_init_recv(&s->rtp_context, fd, pa_frame_size(&s->sdp_info.sample_spec)); pa_rtp_context_init_recv(&s->rtp_context, fd, pa_frame_size(&s->sdp_info.sample_spec));

View file

@ -55,7 +55,7 @@ struct pa_memblockq {
size_t maxlength, tlength, base, prebuf, minreq, maxrewind; size_t maxlength, tlength, base, prebuf, minreq, maxrewind;
int64_t read_index, write_index; int64_t read_index, write_index;
pa_bool_t in_prebuf; pa_bool_t in_prebuf;
pa_memblock *silence; pa_memchunk silence;
pa_mcalign *mcalign; pa_mcalign *mcalign;
int64_t missing; int64_t missing;
size_t requested; size_t requested;
@ -69,7 +69,7 @@ pa_memblockq* pa_memblockq_new(
size_t prebuf, size_t prebuf,
size_t minreq, size_t minreq,
size_t maxrewind, size_t maxrewind,
pa_memblock *silence) { pa_memchunk *silence) {
pa_memblockq* bq; pa_memblockq* bq;
@ -98,7 +98,12 @@ pa_memblockq* pa_memblockq_new(
pa_log_debug("memblockq sanitized: maxlength=%lu, tlength=%lu, base=%lu, prebuf=%lu, minreq=%lu maxrewind=%lu", pa_log_debug("memblockq sanitized: maxlength=%lu, tlength=%lu, base=%lu, prebuf=%lu, minreq=%lu maxrewind=%lu",
(unsigned long) bq->maxlength, (unsigned long) bq->tlength, (unsigned long) bq->base, (unsigned long) bq->prebuf, (unsigned long) bq->minreq, (unsigned long) bq->maxrewind); (unsigned long) bq->maxlength, (unsigned long) bq->tlength, (unsigned long) bq->base, (unsigned long) bq->prebuf, (unsigned long) bq->minreq, (unsigned long) bq->maxrewind);
bq->silence = silence ? pa_memblock_ref(silence) : NULL; if (silence) {
bq->silence = *silence;
pa_memblock_ref(bq->silence.memblock);
} else
pa_memchunk_reset(&bq->silence);
bq->mcalign = pa_mcalign_new(bq->base); bq->mcalign = pa_mcalign_new(bq->base);
return bq; return bq;
@ -109,8 +114,8 @@ void pa_memblockq_free(pa_memblockq* bq) {
pa_memblockq_flush(bq); pa_memblockq_flush(bq);
if (bq->silence) if (bq->silence.memblock)
pa_memblock_unref(bq->silence); pa_memblock_unref(bq->silence.memblock);
if (bq->mcalign) if (bq->mcalign)
pa_mcalign_free(bq->mcalign); pa_mcalign_free(bq->mcalign);
@ -420,7 +425,7 @@ finish:
return 0; return 0;
} }
static pa_bool_t memblockq_check_prebuf(pa_memblockq *bq) { pa_bool_t pa_memblockq_prebuf_active(pa_memblockq *bq) {
pa_assert(bq); pa_assert(bq);
if (bq->in_prebuf) { if (bq->in_prebuf) {
@ -447,7 +452,7 @@ int pa_memblockq_peek(pa_memblockq* bq, pa_memchunk *chunk) {
pa_assert(chunk); pa_assert(chunk);
/* We need to pre-buffer */ /* We need to pre-buffer */
if (memblockq_check_prebuf(bq)) if (pa_memblockq_prebuf_active(bq))
return -1; return -1;
fix_current_read(bq); fix_current_read(bq);
@ -466,13 +471,12 @@ int pa_memblockq_peek(pa_memblockq* bq, pa_memchunk *chunk) {
length = 0; length = 0;
/* We need to return silence, since no data is yet available */ /* We need to return silence, since no data is yet available */
if (bq->silence) { if (bq->silence.memblock) {
size_t l; *chunk = bq->silence;
pa_memblock_ref(chunk->memblock);
chunk->memblock = pa_memblock_ref(bq->silence); if (length > 0 && length < chunk->length)
chunk->length = length;
l = pa_memblock_get_length(chunk->memblock);
chunk->length = (length <= 0 || length > l) ? l : length;
} else { } else {
@ -511,7 +515,7 @@ void pa_memblockq_drop(pa_memblockq *bq, size_t length) {
while (length > 0) { while (length > 0) {
/* Do not drop any data when we are in prebuffering mode */ /* Do not drop any data when we are in prebuffering mode */
if (memblockq_check_prebuf(bq)) if (pa_memblockq_prebuf_active(bq))
break; break;
fix_current_read(bq); fix_current_read(bq);
@ -546,10 +550,20 @@ void pa_memblockq_drop(pa_memblockq *bq, size_t length) {
bq->missing += delta; bq->missing += delta;
} }
void pa_memblockq_rewind(pa_memblockq *bq, size_t length) {
pa_assert(bq);
pa_assert(length % bq->base == 0);
/* This is kind of the inverse of pa_memblockq_drop() */
bq->read_index -= length;
bq->missing -= length;
}
pa_bool_t pa_memblockq_is_readable(pa_memblockq *bq) { pa_bool_t pa_memblockq_is_readable(pa_memblockq *bq) {
pa_assert(bq); pa_assert(bq);
if (memblockq_check_prebuf(bq)) if (pa_memblockq_prebuf_active(bq))
return FALSE; return FALSE;
if (pa_memblockq_get_length(bq) <= 0) if (pa_memblockq_get_length(bq) <= 0)
@ -621,10 +635,7 @@ void pa_memblockq_flush(pa_memblockq *bq) {
int64_t old, delta; int64_t old, delta;
pa_assert(bq); pa_assert(bq);
while (bq->blocks) pa_memblockq_silence(bq);
drop_block(bq, bq->blocks);
pa_assert(bq->n_blocks == 0);
old = bq->write_index; old = bq->write_index;
bq->write_index = bq->read_index; bq->write_index = bq->read_index;
@ -757,18 +768,17 @@ void pa_memblockq_set_tlength(pa_memblockq *bq, size_t tlength) {
size_t old_tlength; size_t old_tlength;
pa_assert(bq); pa_assert(bq);
old_tlength = bq->tlength;
if (tlength <= 0) if (tlength <= 0)
tlength = bq->maxlength; tlength = bq->maxlength;
old_tlength = bq->tlength;
bq->tlength = ((tlength+bq->base-1)/bq->base)*bq->base; bq->tlength = ((tlength+bq->base-1)/bq->base)*bq->base;
if (bq->tlength > bq->maxlength) if (bq->tlength > bq->maxlength)
bq->tlength = bq->maxlength; bq->tlength = bq->maxlength;
if (bq->minreq > bq->tlength - bq->prebuf) if (bq->minreq > bq->tlength)
pa_memblockq_set_minreq(bq, bq->tlength - bq->prebuf); pa_memblockq_set_minreq(bq, bq->tlength);
bq->missing += (int64_t) bq->tlength - (int64_t) old_tlength; bq->missing += (int64_t) bq->tlength - (int64_t) old_tlength;
} }
@ -776,8 +786,10 @@ void pa_memblockq_set_tlength(pa_memblockq *bq, size_t tlength) {
void pa_memblockq_set_prebuf(pa_memblockq *bq, size_t prebuf) { void pa_memblockq_set_prebuf(pa_memblockq *bq, size_t prebuf) {
pa_assert(bq); pa_assert(bq);
bq->prebuf = (prebuf == (size_t) -1) ? bq->tlength : prebuf; if (prebuf == (size_t) -1)
bq->prebuf = ((bq->prebuf+bq->base-1)/bq->base)*bq->base; prebuf = bq->tlength;
bq->prebuf = ((prebuf+bq->base-1)/bq->base)*bq->base;
if (prebuf > 0 && bq->prebuf < bq->base) if (prebuf > 0 && bq->prebuf < bq->base)
bq->prebuf = bq->base; bq->prebuf = bq->base;
@ -788,8 +800,8 @@ void pa_memblockq_set_prebuf(pa_memblockq *bq, size_t prebuf) {
if (bq->prebuf <= 0 || pa_memblockq_get_length(bq) >= bq->prebuf) if (bq->prebuf <= 0 || pa_memblockq_get_length(bq) >= bq->prebuf)
bq->in_prebuf = FALSE; bq->in_prebuf = FALSE;
if (bq->minreq > bq->tlength - bq->prebuf) if (bq->minreq > bq->prebuf)
pa_memblockq_set_minreq(bq, bq->tlength - bq->prebuf); pa_memblockq_set_minreq(bq, bq->prebuf);
} }
void pa_memblockq_set_minreq(pa_memblockq *bq, size_t minreq) { void pa_memblockq_set_minreq(pa_memblockq *bq, size_t minreq) {
@ -797,8 +809,11 @@ void pa_memblockq_set_minreq(pa_memblockq *bq, size_t minreq) {
bq->minreq = (minreq/bq->base)*bq->base; bq->minreq = (minreq/bq->base)*bq->base;
if (bq->minreq > bq->tlength - bq->prebuf) if (bq->minreq > bq->tlength)
bq->minreq = bq->tlength - bq->prebuf; bq->minreq = bq->tlength;
if (bq->minreq > bq->prebuf)
bq->minreq = bq->prebuf;
if (bq->minreq < bq->base) if (bq->minreq < bq->base)
bq->minreq = bq->base; bq->minreq = bq->base;
@ -810,14 +825,6 @@ void pa_memblockq_set_maxrewind(pa_memblockq *bq, size_t maxrewind) {
bq->maxrewind = (maxrewind/bq->base)*bq->base; bq->maxrewind = (maxrewind/bq->base)*bq->base;
} }
void pa_memblockq_rewind(pa_memblockq *bq, size_t length) {
pa_assert(bq);
pa_assert(length % bq->base == 0);
bq->read_index -= length;
bq->missing -= length;
}
int pa_memblockq_splice(pa_memblockq *bq, pa_memblockq *source) { int pa_memblockq_splice(pa_memblockq *bq, pa_memblockq *source) {
pa_assert(bq); pa_assert(bq);
@ -859,13 +866,17 @@ void pa_memblockq_willneed(pa_memblockq *bq) {
pa_memchunk_will_need(&q->chunk); pa_memchunk_will_need(&q->chunk);
} }
void pa_memblockq_set_silence(pa_memblockq *bq, pa_memblock *silence) { void pa_memblockq_set_silence(pa_memblockq *bq, pa_memchunk *silence) {
pa_assert(bq); pa_assert(bq);
if (bq->silence) if (bq->silence.memblock)
pa_memblock_unref(bq->silence); pa_memblock_unref(bq->silence.memblock);
bq->silence = silence ? pa_memblock_ref(silence) : NULL; if (silence) {
bq->silence = *silence;
pa_memblock_ref(bq->silence.memblock);
} else
pa_memchunk_reset(&bq->silence);
} }
pa_bool_t pa_memblockq_is_empty(pa_memblockq *bq) { pa_bool_t pa_memblockq_is_empty(pa_memblockq *bq) {
@ -873,3 +884,12 @@ pa_bool_t pa_memblockq_is_empty(pa_memblockq *bq) {
return !bq->blocks; return !bq->blocks;
} }
void pa_memblockq_silence(pa_memblockq *bq) {
pa_assert(bq);
while (bq->blocks)
drop_block(bq, bq->blocks);
pa_assert(bq->n_blocks == 0);
}

View file

@ -64,7 +64,7 @@ typedef struct pa_memblockq pa_memblockq;
- maxrewind: how many bytes of history to keep in the queue - maxrewind: how many bytes of history to keep in the queue
- silence: return this memblock when reading unitialized data - silence: return this memchunk when reading unitialized data
*/ */
pa_memblockq* pa_memblockq_new( pa_memblockq* pa_memblockq_new(
int64_t idx, int64_t idx,
@ -74,7 +74,7 @@ pa_memblockq* pa_memblockq_new(
size_t prebuf, size_t prebuf,
size_t minreq, size_t minreq,
size_t maxrewind, size_t maxrewind,
pa_memblock *silence); pa_memchunk *silence);
void pa_memblockq_free(pa_memblockq*bq); void pa_memblockq_free(pa_memblockq*bq);
@ -152,7 +152,7 @@ void pa_memblockq_set_tlength(pa_memblockq *memblockq, size_t tlength); /* might
void pa_memblockq_set_prebuf(pa_memblockq *memblockq, size_t prebuf); /* might modify minreq, too */ void pa_memblockq_set_prebuf(pa_memblockq *memblockq, size_t prebuf); /* might modify minreq, too */
void pa_memblockq_set_minreq(pa_memblockq *memblockq, size_t minreq); void pa_memblockq_set_minreq(pa_memblockq *memblockq, size_t minreq);
void pa_memblockq_set_maxrewind(pa_memblockq *memblockq, size_t rewind); /* Set the maximum history size */ void pa_memblockq_set_maxrewind(pa_memblockq *memblockq, size_t rewind); /* Set the maximum history size */
void pa_memblockq_set_silence(pa_memblockq *memblockq, pa_memblock *silence); void pa_memblockq_set_silence(pa_memblockq *memblockq, pa_memchunk *silence);
/* Call pa_memchunk_willneed() for every chunk in the queue from the current read pointer to the end */ /* Call pa_memchunk_willneed() for every chunk in the queue from the current read pointer to the end */
void pa_memblockq_willneed(pa_memblockq *bq); void pa_memblockq_willneed(pa_memblockq *bq);
@ -162,4 +162,9 @@ void pa_memblockq_willneed(pa_memblockq *bq);
* data for the future nor data in the backlog. */ * data for the future nor data in the backlog. */
pa_bool_t pa_memblockq_is_empty(pa_memblockq *bq); pa_bool_t pa_memblockq_is_empty(pa_memblockq *bq);
void pa_memblockq_silence(pa_memblockq *bq);
/* Check whether we currently are in prebuf state */
pa_bool_t pa_memblockq_prebuf_active(pa_memblockq *bq);
#endif #endif

View file

@ -255,17 +255,13 @@ void pa_memblockq_sink_input_set_queue(pa_sink_input *i, pa_memblockq *q) {
pa_memblockq_free(u->memblockq); pa_memblockq_free(u->memblockq);
if ((u->memblockq = q)) { if ((u->memblockq = q)) {
pa_memblock *silence; pa_memchunk silence;
pa_memblockq_set_prebuf(q, 0); pa_memblockq_set_prebuf(q, 0);
silence = pa_silence_memblock_new( pa_sink_input_get_silence(i, &silence);
i->sink->core->mempool, pa_memblockq_set_silence(q, &silence);
&i->sample_spec, pa_memblock_unref(silence.memblock);
i->thread_info.resampler ? pa_resampler_max_block_size(i->thread_info.resampler) : 0);
pa_memblockq_set_silence(q, silence);
pa_memblock_unref(silence);
pa_memblockq_willneed(q); pa_memblockq_willneed(q);
} }

View file

@ -72,6 +72,7 @@
#define MAX_MEMBLOCKQ_LENGTH (4*1024*1024) /* 4MB */ #define MAX_MEMBLOCKQ_LENGTH (4*1024*1024) /* 4MB */
#define DEFAULT_TLENGTH_MSEC 2000 /* 2s */ #define DEFAULT_TLENGTH_MSEC 2000 /* 2s */
#define DEFAULT_PROCESS_MSEC 20 /* 20ms */
#define DEFAULT_FRAGSIZE_MSEC DEFAULT_TLENGTH_MSEC #define DEFAULT_FRAGSIZE_MSEC DEFAULT_TLENGTH_MSEC
typedef struct connection connection; typedef struct connection connection;
@ -104,7 +105,7 @@ typedef struct playback_stream {
pa_bool_t drain_request; pa_bool_t drain_request;
uint32_t drain_tag; uint32_t drain_tag;
uint32_t syncid; uint32_t syncid;
pa_bool_t underrun; uint64_t underrun; /* length of underrun */
pa_atomic_t missing; pa_atomic_t missing;
size_t minreq; size_t minreq;
@ -208,6 +209,9 @@ static int sink_input_pop_cb(pa_sink_input *i, size_t length, pa_memchunk *chunk
static void sink_input_kill_cb(pa_sink_input *i); static void sink_input_kill_cb(pa_sink_input *i);
static void sink_input_suspend_cb(pa_sink_input *i, pa_bool_t suspend); static void sink_input_suspend_cb(pa_sink_input *i, pa_bool_t suspend);
static void sink_input_moved_cb(pa_sink_input *i); static void sink_input_moved_cb(pa_sink_input *i);
static void sink_input_rewind_cb(pa_sink_input *i, size_t nbytes);
static void sink_input_set_max_rewind_cb(pa_sink_input *i, size_t nbytes);
static void send_memblock(connection *c); static void send_memblock(connection *c);
static void request_bytes(struct playback_stream*s); static void request_bytes(struct playback_stream*s);
@ -471,7 +475,6 @@ static record_stream* record_stream_new(
pa_source *source, pa_source *source,
pa_sample_spec *ss, pa_sample_spec *ss,
pa_channel_map *map, pa_channel_map *map,
const char *name,
uint32_t *maxlength, uint32_t *maxlength,
uint32_t *fragsize, uint32_t *fragsize,
pa_source_output_flags_t flags, pa_source_output_flags_t flags,
@ -485,7 +488,6 @@ static record_stream* record_stream_new(
pa_assert(c); pa_assert(c);
pa_assert(ss); pa_assert(ss);
pa_assert(name);
pa_assert(maxlength); pa_assert(maxlength);
pa_assert(p); pa_assert(p);
@ -523,7 +525,7 @@ static record_stream* record_stream_new(
if (*maxlength <= 0 || *maxlength > MAX_MEMBLOCKQ_LENGTH) if (*maxlength <= 0 || *maxlength > MAX_MEMBLOCKQ_LENGTH)
*maxlength = MAX_MEMBLOCKQ_LENGTH; *maxlength = MAX_MEMBLOCKQ_LENGTH;
if (*fragsize <= 0) if (*fragsize <= 0)
*fragsize = pa_usec_to_bytes(DEFAULT_FRAGSIZE_MSEC*1000, &source_output->sample_spec); *fragsize = pa_usec_to_bytes(DEFAULT_FRAGSIZE_MSEC*PA_USEC_PER_MSEC, &source_output->sample_spec);
if (adjust_latency) { if (adjust_latency) {
pa_usec_t fragsize_usec; pa_usec_t fragsize_usec;
@ -618,21 +620,14 @@ static int playback_stream_process_msg(pa_msgobject *o, int code, void*userdata,
uint32_t l = 0; uint32_t l = 0;
for (;;) { for (;;) {
int32_t k; if ((l = pa_atomic_load(&s->missing)) <= 0)
if ((k = pa_atomic_load(&s->missing)) <= 0)
break; break;
l += k; if (pa_atomic_cmpxchg(&s->missing, l, 0))
if (l < s->minreq)
break;
if (pa_atomic_sub(&s->missing, k) <= k)
break; break;
} }
if (l < s->minreq) if (l <= 0)
break; break;
t = pa_tagstruct_new(NULL, 0); t = pa_tagstruct_new(NULL, 0);
@ -642,7 +637,7 @@ static int playback_stream_process_msg(pa_msgobject *o, int code, void*userdata,
pa_tagstruct_putu32(t, l); pa_tagstruct_putu32(t, l);
pa_pstream_send_tagstruct(s->connection->pstream, t); pa_pstream_send_tagstruct(s->connection->pstream, t);
/* pa_log("Requesting %u bytes", l); */ /* pa_log("Requesting %lu bytes", (unsigned long) l); */
break; break;
} }
@ -684,7 +679,6 @@ static playback_stream* playback_stream_new(
pa_sink *sink, pa_sink *sink,
pa_sample_spec *ss, pa_sample_spec *ss,
pa_channel_map *map, pa_channel_map *map,
const char *name,
uint32_t *maxlength, uint32_t *maxlength,
uint32_t *tlength, uint32_t *tlength,
uint32_t *prebuf, uint32_t *prebuf,
@ -699,14 +693,15 @@ static playback_stream* playback_stream_new(
playback_stream *s, *ssync; playback_stream *s, *ssync;
pa_sink_input *sink_input; pa_sink_input *sink_input;
pa_memblock *silence; pa_memchunk silence;
uint32_t idx; uint32_t idx;
int64_t start_index; int64_t start_index;
pa_sink_input_new_data data; pa_sink_input_new_data data;
pa_usec_t tlength_usec, minreq_usec, sink_usec;
size_t frame_size;
pa_assert(c); pa_assert(c);
pa_assert(ss); pa_assert(ss);
pa_assert(name);
pa_assert(maxlength); pa_assert(maxlength);
pa_assert(tlength); pa_assert(tlength);
pa_assert(prebuf); pa_assert(prebuf);
@ -761,10 +756,12 @@ static playback_stream* playback_stream_new(
s->connection = c; s->connection = c;
s->syncid = syncid; s->syncid = syncid;
s->sink_input = sink_input; s->sink_input = sink_input;
s->underrun = TRUE; s->underrun = (uint64_t) -1;
s->sink_input->parent.process_msg = sink_input_process_msg; s->sink_input->parent.process_msg = sink_input_process_msg;
s->sink_input->pop = sink_input_pop_cb; s->sink_input->pop = sink_input_pop_cb;
s->sink_input->rewind = sink_input_rewind_cb;
s->sink_input->set_max_rewind = sink_input_set_max_rewind_cb;
s->sink_input->kill = sink_input_kill_cb; s->sink_input->kill = sink_input_kill_cb;
s->sink_input->moved = sink_input_moved_cb; s->sink_input->moved = sink_input_moved_cb;
s->sink_input->suspend = sink_input_suspend_cb; s->sink_input->suspend = sink_input_suspend_cb;
@ -775,40 +772,69 @@ static playback_stream* playback_stream_new(
if (*maxlength <= 0 || *maxlength > MAX_MEMBLOCKQ_LENGTH) if (*maxlength <= 0 || *maxlength > MAX_MEMBLOCKQ_LENGTH)
*maxlength = MAX_MEMBLOCKQ_LENGTH; *maxlength = MAX_MEMBLOCKQ_LENGTH;
if (*tlength <= 0) if (*tlength <= 0)
*tlength = pa_usec_to_bytes(DEFAULT_TLENGTH_MSEC*1000, &sink_input->sample_spec); *tlength = pa_usec_to_bytes(DEFAULT_TLENGTH_MSEC*PA_USEC_PER_MSEC, &sink_input->sample_spec);
if (*minreq <= 0) if (*minreq <= 0)
*minreq = (*tlength*9)/10; *minreq = pa_usec_to_bytes(DEFAULT_PROCESS_MSEC*PA_USEC_PER_MSEC, &sink_input->sample_spec);
if (*prebuf <= 0)
*prebuf = *tlength;
if (adjust_latency) { frame_size = pa_frame_size(&sink_input->sample_spec);
pa_usec_t tlength_usec, minreq_usec; if (*minreq <= 0)
*minreq = frame_size;
/* So, the user asked us to adjust the latency according to if (*tlength < *minreq+frame_size)
* the what the sink can provide. Half the latency will be *tlength = *minreq+frame_size;
* spent on the hw buffer, half of it in the async buffer
* queue we maintain for each client. */
tlength_usec = pa_bytes_to_usec(*tlength, &sink_input->sample_spec); tlength_usec = pa_bytes_to_usec(*tlength, &sink_input->sample_spec);
minreq_usec = pa_bytes_to_usec(*minreq, &sink_input->sample_spec); minreq_usec = pa_bytes_to_usec(*minreq, &sink_input->sample_spec);
s->sink_latency = pa_sink_input_set_requested_latency(sink_input, tlength_usec/2); if (adjust_latency) {
if (tlength_usec >= s->sink_latency*2) /* So, the user asked us to adjust the latency of the stream
* buffer according to the what the sink can provide. The
* tlength passed in shall be the overall latency. Roughly
* half the latency will be spent on the hw buffer, the other
* half of it in the async buffer queue we maintain for each
* client. In between we'll have a safety space of size
* minreq.*/
sink_usec = (tlength_usec-minreq_usec)/2;
} else {
/* Ok, the user didn't ask us to adjust the latency, but we
* still need to make sure that the parameters from the user
* do make sense. */
sink_usec = tlength_usec - minreq_usec;
}
s->sink_latency = pa_sink_input_set_requested_latency(sink_input, sink_usec);
if (adjust_latency) {
/* Ok, we didn't necessarily get what we were asking for, so
* let's subtract from what we asked for for the remaining
* buffer space */
if (tlength_usec >= s->sink_latency)
tlength_usec -= s->sink_latency; tlength_usec -= s->sink_latency;
else }
tlength_usec = s->sink_latency;
if (minreq_usec >= s->sink_latency*2) if (tlength_usec < s->sink_latency + minreq_usec)
minreq_usec -= s->sink_latency; tlength_usec = s->sink_latency + minreq_usec;
else
minreq_usec = s->sink_latency;
*tlength = pa_usec_to_bytes(tlength_usec, &sink_input->sample_spec); *tlength = pa_usec_to_bytes(tlength_usec, &sink_input->sample_spec);
*minreq = pa_usec_to_bytes(minreq_usec, &sink_input->sample_spec); *minreq = pa_usec_to_bytes(minreq_usec, &sink_input->sample_spec);
if (*minreq <= 0) {
*minreq = frame_size;
*tlength += frame_size;
} }
silence = pa_silence_memblock_new(c->protocol->core->mempool, &sink_input->sample_spec, 0); if (*tlength <= *minreq)
*tlength = *minreq + frame_size;
if (*prebuf <= 0)
*prebuf = *tlength;
pa_sink_input_get_silence(sink_input, &silence);
s->memblockq = pa_memblockq_new( s->memblockq = pa_memblockq_new(
start_index, start_index,
@ -818,9 +844,9 @@ static playback_stream* playback_stream_new(
*prebuf, *prebuf,
*minreq, *minreq,
0, 0,
silence); &silence);
pa_memblock_unref(silence); pa_memblock_unref(silence.memblock);
*maxlength = (uint32_t) pa_memblockq_get_maxlength(s->memblockq); *maxlength = (uint32_t) pa_memblockq_get_maxlength(s->memblockq);
*tlength = (uint32_t) pa_memblockq_get_tlength(s->memblockq); *tlength = (uint32_t) pa_memblockq_get_tlength(s->memblockq);
@ -924,10 +950,12 @@ static void request_bytes(playback_stream *s) {
if (m <= 0) if (m <= 0)
return; return;
/* pa_log("request_bytes(%u)", m); */ /* pa_log("request_bytes(%lu)", (unsigned long) m); */
previous_missing = pa_atomic_add(&s->missing, m); previous_missing = pa_atomic_add(&s->missing, m);
if (previous_missing < s->minreq && previous_missing+m >= s->minreq) {
if (pa_memblockq_prebuf_active(s->memblockq) ||
(previous_missing < s->minreq && previous_missing+m >= s->minreq)) {
pa_assert(pa_thread_mq_get()); pa_assert(pa_thread_mq_get());
pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_REQUEST_DATA, NULL, 0, NULL, NULL); pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_REQUEST_DATA, NULL, 0, NULL, NULL);
} }
@ -989,6 +1017,45 @@ static void send_record_stream_killed(record_stream *r) {
/*** sink input callbacks ***/ /*** sink input callbacks ***/
static void handle_seek(playback_stream *s, int64_t indexw) {
playback_stream_assert_ref(s);
/* pa_log("handle_seek: %llu -- %i", (unsigned long long) s->underrun, pa_memblockq_is_readable(s->memblockq)); */
if (s->underrun != 0) {
/* pa_log("%lu vs. %lu", (unsigned long) pa_memblockq_get_length(s->memblockq), (unsigned long) pa_memblockq_get_prebuf(s->memblockq)); */
if (pa_memblockq_is_readable(s->memblockq)) {
size_t u = pa_memblockq_get_length(s->memblockq);
if (u >= s->underrun)
u = s->underrun;
pa_log("yeah! ready to rock");
/* We just ended an underrun, let's ask the sink
* to rewrite */
s->sink_input->thread_info.ignore_rewind = TRUE;
pa_sink_input_request_rewind(s->sink_input, u, TRUE);
}
} else {
int64_t indexr;
indexr = pa_memblockq_get_read_index(s->memblockq);
if (indexw < indexr)
/* OK, the sink already asked for this data, so
* let's have it usk us again */
pa_sink_input_request_rewind(s->sink_input, indexr - indexw, FALSE);
}
request_bytes(s);
}
/* Called from thread context */ /* Called from thread context */
static int sink_input_process_msg(pa_msgobject *o, int code, void *userdata, int64_t offset, pa_memchunk *chunk) { static int sink_input_process_msg(pa_msgobject *o, int code, void *userdata, int64_t offset, pa_memchunk *chunk) {
pa_sink_input *i = PA_SINK_INPUT(o); pa_sink_input *i = PA_SINK_INPUT(o);
@ -1000,48 +1067,42 @@ static int sink_input_process_msg(pa_msgobject *o, int code, void *userdata, int
switch (code) { switch (code) {
case SINK_INPUT_MESSAGE_SEEK: case SINK_INPUT_MESSAGE_SEEK: {
int64_t windex;
windex = pa_memblockq_get_write_index(s->memblockq);
pa_memblockq_seek(s->memblockq, offset, PA_PTR_TO_UINT(userdata)); pa_memblockq_seek(s->memblockq, offset, PA_PTR_TO_UINT(userdata));
request_bytes(s);
handle_seek(s, windex);
return 0; return 0;
}
case SINK_INPUT_MESSAGE_POST_DATA: { case SINK_INPUT_MESSAGE_POST_DATA: {
int64_t windex;
pa_assert(chunk); pa_assert(chunk);
/* pa_log("sink input post: %u", chunk->length); */ /* pa_log("sink input post: %lu", (unsigned long) chunk->length); */
windex = pa_memblockq_get_write_index(s->memblockq);
if (pa_memblockq_push_align(s->memblockq, chunk) < 0) { if (pa_memblockq_push_align(s->memblockq, chunk) < 0) {
pa_log_warn("Failed to push data into queue"); pa_log_warn("Failed to push data into queue");
pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_OVERFLOW, NULL, 0, NULL, NULL); pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_OVERFLOW, NULL, 0, NULL, NULL);
pa_memblockq_seek(s->memblockq, chunk->length, PA_SEEK_RELATIVE); pa_memblockq_seek(s->memblockq, chunk->length, PA_SEEK_RELATIVE);
} }
request_bytes(s); handle_seek(s, windex);
s->underrun = FALSE;
return 0;
}
case SINK_INPUT_MESSAGE_DRAIN: {
pa_memblockq_prebuf_disable(s->memblockq);
if (!pa_memblockq_is_readable(s->memblockq))
pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_DRAIN_ACK, userdata, 0, NULL, NULL);
else {
s->drain_tag = PA_PTR_TO_UINT(userdata);
s->drain_request = TRUE;
}
request_bytes(s);
return 0; return 0;
} }
case SINK_INPUT_MESSAGE_DRAIN:
case SINK_INPUT_MESSAGE_FLUSH: case SINK_INPUT_MESSAGE_FLUSH:
case SINK_INPUT_MESSAGE_PREBUF_FORCE: case SINK_INPUT_MESSAGE_PREBUF_FORCE:
case SINK_INPUT_MESSAGE_TRIGGER: { case SINK_INPUT_MESSAGE_TRIGGER: {
int64_t windex;
pa_sink_input *isync; pa_sink_input *isync;
void (*func)(pa_memblockq *bq); void (*func)(pa_memblockq *bq);
@ -1054,6 +1115,7 @@ static int sink_input_process_msg(pa_msgobject *o, int code, void *userdata, int
func = pa_memblockq_prebuf_force; func = pa_memblockq_prebuf_force;
break; break;
case SINK_INPUT_MESSAGE_DRAIN:
case SINK_INPUT_MESSAGE_TRIGGER: case SINK_INPUT_MESSAGE_TRIGGER:
func = pa_memblockq_prebuf_disable; func = pa_memblockq_prebuf_disable;
break; break;
@ -1062,23 +1124,32 @@ static int sink_input_process_msg(pa_msgobject *o, int code, void *userdata, int
pa_assert_not_reached(); pa_assert_not_reached();
} }
windex = pa_memblockq_get_write_index(s->memblockq);
func(s->memblockq); func(s->memblockq);
s->underrun = FALSE; handle_seek(s, windex);
request_bytes(s);
/* Do the same for all other members in the sync group */ /* Do the same for all other members in the sync group */
for (isync = i->sync_prev; isync; isync = isync->sync_prev) { for (isync = i->sync_prev; isync; isync = isync->sync_prev) {
playback_stream *ssync = PLAYBACK_STREAM(isync->userdata); playback_stream *ssync = PLAYBACK_STREAM(isync->userdata);
windex = pa_memblockq_get_write_index(ssync->memblockq);
func(ssync->memblockq); func(ssync->memblockq);
ssync->underrun = FALSE; handle_seek(ssync, windex);
request_bytes(ssync);
} }
for (isync = i->sync_next; isync; isync = isync->sync_next) { for (isync = i->sync_next; isync; isync = isync->sync_next) {
playback_stream *ssync = PLAYBACK_STREAM(isync->userdata); playback_stream *ssync = PLAYBACK_STREAM(isync->userdata);
windex = pa_memblockq_get_write_index(ssync->memblockq);
func(ssync->memblockq); func(ssync->memblockq);
ssync->underrun = FALSE; handle_seek(ssync, windex);
request_bytes(ssync); }
if (code == SINK_INPUT_MESSAGE_DRAIN) {
if (!pa_memblockq_is_readable(s->memblockq))
pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_DRAIN_ACK, userdata, 0, NULL, NULL);
else {
s->drain_tag = PA_PTR_TO_UINT(userdata);
s->drain_request = TRUE;
}
} }
return 0; return 0;
@ -1091,11 +1162,18 @@ static int sink_input_process_msg(pa_msgobject *o, int code, void *userdata, int
s->render_memblockq_length = pa_memblockq_get_length(s->sink_input->thread_info.render_memblockq); s->render_memblockq_length = pa_memblockq_get_length(s->sink_input->thread_info.render_memblockq);
return 0; return 0;
case PA_SINK_INPUT_MESSAGE_SET_STATE: case PA_SINK_INPUT_MESSAGE_SET_STATE: {
int64_t windex;
windex = pa_memblockq_get_write_index(s->memblockq);
pa_memblockq_prebuf_force(s->memblockq); pa_memblockq_prebuf_force(s->memblockq);
request_bytes(s);
handle_seek(s, windex);
/* Fall through to the default handler */
break; break;
}
case PA_SINK_INPUT_MESSAGE_GET_LATENCY: { case PA_SINK_INPUT_MESSAGE_GET_LATENCY: {
pa_usec_t *r = userdata; pa_usec_t *r = userdata;
@ -1112,7 +1190,7 @@ static int sink_input_process_msg(pa_msgobject *o, int code, void *userdata, int
} }
/* Called from thread context */ /* Called from thread context */
static int sink_input_pop_cb(pa_sink_input *i, size_t length, pa_memchunk *chunk) { static int sink_input_pop_cb(pa_sink_input *i, size_t nbytes, pa_memchunk *chunk) {
playback_stream *s; playback_stream *s;
pa_sink_input_assert_ref(i); pa_sink_input_assert_ref(i);
@ -1122,23 +1200,58 @@ static int sink_input_pop_cb(pa_sink_input *i, size_t length, pa_memchunk *chunk
if (pa_memblockq_peek(s->memblockq, chunk) < 0) { if (pa_memblockq_peek(s->memblockq, chunk) < 0) {
/* pa_log("UNDERRUN"); */
if (s->drain_request && pa_sink_input_safe_to_remove(i)) { if (s->drain_request && pa_sink_input_safe_to_remove(i)) {
s->drain_request = FALSE; s->drain_request = FALSE;
pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_DRAIN_ACK, PA_UINT_TO_PTR(s->drain_tag), 0, NULL, NULL); pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_DRAIN_ACK, PA_UINT_TO_PTR(s->drain_tag), 0, NULL, NULL);
} else if (!s->underrun) { } else if (s->underrun == 0)
s->underrun = TRUE;
pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_UNDERFLOW, NULL, 0, NULL, NULL); pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(s), PLAYBACK_STREAM_MESSAGE_UNDERFLOW, NULL, 0, NULL, NULL);
}
if (s->underrun != (size_t) -1)
s->underrun += nbytes;
/* pa_log("added %llu bytes, total is %llu", (unsigned long long) nbytes, (unsigned long long) s->underrun); */
request_bytes(s);
return -1; return -1;
} }
/* pa_log("NOTUNDERRUN"); */
s->underrun = 0;
pa_memblockq_drop(s->memblockq, chunk->length); pa_memblockq_drop(s->memblockq, chunk->length);
request_bytes(s); request_bytes(s);
return 0; return 0;
} }
static void sink_input_rewind_cb(pa_sink_input *i, size_t nbytes) {
playback_stream *s;
pa_sink_input_assert_ref(i);
s = PLAYBACK_STREAM(i->userdata);
playback_stream_assert_ref(s);
/* If we are in an underrun, then we don't rewind */
if (s->underrun != 0)
return;
pa_memblockq_rewind(s->memblockq, nbytes);
}
static void sink_input_set_max_rewind_cb(pa_sink_input *i, size_t nbytes) {
playback_stream *s;
pa_sink_input_assert_ref(i);
s = PLAYBACK_STREAM(i->userdata);
playback_stream_assert_ref(s);
pa_memblockq_set_maxrewind(s->memblockq, nbytes);
}
/* Called from main context */ /* Called from main context */
static void sink_input_kill_cb(pa_sink_input *i) { static void sink_input_kill_cb(pa_sink_input *i) {
playback_stream *s; playback_stream *s;
@ -1416,7 +1529,7 @@ static void command_create_playback_stream(PA_GCC_UNUSED pa_pdispatch *pd, PA_GC
(no_move ? PA_SINK_INPUT_DONT_MOVE : 0) | (no_move ? PA_SINK_INPUT_DONT_MOVE : 0) |
(variable_rate ? PA_SINK_INPUT_VARIABLE_RATE : 0); (variable_rate ? PA_SINK_INPUT_VARIABLE_RATE : 0);
s = playback_stream_new(c, sink, &ss, &map, name, &maxlength, &tlength, &prebuf, &minreq, &volume, muted, syncid, &missing, flags, p, adjust_latency); s = playback_stream_new(c, sink, &ss, &map, &maxlength, &tlength, &prebuf, &minreq, &volume, muted, syncid, &missing, flags, p, adjust_latency);
pa_proplist_free(p); pa_proplist_free(p);
CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_INVALID); CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_INVALID);
@ -1625,7 +1738,7 @@ static void command_create_record_stream(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_
(no_move ? PA_SOURCE_OUTPUT_DONT_MOVE : 0) | (no_move ? PA_SOURCE_OUTPUT_DONT_MOVE : 0) |
(variable_rate ? PA_SOURCE_OUTPUT_VARIABLE_RATE : 0); (variable_rate ? PA_SOURCE_OUTPUT_VARIABLE_RATE : 0);
s = record_stream_new(c, source, &ss, &map, name, &maxlength, &fragment_size, flags, p, adjust_latency); s = record_stream_new(c, source, &ss, &map, &maxlength, &fragment_size, flags, p, adjust_latency);
pa_proplist_free(p); pa_proplist_free(p);
CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_INVALID); CHECK_VALIDITY(c->pstream, s, tag, PA_ERR_INVALID);
@ -1721,7 +1834,6 @@ static void command_auth(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_UNUSED uint32_t
pa_log_warn("failed to get GID of group '%s'", c->protocol->auth_group); pa_log_warn("failed to get GID of group '%s'", c->protocol->auth_group);
else if (gid == creds->gid) else if (gid == creds->gid)
success = 1; success = 1;
if (!success) { if (!success) {
if ((r = pa_uid_in_group(creds->uid, c->protocol->auth_group)) < 0) if ((r = pa_uid_in_group(creds->uid, c->protocol->auth_group)) < 0)
pa_log_warn("failed to check group membership."); pa_log_warn("failed to check group membership.");
@ -1739,7 +1851,7 @@ static void command_auth(PA_GCC_UNUSED pa_pdispatch *pd, PA_GCC_UNUSED uint32_t
pa_mempool_is_shared(c->protocol->core->mempool) && pa_mempool_is_shared(c->protocol->core->mempool) &&
creds->uid == getuid()) { creds->uid == getuid()) {
pa_pstream_use_shm(c->pstream, 1); pa_pstream_enable_shm(c->pstream, TRUE);
pa_log_info("Enabled SHM for new connection"); pa_log_info("Enabled SHM for new connection");
} }

View file

@ -42,7 +42,7 @@
#include "sink-input.h" #include "sink-input.h"
#define MEMBLOCKQ_MAXLENGTH (16*1024*1024) #define MEMBLOCKQ_MAXLENGTH (32*1024*1024)
#define CONVERT_BUFFER_LENGTH (PA_PAGE_SIZE) #define CONVERT_BUFFER_LENGTH (PA_PAGE_SIZE)
#define MOVE_BUFFER_LENGTH (PA_PAGE_SIZE*256) #define MOVE_BUFFER_LENGTH (PA_PAGE_SIZE*256)
@ -94,6 +94,20 @@ void pa_sink_input_new_data_done(pa_sink_input_new_data *data) {
pa_proplist_free(data->proplist); pa_proplist_free(data->proplist);
} }
static void reset_callbacks(pa_sink_input *i) {
pa_assert(i);
i->pop = NULL;
i->rewind = NULL;
i->set_max_rewind = NULL;
i->attach = NULL;
i->detach = NULL;
i->suspend = NULL;
i->moved = NULL;
i->kill = NULL;
i->get_latency = NULL;
}
pa_sink_input* pa_sink_input_new( pa_sink_input* pa_sink_input_new(
pa_core *core, pa_core *core,
pa_sink_input_new_data *data, pa_sink_input_new_data *data,
@ -102,7 +116,6 @@ pa_sink_input* pa_sink_input_new(
pa_sink_input *i; pa_sink_input *i;
pa_resampler *resampler = NULL; pa_resampler *resampler = NULL;
char st[PA_SAMPLE_SPEC_SNPRINT_MAX], cm[PA_CHANNEL_MAP_SNPRINT_MAX]; char st[PA_SAMPLE_SPEC_SNPRINT_MAX], cm[PA_CHANNEL_MAP_SNPRINT_MAX];
pa_memblock *silence;
pa_assert(core); pa_assert(core);
pa_assert(data); pa_assert(data);
@ -223,15 +236,7 @@ pa_sink_input* pa_sink_input_new(
} else } else
i->sync_next = i->sync_prev = NULL; i->sync_next = i->sync_prev = NULL;
i->pop = NULL; reset_callbacks(i);
i->rewind = NULL;
i->set_max_rewind = NULL;
i->kill = NULL;
i->get_latency = NULL;
i->attach = NULL;
i->detach = NULL;
i->suspend = NULL;
i->moved = NULL;
i->userdata = NULL; i->userdata = NULL;
i->thread_info.state = i->state; i->thread_info.state = i->state;
@ -244,10 +249,9 @@ pa_sink_input* pa_sink_input_new(
i->thread_info.muted = i->muted; i->thread_info.muted = i->muted;
i->thread_info.requested_sink_latency = 0; i->thread_info.requested_sink_latency = 0;
i->thread_info.rewrite_nbytes = 0; i->thread_info.rewrite_nbytes = 0;
i->thread_info.since_underrun = 0;
i->thread_info.ignore_rewind = FALSE; i->thread_info.ignore_rewind = FALSE;
silence = pa_silence_memblock_new(i->sink->core->mempool, &i->sink->sample_spec, 0);
i->thread_info.render_memblockq = pa_memblockq_new( i->thread_info.render_memblockq = pa_memblockq_new(
0, 0,
MEMBLOCKQ_MAXLENGTH, MEMBLOCKQ_MAXLENGTH,
@ -256,9 +260,7 @@ pa_sink_input* pa_sink_input_new(
0, 0,
1, 1,
0, 0,
silence); &i->sink->silence);
pa_memblock_unref(silence);
pa_assert_se(pa_idxset_put(core->sink_inputs, pa_sink_input_ref(i), &i->index) == 0); pa_assert_se(pa_idxset_put(core->sink_inputs, pa_sink_input_ref(i), &i->index) == 0);
pa_assert_se(pa_idxset_put(i->sink->inputs, i, NULL) == 0); pa_assert_se(pa_idxset_put(i->sink->inputs, i, NULL) == 0);
@ -349,15 +351,7 @@ void pa_sink_input_unlink(pa_sink_input *i) {
} else } else
i->state = PA_SINK_INPUT_UNLINKED; i->state = PA_SINK_INPUT_UNLINKED;
i->pop = NULL; reset_callbacks(i);
i->rewind = NULL;
i->set_max_rewind = NULL;
i->kill = NULL;
i->get_latency = NULL;
i->attach = NULL;
i->detach = NULL;
i->suspend = NULL;
i->moved = NULL;
if (linked) { if (linked) {
pa_subscription_post(i->sink->core, PA_SUBSCRIPTION_EVENT_SINK_INPUT|PA_SUBSCRIPTION_EVENT_REMOVE, i->index); pa_subscription_post(i->sink->core, PA_SUBSCRIPTION_EVENT_SINK_INPUT|PA_SUBSCRIPTION_EVENT_REMOVE, i->index);
@ -395,19 +389,21 @@ static void sink_input_free(pa_object *o) {
} }
void pa_sink_input_put(pa_sink_input *i) { void pa_sink_input_put(pa_sink_input *i) {
pa_sink_input_state_t state;
pa_sink_input_assert_ref(i); pa_sink_input_assert_ref(i);
pa_assert(i->state == PA_SINK_INPUT_INIT); pa_assert(i->state == PA_SINK_INPUT_INIT);
pa_assert(i->pop); pa_assert(i->pop);
pa_assert(i->rewind);
i->thread_info.state = i->state = i->flags & PA_SINK_INPUT_START_CORKED ? PA_SINK_INPUT_CORKED : PA_SINK_INPUT_RUNNING;
i->thread_info.volume = i->volume; i->thread_info.volume = i->volume;
i->thread_info.muted = i->muted; i->thread_info.muted = i->muted;
if (i->state == PA_SINK_INPUT_CORKED) state = i->flags & PA_SINK_INPUT_START_CORKED ? PA_SINK_INPUT_CORKED : PA_SINK_INPUT_RUNNING;
i->sink->n_corked++;
update_n_corked(i, state);
i->thread_info.state = i->state = state;
pa_sink_update_status(i->sink);
pa_asyncmsgq_send(i->sink->asyncmsgq, PA_MSGOBJECT(i->sink), PA_SINK_MESSAGE_ADD_INPUT, i, 0, NULL); pa_asyncmsgq_send(i->sink->asyncmsgq, PA_MSGOBJECT(i->sink), PA_SINK_MESSAGE_ADD_INPUT, i, 0, NULL);
pa_subscription_post(i->sink->core, PA_SUBSCRIPTION_EVENT_SINK_INPUT|PA_SUBSCRIPTION_EVENT_NEW, i->index); pa_subscription_post(i->sink->core, PA_SUBSCRIPTION_EVENT_SINK_INPUT|PA_SUBSCRIPTION_EVENT_NEW, i->index);
@ -454,18 +450,20 @@ int pa_sink_input_peek(pa_sink_input *i, size_t slength /* in sink frames */, pa
pa_assert(chunk); pa_assert(chunk);
pa_assert(volume); pa_assert(volume);
pa_log_debug("peek"); /* pa_log_debug("peek"); */
if (!i->pop || i->thread_info.state == PA_SINK_INPUT_CORKED) if (!i->pop)
return -1; return -1;
pa_assert(i->thread_info.state == PA_SINK_INPUT_RUNNING || i->thread_info.state == PA_SINK_INPUT_DRAINED); pa_assert(i->thread_info.state == PA_SINK_INPUT_RUNNING ||
i->thread_info.state == PA_SINK_INPUT_CORKED ||
i->thread_info.state == PA_SINK_INPUT_DRAINED);
/* If there's still some rewrite request the handle, but the sink /* If there's still some rewrite request the handle, but the sink
didn't do this for us, we do it here. However, since the sink didn't do this for us, we do it here. However, since the sink
apparently doesn't support rewinding, we pass 0 here. This still apparently doesn't support rewinding, we pass 0 here. This still
allows rewinding through the render buffer. */ allows rewinding through the render buffer. */
pa_sink_input_rewind(i, 0); pa_sink_input_process_rewind(i, 0);
block_size_max_sink_input = i->thread_info.resampler ? block_size_max_sink_input = i->thread_info.resampler ?
pa_resampler_max_block_size(i->thread_info.resampler) : pa_resampler_max_block_size(i->thread_info.resampler) :
@ -504,11 +502,15 @@ int pa_sink_input_peek(pa_sink_input *i, size_t slength /* in sink frames */, pa
/* There's nothing in our render queue. We need to fill it up /* There's nothing in our render queue. We need to fill it up
* with data from the implementor. */ * with data from the implementor. */
if (i->pop(i, ilength, &tchunk) < 0) { if (i->thread_info.state == PA_SINK_INPUT_CORKED ||
i->pop(i, ilength, &tchunk) < 0) {
/* OK, we're corked or the implementor didn't give us any
* data, so let's just hand out silence */
pa_atomic_store(&i->thread_info.drained, 1); pa_atomic_store(&i->thread_info.drained, 1);
/* OK, we got no data from the implementor, so let's just skip ahead */
pa_memblockq_seek(i->thread_info.render_memblockq, slength, PA_SEEK_RELATIVE_ON_READ); pa_memblockq_seek(i->thread_info.render_memblockq, slength, PA_SEEK_RELATIVE_ON_READ);
i->thread_info.since_underrun = 0;
break; break;
} }
@ -517,10 +519,13 @@ int pa_sink_input_peek(pa_sink_input *i, size_t slength /* in sink frames */, pa
pa_assert(tchunk.length > 0); pa_assert(tchunk.length > 0);
pa_assert(tchunk.memblock); pa_assert(tchunk.memblock);
i->thread_info.since_underrun += tchunk.length;
while (tchunk.length > 0) { while (tchunk.length > 0) {
pa_memchunk wchunk; pa_memchunk wchunk;
wchunk = tchunk; wchunk = tchunk;
pa_memblock_ref(wchunk.memblock);
if (wchunk.length > block_size_max_sink_input) if (wchunk.length > block_size_max_sink_input)
wchunk.length = block_size_max_sink_input; wchunk.length = block_size_max_sink_input;
@ -529,6 +534,8 @@ int pa_sink_input_peek(pa_sink_input *i, size_t slength /* in sink frames */, pa
if (do_volume_adj_here && !volume_is_norm) { if (do_volume_adj_here && !volume_is_norm) {
pa_memchunk_make_writable(&wchunk, 0); pa_memchunk_make_writable(&wchunk, 0);
pa_log_debug("adjusting volume!");
if (i->thread_info.muted) if (i->thread_info.muted)
pa_silence_memchunk(&wchunk, &i->thread_info.sample_spec); pa_silence_memchunk(&wchunk, &i->thread_info.sample_spec);
else else
@ -547,6 +554,8 @@ int pa_sink_input_peek(pa_sink_input *i, size_t slength /* in sink frames */, pa
} }
} }
pa_memblock_unref(wchunk.memblock);
tchunk.index += wchunk.length; tchunk.index += wchunk.length;
tchunk.length -= wchunk.length; tchunk.length -= wchunk.length;
} }
@ -581,22 +590,18 @@ int pa_sink_input_peek(pa_sink_input *i, size_t slength /* in sink frames */, pa
/* Called from thread context */ /* Called from thread context */
void pa_sink_input_drop(pa_sink_input *i, size_t nbytes /* in sink sample spec */) { void pa_sink_input_drop(pa_sink_input *i, size_t nbytes /* in sink sample spec */) {
pa_sink_input_assert_ref(i); pa_sink_input_assert_ref(i);
pa_assert(PA_SINK_INPUT_LINKED(i->thread_info.state)); pa_assert(PA_SINK_INPUT_LINKED(i->thread_info.state));
pa_assert(pa_frame_aligned(nbytes, &i->sink->sample_spec)); pa_assert(pa_frame_aligned(nbytes, &i->sink->sample_spec));
pa_assert(nbytes > 0); pa_assert(nbytes > 0);
if (i->thread_info.state == PA_SINK_INPUT_CORKED)
return;
/* If there's still some rewrite request the handle, but the sink /* If there's still some rewrite request the handle, but the sink
didn't do this for us, we do it here. However, since the sink didn't do this for us, we do it here. However, since the sink
apparently doesn't support rewinding, we pass 0 here. This still apparently doesn't support rewinding, we pass 0 here. This still
allows rewinding through the render buffer. */ allows rewinding through the render buffer. */
if (i->thread_info.rewrite_nbytes > 0) if (i->thread_info.rewrite_nbytes > 0)
pa_sink_input_rewind(i, 0); pa_sink_input_process_rewind(i, 0);
pa_memblockq_drop(i->thread_info.render_memblockq, nbytes); pa_memblockq_drop(i->thread_info.render_memblockq, nbytes);
@ -604,25 +609,22 @@ void pa_sink_input_drop(pa_sink_input *i, size_t nbytes /* in sink sample spec *
} }
/* Called from thread context */ /* Called from thread context */
void pa_sink_input_rewind(pa_sink_input *i, size_t nbytes /* in sink sample spec */) { void pa_sink_input_process_rewind(pa_sink_input *i, size_t nbytes /* in sink sample spec */) {
pa_sink_input_assert_ref(i); pa_sink_input_assert_ref(i);
pa_assert(PA_SINK_INPUT_LINKED(i->thread_info.state)); pa_assert(PA_SINK_INPUT_LINKED(i->thread_info.state));
pa_assert(pa_frame_aligned(nbytes, &i->sink->sample_spec)); pa_assert(pa_frame_aligned(nbytes, &i->sink->sample_spec));
pa_log_debug("rewind(%lu, %lu)", (unsigned long) nbytes, (unsigned long) i->thread_info.rewrite_nbytes); /* pa_log_debug("rewind(%lu, %lu)", (unsigned long) nbytes, (unsigned long) i->thread_info.rewrite_nbytes); */
if (i->thread_info.state == PA_SINK_INPUT_CORKED)
return;
if (i->thread_info.ignore_rewind) { if (i->thread_info.ignore_rewind) {
i->thread_info.rewrite_nbytes = 0;
i->thread_info.ignore_rewind = FALSE; i->thread_info.ignore_rewind = FALSE;
i->thread_info.rewrite_nbytes = 0;
return; return;
} }
if (nbytes > 0) if (nbytes > 0)
pa_log_debug("Have to rewind %lu bytes.", (unsigned long) nbytes); pa_log_debug("Have to rewind %lu bytes on render memblockq.", (unsigned long) nbytes);
if (i->thread_info.rewrite_nbytes > 0) { if (i->thread_info.rewrite_nbytes > 0) {
size_t max_rewrite; size_t max_rewrite;
@ -641,19 +643,23 @@ void pa_sink_input_rewind(pa_sink_input *i, size_t nbytes /* in sink sample spec
/* Convert back to to sink domain */ /* Convert back to to sink domain */
r = i->thread_info.resampler ? pa_resampler_result(i->thread_info.resampler, amount) : amount; r = i->thread_info.resampler ? pa_resampler_result(i->thread_info.resampler, amount) : amount;
if (r > 0)
/* Ok, now update the write pointer */ /* Ok, now update the write pointer */
pa_memblockq_seek(i->thread_info.render_memblockq, -r, PA_SEEK_RELATIVE); pa_memblockq_seek(i->thread_info.render_memblockq, -r, PA_SEEK_RELATIVE);
if (amount) {
pa_log_debug("Have to rewind %lu bytes on implementor.", (unsigned long) amount);
/* Tell the implementor */ /* Tell the implementor */
if (i->rewind) if (i->rewind)
i->rewind(i, amount); i->rewind(i, amount);
}
/* And reset the resampler */ /* And reset the resampler */
if (i->thread_info.resampler) if (i->thread_info.resampler)
pa_resampler_reset(i->thread_info.resampler); pa_resampler_reset(i->thread_info.resampler);
} }
i->thread_info.rewrite_nbytes = 0; i->thread_info.rewrite_nbytes = 0;
} }
@ -664,7 +670,6 @@ void pa_sink_input_rewind(pa_sink_input *i, size_t nbytes /* in sink sample spec
/* Called from thread context */ /* Called from thread context */
void pa_sink_input_set_max_rewind(pa_sink_input *i, size_t nbytes /* in the sink's sample spec */) { void pa_sink_input_set_max_rewind(pa_sink_input *i, size_t nbytes /* in the sink's sample spec */) {
pa_sink_input_assert_ref(i); pa_sink_input_assert_ref(i);
pa_assert(PA_SINK_INPUT_LINKED(i->thread_info.state)); pa_assert(PA_SINK_INPUT_LINKED(i->thread_info.state));
pa_assert(pa_frame_aligned(nbytes, &i->sink->sample_spec)); pa_assert(pa_frame_aligned(nbytes, &i->sink->sample_spec));
@ -935,7 +940,7 @@ int pa_sink_input_move_to(pa_sink_input *i, pa_sink *dest, int immediately) {
/* Replace resampler */ /* Replace resampler */
if (new_resampler != i->thread_info.resampler) { if (new_resampler != i->thread_info.resampler) {
pa_memblock *silence; pa_memchunk silence;
if (i->thread_info.resampler) if (i->thread_info.resampler)
pa_resampler_free(i->thread_info.resampler); pa_resampler_free(i->thread_info.resampler);
@ -944,9 +949,15 @@ int pa_sink_input_move_to(pa_sink_input *i, pa_sink *dest, int immediately) {
/* if the resampler changed, the silence memblock is /* if the resampler changed, the silence memblock is
* probably invalid now, too */ * probably invalid now, too */
silence = pa_silence_memblock_new(i->sink->core->mempool, &dest->sample_spec, new_resampler ? pa_resampler_max_block_size(new_resampler) : 0); pa_silence_memchunk_get(
pa_memblockq_set_silence(i->thread_info.render_memblockq, silence); &i->sink->core->silence_cache,
pa_memblock_unref(silence); i->sink->core->mempool,
&silence,
&dest->sample_spec,
0);
pa_memblockq_set_silence(i->thread_info.render_memblockq, &silence);
pa_memblock_unref(silence.memblock);
} }
@ -974,6 +985,35 @@ int pa_sink_input_move_to(pa_sink_input *i, pa_sink *dest, int immediately) {
return 0; return 0;
} }
static void set_state(pa_sink_input *i, pa_sink_input_state_t state) {
pa_sink_input_assert_ref(i);
if ((state == PA_SINK_INPUT_DRAINED || state == PA_SINK_INPUT_RUNNING) &&
!(i->thread_info.state == PA_SINK_INPUT_DRAINED || i->thread_info.state != PA_SINK_INPUT_RUNNING))
pa_atomic_store(&i->thread_info.drained, 1);
if (state == PA_SINK_INPUT_CORKED && i->thread_info.state != PA_SINK_INPUT_CORKED) {
/* OK, we're corked, so let's make sure we have total silence
* from now on on this stream */
pa_memblockq_silence(i->thread_info.render_memblockq);
/* This will tell the implementing sink input driver to rewind
* so that the unplayed already mixed data is not lost */
pa_sink_input_request_rewind(i, 0, FALSE);
} else if (i->thread_info.state == PA_SINK_INPUT_CORKED && state != PA_SINK_INPUT_CORKED) {
/* OK, we're being uncorked. Make sure we're not rewound when
* the hw buffer is remixed and request a remix. */
i->thread_info.ignore_rewind = TRUE;
i->thread_info.since_underrun = 0;
pa_sink_request_rewind(i->sink, 0);
}
i->thread_info.state = state;
}
/* Called from thread context */ /* Called from thread context */
int pa_sink_input_process_msg(pa_msgobject *o, int code, void *userdata, int64_t offset, pa_memchunk *chunk) { int pa_sink_input_process_msg(pa_msgobject *o, int code, void *userdata, int64_t offset, pa_memchunk *chunk) {
pa_sink_input *i = PA_SINK_INPUT(o); pa_sink_input *i = PA_SINK_INPUT(o);
@ -984,12 +1024,12 @@ int pa_sink_input_process_msg(pa_msgobject *o, int code, void *userdata, int64_t
switch (code) { switch (code) {
case PA_SINK_INPUT_MESSAGE_SET_VOLUME: case PA_SINK_INPUT_MESSAGE_SET_VOLUME:
i->thread_info.volume = *((pa_cvolume*) userdata); i->thread_info.volume = *((pa_cvolume*) userdata);
pa_sink_input_request_rewrite(i, 0); pa_sink_input_request_rewind(i, 0, FALSE);
return 0; return 0;
case PA_SINK_INPUT_MESSAGE_SET_MUTE: case PA_SINK_INPUT_MESSAGE_SET_MUTE:
i->thread_info.muted = PA_PTR_TO_UINT(userdata); i->thread_info.muted = PA_PTR_TO_UINT(userdata);
pa_sink_input_request_rewrite(i, 0); pa_sink_input_request_rewind(i, 0, FALSE);
return 0; return 0;
case PA_SINK_INPUT_MESSAGE_GET_LATENCY: { case PA_SINK_INPUT_MESSAGE_GET_LATENCY: {
@ -1010,25 +1050,13 @@ int pa_sink_input_process_msg(pa_msgobject *o, int code, void *userdata, int64_t
case PA_SINK_INPUT_MESSAGE_SET_STATE: { case PA_SINK_INPUT_MESSAGE_SET_STATE: {
pa_sink_input *ssync; pa_sink_input *ssync;
if ((PA_PTR_TO_UINT(userdata) == PA_SINK_INPUT_DRAINED || PA_PTR_TO_UINT(userdata) == PA_SINK_INPUT_RUNNING) && set_state(i, PA_PTR_TO_UINT(userdata));
(i->thread_info.state != PA_SINK_INPUT_DRAINED) && (i->thread_info.state != PA_SINK_INPUT_RUNNING))
pa_atomic_store(&i->thread_info.drained, 1);
i->thread_info.state = PA_PTR_TO_UINT(userdata); for (ssync = i->thread_info.sync_prev; ssync; ssync = ssync->thread_info.sync_prev)
set_state(ssync, PA_PTR_TO_UINT(userdata));
for (ssync = i->thread_info.sync_prev; ssync; ssync = ssync->thread_info.sync_prev) { for (ssync = i->thread_info.sync_next; ssync; ssync = ssync->thread_info.sync_next)
if ((PA_PTR_TO_UINT(userdata) == PA_SINK_INPUT_DRAINED || PA_PTR_TO_UINT(userdata) == PA_SINK_INPUT_RUNNING) && set_state(ssync, PA_PTR_TO_UINT(userdata));
(ssync->thread_info.state != PA_SINK_INPUT_DRAINED) && (ssync->thread_info.state != PA_SINK_INPUT_RUNNING))
pa_atomic_store(&ssync->thread_info.drained, 1);
ssync->thread_info.state = PA_PTR_TO_UINT(userdata);
}
for (ssync = i->thread_info.sync_next; ssync; ssync = ssync->thread_info.sync_next) {
if ((PA_PTR_TO_UINT(userdata) == PA_SINK_INPUT_DRAINED || PA_PTR_TO_UINT(userdata) == PA_SINK_INPUT_RUNNING) &&
(ssync->thread_info.state != PA_SINK_INPUT_DRAINED) && (ssync->thread_info.state != PA_SINK_INPUT_RUNNING))
pa_atomic_store(&ssync->thread_info.drained, 1);
ssync->thread_info.state = PA_PTR_TO_UINT(userdata);
}
return 0; return 0;
} }
@ -1062,24 +1090,42 @@ pa_bool_t pa_sink_input_safe_to_remove(pa_sink_input *i) {
return TRUE; return TRUE;
} }
void pa_sink_input_request_rewrite(pa_sink_input *i, size_t nbytes /* in our sample spec */) { void pa_sink_input_request_rewind(pa_sink_input *i, size_t nbytes /* in our sample spec */, pa_bool_t ignore_underruns) {
size_t l, lbq; size_t l, lbq;
pa_sink_input_assert_ref(i); pa_sink_input_assert_ref(i);
/* We don't take rewind requests while we are corked */
if (i->state == PA_SINK_INPUT_CORKED)
return;
lbq = pa_memblockq_get_length(i->thread_info.render_memblockq); lbq = pa_memblockq_get_length(i->thread_info.render_memblockq);
if (nbytes <= 0) { if (nbytes <= 0) {
/* Calulate maximum number of bytes that could be rewound in theory */
nbytes = i->sink->thread_info.max_rewind + lbq;
/* Transform from sink domain */
nbytes = nbytes =
i->thread_info.resampler ? i->thread_info.resampler ?
pa_resampler_request(i->thread_info.resampler, i->sink->thread_info.max_rewind + lbq) : pa_resampler_request(i->thread_info.resampler, nbytes) :
(i->sink->thread_info.max_rewind + lbq); nbytes;
} }
i->thread_info.rewrite_nbytes = PA_MAX(nbytes, i->thread_info.rewrite_nbytes); /* Increase the number of bytes to rewrite, never decrease */
if (nbytes > i->thread_info.rewrite_nbytes)
i->thread_info.rewrite_nbytes = nbytes;
if (!ignore_underruns) {
/* Make sure to not overwrite over underruns */
if ((int64_t) i->thread_info.rewrite_nbytes > i->thread_info.since_underrun)
i->thread_info.rewrite_nbytes = (size_t) i->thread_info.since_underrun;
}
/* Transform to sink domain */ /* Transform to sink domain */
l = i->thread_info.resampler ? pa_resampler_result(i->thread_info.resampler, nbytes) : nbytes; l = i->thread_info.resampler ?
pa_resampler_result(i->thread_info.resampler, i->thread_info.rewrite_nbytes) :
i->thread_info.rewrite_nbytes;
if (l <= 0) if (l <= 0)
return; return;
@ -1087,3 +1133,17 @@ void pa_sink_input_request_rewrite(pa_sink_input *i, size_t nbytes /* in our sa
if (l > lbq) if (l > lbq)
pa_sink_request_rewind(i->sink, l - lbq); pa_sink_request_rewind(i->sink, l - lbq);
} }
pa_memchunk* pa_sink_input_get_silence(pa_sink_input *i, pa_memchunk *ret) {
pa_sink_input_assert_ref(i);
pa_assert(ret);
pa_silence_memchunk_get(
&i->sink->core->silence_cache,
i->sink->core->mempool,
ret,
&i->sample_spec,
i->thread_info.resampler ? pa_resampler_max_block_size(i->thread_info.resampler) : 0);
return ret;
}

View file

@ -97,16 +97,16 @@ struct pa_sink_input {
* only. If less data is available, it's fine to return a smaller * only. If less data is available, it's fine to return a smaller
* block. If more data is already ready, it is better to return * block. If more data is already ready, it is better to return
* the full block. */ * the full block. */
int (*pop) (pa_sink_input *i, size_t request_nbytes, pa_memchunk *chunk); int (*pop) (pa_sink_input *i, size_t request_nbytes, pa_memchunk *chunk); /* may NOT be NULL */
/* Rewind the queue by the specified number of bytes. Called just /* Rewind the queue by the specified number of bytes. Called just
* before peek() if it is called at all. Only called if the sink * before peek() if it is called at all. Only called if the sink
* input driver ever plans to call * input driver ever plans to call
* pa_sink_input_request_rewrite(). Called from IO context. */ * pa_sink_input_request_rewind(). Called from IO context. */
void (*rewind) (pa_sink_input *i, size_t nbytes); /* may be NULL */ void (*rewind) (pa_sink_input *i, size_t nbytes); /* may NOT be NULL */
/* Called whenever the maximum rewindable size of the sink /* Called whenever the maximum rewindable size of the sink
* changes. Called from UI context. */ * changes. Called from RT context. */
void (*set_max_rewind) (pa_sink_input *i, size_t nbytes); /* may be NULL */ void (*set_max_rewind) (pa_sink_input *i, size_t nbytes); /* may be NULL */
/* If non-NULL this function is called when the input is first /* If non-NULL this function is called when the input is first
@ -150,7 +150,9 @@ struct pa_sink_input {
/* We maintain a history of resampled audio data here. */ /* We maintain a history of resampled audio data here. */
pa_memblockq *render_memblockq; pa_memblockq *render_memblockq;
size_t rewrite_nbytes; size_t rewrite_nbytes;
int64_t since_underrun;
pa_bool_t ignore_rewind; pa_bool_t ignore_rewind;
pa_sink_input *sync_prev, *sync_next; pa_sink_input *sync_prev, *sync_next;
@ -229,14 +231,13 @@ void pa_sink_input_set_name(pa_sink_input *i, const char *name);
pa_usec_t pa_sink_input_set_requested_latency(pa_sink_input *i, pa_usec_t usec); pa_usec_t pa_sink_input_set_requested_latency(pa_sink_input *i, pa_usec_t usec);
/* Request that the specified number of bytes already written out to /* Request that the specified number of bytes already written out to
the hw device is rewritten, if possible. If this function is used you the hw device is rewritten, if possible. Please note that this is
need to supply the ->rewind() function pointer. Please note that this only a kind request. The sink driver may not be able to fulfill it
is only a kind request. The sink driver may not be able to fulfill it
fully -- or at all. If the request for a rewrite was successful, the fully -- or at all. If the request for a rewrite was successful, the
sink driver will call ->rewind() and pass the number of bytes that sink driver will call ->rewind() and pass the number of bytes that
could be rewound in the HW device. This functionality is required for could be rewound in the HW device. This functionality is required for
implementing the "zero latency" write-through functionality. */ implementing the "zero latency" write-through functionality. */
void pa_sink_input_request_rewrite(pa_sink_input *i, size_t nbytes); void pa_sink_input_request_rewind(pa_sink_input *i, size_t nbytes, pa_bool_t ignore_rewind);
/* Callable by everyone from main thread*/ /* Callable by everyone from main thread*/
@ -265,7 +266,7 @@ pa_bool_t pa_sink_input_safe_to_remove(pa_sink_input *i);
int pa_sink_input_peek(pa_sink_input *i, size_t length, pa_memchunk *chunk, pa_cvolume *volume); int pa_sink_input_peek(pa_sink_input *i, size_t length, pa_memchunk *chunk, pa_cvolume *volume);
void pa_sink_input_drop(pa_sink_input *i, size_t length); void pa_sink_input_drop(pa_sink_input *i, size_t length);
void pa_sink_input_rewind(pa_sink_input *i, size_t nbytes /* in the sink's sample spec */); void pa_sink_input_process_rewind(pa_sink_input *i, size_t nbytes /* in the sink's sample spec */);
void pa_sink_input_set_max_rewind(pa_sink_input *i, size_t nbytes /* in the sink's sample spec */); void pa_sink_input_set_max_rewind(pa_sink_input *i, size_t nbytes /* in the sink's sample spec */);
int pa_sink_input_process_msg(pa_msgobject *o, int code, void *userdata, int64_t offset, pa_memchunk *chunk); int pa_sink_input_process_msg(pa_msgobject *o, int code, void *userdata, int64_t offset, pa_memchunk *chunk);
@ -277,5 +278,6 @@ typedef struct pa_sink_input_move_info {
size_t buffer_bytes; size_t buffer_bytes;
} pa_sink_input_move_info; } pa_sink_input_move_info;
pa_memchunk* pa_sink_input_get_silence(pa_sink_input *i, pa_memchunk *ret);
#endif #endif

View file

@ -105,6 +105,19 @@ void pa_sink_new_data_done(pa_sink_new_data *data) {
pa_proplist_free(data->proplist); pa_proplist_free(data->proplist);
} }
static void reset_callbacks(pa_sink *s) {
pa_assert(s);
s->set_state = NULL;
s->get_volume = NULL;
s->set_volume = NULL;
s->get_mute = NULL;
s->set_mute = NULL;
s->get_latency = NULL;
s->request_rewind = NULL;
s->update_requested_latency = NULL;
}
pa_sink* pa_sink_new( pa_sink* pa_sink_new(
pa_core *core, pa_core *core,
pa_sink_new_data *data, pa_sink_new_data *data,
@ -182,19 +195,18 @@ pa_sink* pa_sink_new(
s->muted = data->muted; s->muted = data->muted;
s->refresh_volume = s->refresh_mute = FALSE; s->refresh_volume = s->refresh_mute = FALSE;
s->get_latency = NULL; reset_callbacks(s);
s->set_volume = NULL;
s->get_volume = NULL;
s->set_mute = NULL;
s->get_mute = NULL;
s->set_state = NULL;
s->request_rewind = NULL;
s->update_requested_latency = NULL;
s->userdata = NULL; s->userdata = NULL;
s->asyncmsgq = NULL; s->asyncmsgq = NULL;
s->rtpoll = NULL; s->rtpoll = NULL;
s->silence = pa_silence_memblock_new(core->mempool, &s->sample_spec, 0);
pa_silence_memchunk_get(
&core->silence_cache,
core->mempool,
&s->silence,
&s->sample_spec,
0);
s->min_latency = DEFAULT_MIN_LATENCY; s->min_latency = DEFAULT_MIN_LATENCY;
s->max_latency = s->min_latency; s->max_latency = s->min_latency;
@ -239,6 +251,7 @@ pa_sink* pa_sink_new(
} }
s->monitor_source->monitor_of = s; s->monitor_source->monitor_of = s;
pa_source_set_max_rewind(s->monitor_source, s->thread_info.max_rewind);
return s; return s;
} }
@ -336,14 +349,7 @@ void pa_sink_unlink(pa_sink* s) {
else else
s->state = PA_SINK_UNLINKED; s->state = PA_SINK_UNLINKED;
s->get_latency = NULL; reset_callbacks(s);
s->get_volume = NULL;
s->set_volume = NULL;
s->set_mute = NULL;
s->get_mute = NULL;
s->set_state = NULL;
s->request_rewind = NULL;
s->update_requested_latency = NULL;
if (s->monitor_source) if (s->monitor_source)
pa_source_unlink(s->monitor_source); pa_source_unlink(s->monitor_source);
@ -378,8 +384,8 @@ static void sink_free(pa_object *o) {
pa_hashmap_free(s->thread_info.inputs, NULL, NULL); pa_hashmap_free(s->thread_info.inputs, NULL, NULL);
if (s->silence) if (s->silence.memblock)
pa_memblock_unref(s->silence); pa_memblock_unref(s->silence.memblock);
pa_xfree(s->name); pa_xfree(s->name);
pa_xfree(s->driver); pa_xfree(s->driver);
@ -429,37 +435,31 @@ int pa_sink_suspend(pa_sink *s, pa_bool_t suspend) {
return sink_set_state(s, pa_sink_used_by(s) ? PA_SINK_RUNNING : PA_SINK_IDLE); return sink_set_state(s, pa_sink_used_by(s) ? PA_SINK_RUNNING : PA_SINK_IDLE);
} }
void pa_sink_ping(pa_sink *s) { void pa_sink_process_rewind(pa_sink *s, size_t nbytes) {
pa_sink_assert_ref(s);
pa_assert(PA_SINK_LINKED(s->state));
pa_asyncmsgq_post(s->asyncmsgq, PA_MSGOBJECT(s), PA_SINK_MESSAGE_PING, NULL, 0, NULL, NULL);
}
void pa_sink_process_rewind(pa_sink *s) {
pa_sink_input *i; pa_sink_input *i;
void *state = NULL; void *state = NULL;
pa_sink_assert_ref(s); pa_sink_assert_ref(s);
pa_assert(PA_SINK_LINKED(s->state)); pa_assert(PA_SINK_LINKED(s->state));
if (s->thread_info.rewind_nbytes <= 0) if (nbytes <= 0)
return; return;
pa_log_debug("Processing rewind..."); pa_log_debug("Processing rewind...");
while ((i = pa_hashmap_iterate(s->thread_info.inputs, &state, NULL))) { while ((i = pa_hashmap_iterate(s->thread_info.inputs, &state, NULL))) {
pa_sink_input_assert_ref(i); pa_sink_input_assert_ref(i);
pa_sink_input_process_rewind(i, nbytes);
pa_sink_input_rewind(i, s->thread_info.rewind_nbytes);
} }
s->thread_info.rewind_nbytes = 0; if (s->monitor_source && PA_SOURCE_OPENED(pa_source_get_state(s->monitor_source)))
pa_source_process_rewind(s->monitor_source, nbytes);
} }
static unsigned fill_mix_info(pa_sink *s, size_t length, pa_mix_info *info, unsigned maxinfo) { static unsigned fill_mix_info(pa_sink *s, size_t *length, pa_mix_info *info, unsigned maxinfo) {
pa_sink_input *i; pa_sink_input *i;
unsigned n = 0; unsigned n = 0;
void *state = NULL; void *state = NULL;
size_t mixlength = *length;
pa_sink_assert_ref(s); pa_sink_assert_ref(s);
pa_assert(info); pa_assert(info);
@ -467,9 +467,12 @@ static unsigned fill_mix_info(pa_sink *s, size_t length, pa_mix_info *info, unsi
while ((i = pa_hashmap_iterate(s->thread_info.inputs, &state, NULL)) && maxinfo > 0) { while ((i = pa_hashmap_iterate(s->thread_info.inputs, &state, NULL)) && maxinfo > 0) {
pa_sink_input_assert_ref(i); pa_sink_input_assert_ref(i);
if (pa_sink_input_peek(i, length, &info->chunk, &info->volume) < 0) if (pa_sink_input_peek(i, *length, &info->chunk, &info->volume) < 0)
continue; continue;
if (mixlength == 0 || info->chunk.length < mixlength)
mixlength = info->chunk.length;
if (pa_memblock_is_silence(info->chunk.memblock)) { if (pa_memblock_is_silence(info->chunk.memblock)) {
pa_memblock_unref(info->chunk.memblock); pa_memblock_unref(info->chunk.memblock);
continue; continue;
@ -485,6 +488,9 @@ static unsigned fill_mix_info(pa_sink *s, size_t length, pa_mix_info *info, unsi
maxinfo--; maxinfo--;
} }
if (mixlength > 0)
*length = mixlength;
return n; return n;
} }
@ -569,13 +575,15 @@ void pa_sink_render(pa_sink*s, size_t length, pa_memchunk *result) {
pa_assert(length > 0); pa_assert(length > 0);
n = s->thread_info.state == PA_SINK_RUNNING ? fill_mix_info(s, length, info, MAX_MIX_CHANNELS) : 0; n = s->thread_info.state == PA_SINK_RUNNING ? fill_mix_info(s, &length, info, MAX_MIX_CHANNELS) : 0;
if (n == 0) { if (n == 0) {
result->memblock = pa_memblock_ref(s->silence); *result = s->silence;
result->length = PA_MIN(pa_memblock_get_length(s->silence), length); pa_memblock_ref(result->memblock);
result->index = 0;
if (result->length > length)
result->length = length;
} else if (n == 1) { } else if (n == 1) {
pa_cvolume volume; pa_cvolume volume;
@ -589,6 +597,7 @@ void pa_sink_render(pa_sink*s, size_t length, pa_memchunk *result) {
pa_sw_cvolume_multiply(&volume, &s->thread_info.soft_volume, &info[0].volume); pa_sw_cvolume_multiply(&volume, &s->thread_info.soft_volume, &info[0].volume);
if (s->thread_info.soft_muted || !pa_cvolume_is_norm(&volume)) { if (s->thread_info.soft_muted || !pa_cvolume_is_norm(&volume)) {
pa_log("adjusting volume ");
pa_memchunk_make_writable(result, 0); pa_memchunk_make_writable(result, 0);
if (s->thread_info.soft_muted || pa_cvolume_is_muted(&volume)) if (s->thread_info.soft_muted || pa_cvolume_is_muted(&volume))
pa_silence_memchunk(result, &s->sample_spec); pa_silence_memchunk(result, &s->sample_spec);
@ -600,7 +609,11 @@ void pa_sink_render(pa_sink*s, size_t length, pa_memchunk *result) {
result->memblock = pa_memblock_new(s->core->mempool, length); result->memblock = pa_memblock_new(s->core->mempool, length);
ptr = pa_memblock_acquire(result->memblock); ptr = pa_memblock_acquire(result->memblock);
result->length = pa_mix(info, n, ptr, length, &s->sample_spec, &s->thread_info.soft_volume, s->thread_info.soft_muted); result->length = pa_mix(info, n,
ptr, length,
&s->sample_spec,
&s->thread_info.soft_volume,
s->thread_info.soft_muted);
pa_memblock_release(result->memblock); pa_memblock_release(result->memblock);
result->index = 0; result->index = 0;
@ -618,6 +631,7 @@ void pa_sink_render(pa_sink*s, size_t length, pa_memchunk *result) {
void pa_sink_render_into(pa_sink*s, pa_memchunk *target) { void pa_sink_render_into(pa_sink*s, pa_memchunk *target) {
pa_mix_info info[MAX_MIX_CHANNELS]; pa_mix_info info[MAX_MIX_CHANNELS];
unsigned n; unsigned n;
size_t length, block_size_max;
pa_sink_assert_ref(s); pa_sink_assert_ref(s);
pa_assert(PA_SINK_OPENED(s->thread_info.state)); pa_assert(PA_SINK_OPENED(s->thread_info.state));
@ -630,34 +644,44 @@ void pa_sink_render_into(pa_sink*s, pa_memchunk *target) {
s->thread_info.rewind_nbytes = 0; s->thread_info.rewind_nbytes = 0;
n = s->thread_info.state == PA_SINK_RUNNING ? fill_mix_info(s, target->length, info, MAX_MIX_CHANNELS) : 0; length = target->length;
block_size_max = pa_mempool_block_size_max(s->core->mempool);
if (length > block_size_max)
length = pa_frame_align(block_size_max, &s->sample_spec);
if (n == 0) n = s->thread_info.state == PA_SINK_RUNNING ? fill_mix_info(s, &length, info, MAX_MIX_CHANNELS) : 0;
pa_silence_memchunk(target, &s->sample_spec);
else if (n == 1) { if (n == 0) {
if (target->length > info[0].chunk.length) if (target->length > length)
target->length = info[0].chunk.length; target->length = length;
if (s->thread_info.soft_muted)
pa_silence_memchunk(target, &s->sample_spec); pa_silence_memchunk(target, &s->sample_spec);
else { } else if (n == 1) {
void *src, *ptr;
pa_cvolume volume; pa_cvolume volume;
ptr = pa_memblock_acquire(target->memblock); if (target->length > length)
src = pa_memblock_acquire(info[0].chunk.memblock); target->length = length;
memcpy((uint8_t*) ptr + target->index,
(uint8_t*) src + info[0].chunk.index,
target->length);
pa_memblock_release(target->memblock);
pa_memblock_release(info[0].chunk.memblock);
pa_sw_cvolume_multiply(&volume, &s->thread_info.soft_volume, &info[0].volume); pa_sw_cvolume_multiply(&volume, &s->thread_info.soft_volume, &info[0].volume);
if (!pa_cvolume_is_norm(&volume)) if (s->thread_info.soft_muted || pa_cvolume_is_muted(&volume))
pa_volume_memchunk(target, &s->sample_spec, &volume); pa_silence_memchunk(target, &s->sample_spec);
else {
pa_memchunk vchunk;
vchunk = info[0].chunk;
pa_memblock_ref(vchunk.memblock);
if (vchunk.length > target->length)
vchunk.length = target->length;
if (!pa_cvolume_is_norm(&volume)) {
pa_memchunk_make_writable(&vchunk, 0);
pa_volume_memchunk(&vchunk, &s->sample_spec, &volume);
}
pa_memchunk_memcpy(target, &vchunk);
pa_memblock_unref(vchunk.memblock);
} }
} else { } else {
@ -666,8 +690,7 @@ void pa_sink_render_into(pa_sink*s, pa_memchunk *target) {
ptr = pa_memblock_acquire(target->memblock); ptr = pa_memblock_acquire(target->memblock);
target->length = pa_mix(info, n, target->length = pa_mix(info, n,
(uint8_t*) ptr + target->index, (uint8_t*) ptr + target->index, length,
target->length,
&s->sample_spec, &s->sample_spec,
&s->thread_info.soft_volume, &s->thread_info.soft_volume,
s->thread_info.soft_muted); s->thread_info.soft_muted);
@ -923,12 +946,11 @@ unsigned pa_sink_used_by(pa_sink *s) {
ret = pa_idxset_size(s->inputs); ret = pa_idxset_size(s->inputs);
pa_assert(ret >= s->n_corked); pa_assert(ret >= s->n_corked);
ret -= s->n_corked;
/* Streams connected to our monitor source do not matter for /* Streams connected to our monitor source do not matter for
* pa_sink_used_by()!.*/ * pa_sink_used_by()!.*/
return ret; return ret - s->n_corked;
} }
int pa_sink_process_msg(pa_msgobject *o, int code, void *userdata, int64_t offset, pa_memchunk *chunk) { int pa_sink_process_msg(pa_msgobject *o, int code, void *userdata, int64_t offset, pa_memchunk *chunk) {
@ -973,7 +995,9 @@ int pa_sink_process_msg(pa_msgobject *o, int code, void *userdata, int64_t offse
pa_sink_invalidate_requested_latency(s); pa_sink_invalidate_requested_latency(s);
/* Make sure we're not rewound when the hw buffer is remixed and request a remix*/
i->thread_info.ignore_rewind = TRUE; i->thread_info.ignore_rewind = TRUE;
i->thread_info.since_underrun = 0;
pa_sink_request_rewind(s, 0); pa_sink_request_rewind(s, 0);
return 0; return 0;
@ -1124,9 +1148,6 @@ int pa_sink_process_msg(pa_msgobject *o, int code, void *userdata, int64_t offse
*((pa_bool_t*) userdata) = s->thread_info.soft_muted; *((pa_bool_t*) userdata) = s->thread_info.soft_muted;
return 0; return 0;
case PA_SINK_MESSAGE_PING:
return 0;
case PA_SINK_MESSAGE_SET_STATE: case PA_SINK_MESSAGE_SET_STATE:
s->thread_info.state = PA_PTR_TO_UINT(userdata); s->thread_info.state = PA_PTR_TO_UINT(userdata);
@ -1286,7 +1307,6 @@ void pa_sink_set_max_rewind(pa_sink *s, size_t max_rewind) {
void *state = NULL; void *state = NULL;
pa_sink_assert_ref(s); pa_sink_assert_ref(s);
pa_assert(PA_SINK_LINKED(s->thread_info.state));
if (max_rewind == s->thread_info.max_rewind) if (max_rewind == s->thread_info.max_rewind)
return; return;
@ -1295,6 +1315,9 @@ void pa_sink_set_max_rewind(pa_sink *s, size_t max_rewind) {
while ((i = pa_hashmap_iterate(s->thread_info.inputs, &state, NULL))) while ((i = pa_hashmap_iterate(s->thread_info.inputs, &state, NULL)))
pa_sink_input_set_max_rewind(i, s->thread_info.max_rewind); pa_sink_input_set_max_rewind(i, s->thread_info.max_rewind);
if (s->monitor_source)
pa_source_set_max_rewind(s->monitor_source, s->thread_info.max_rewind);
} }
void pa_sink_invalidate_requested_latency(pa_sink *s) { void pa_sink_invalidate_requested_latency(pa_sink *s) {

View file

@ -89,14 +89,14 @@ struct pa_sink {
pa_asyncmsgq *asyncmsgq; pa_asyncmsgq *asyncmsgq;
pa_rtpoll *rtpoll; pa_rtpoll *rtpoll;
pa_memblock *silence; pa_memchunk silence;
pa_usec_t min_latency; /* we won't go below this latency */ pa_usec_t min_latency; /* we won't go below this latency */
pa_usec_t max_latency; /* An upper limit for the latencies */ pa_usec_t max_latency; /* An upper limit for the latencies */
int (*set_state)(pa_sink *s, pa_sink_state_t state); /* may be NULL */ int (*set_state)(pa_sink *s, pa_sink_state_t state); /* may be NULL */
int (*set_volume)(pa_sink *s); /* dito */
int (*get_volume)(pa_sink *s); /* dito */ int (*get_volume)(pa_sink *s); /* dito */
int (*set_volume)(pa_sink *s); /* dito */
int (*get_mute)(pa_sink *s); /* dito */ int (*get_mute)(pa_sink *s); /* dito */
int (*set_mute)(pa_sink *s); /* dito */ int (*set_mute)(pa_sink *s); /* dito */
pa_usec_t (*get_latency)(pa_sink *s); /* dito */ pa_usec_t (*get_latency)(pa_sink *s); /* dito */
@ -138,7 +138,6 @@ typedef enum pa_sink_message {
PA_SINK_MESSAGE_GET_LATENCY, PA_SINK_MESSAGE_GET_LATENCY,
PA_SINK_MESSAGE_GET_REQUESTED_LATENCY, PA_SINK_MESSAGE_GET_REQUESTED_LATENCY,
PA_SINK_MESSAGE_SET_STATE, PA_SINK_MESSAGE_SET_STATE,
PA_SINK_MESSAGE_PING,
PA_SINK_MESSAGE_REMOVE_INPUT_AND_BUFFER, PA_SINK_MESSAGE_REMOVE_INPUT_AND_BUFFER,
PA_SINK_MESSAGE_ATTACH, PA_SINK_MESSAGE_ATTACH,
PA_SINK_MESSAGE_DETACH, PA_SINK_MESSAGE_DETACH,
@ -199,13 +198,6 @@ int pa_sink_update_status(pa_sink*s);
int pa_sink_suspend(pa_sink *s, pa_bool_t suspend); int pa_sink_suspend(pa_sink *s, pa_bool_t suspend);
int pa_sink_suspend_all(pa_core *c, pa_bool_t suspend); int pa_sink_suspend_all(pa_core *c, pa_bool_t suspend);
void pa_sink_rewind(pa_sink *s, size_t length);
/* Sends a ping message to the sink thread, to make it wake up and
* check for data to process even if there is no real message is
* sent */
void pa_sink_ping(pa_sink *s);
void pa_sink_set_volume(pa_sink *sink, const pa_cvolume *volume); void pa_sink_set_volume(pa_sink *sink, const pa_cvolume *volume);
const pa_cvolume *pa_sink_get_volume(pa_sink *sink); const pa_cvolume *pa_sink_get_volume(pa_sink *sink);
void pa_sink_set_mute(pa_sink *sink, pa_bool_t mute); void pa_sink_set_mute(pa_sink *sink, pa_bool_t mute);
@ -217,14 +209,14 @@ unsigned pa_sink_used_by(pa_sink *s); /* Number of connected streams which are n
/* To be called exclusively by the sink driver, from IO context */ /* To be called exclusively by the sink driver, from IO context */
void pa_sink_process_rewind(pa_sink *s);
void pa_sink_render(pa_sink*s, size_t length, pa_memchunk *result); void pa_sink_render(pa_sink*s, size_t length, pa_memchunk *result);
void pa_sink_render_full(pa_sink *s, size_t length, pa_memchunk *result); void pa_sink_render_full(pa_sink *s, size_t length, pa_memchunk *result);
void pa_sink_render_into(pa_sink*s, pa_memchunk *target); void pa_sink_render_into(pa_sink*s, pa_memchunk *target);
void pa_sink_render_into_full(pa_sink *s, pa_memchunk *target); void pa_sink_render_into_full(pa_sink *s, pa_memchunk *target);
void pa_sink_skip(pa_sink *s, size_t length); void pa_sink_skip(pa_sink *s, size_t length);
void pa_sink_process_rewind(pa_sink *s, size_t nbytes);
int pa_sink_process_msg(pa_msgobject *o, int code, void *userdata, int64_t offset, pa_memchunk *chunk); int pa_sink_process_msg(pa_msgobject *o, int code, void *userdata, int64_t offset, pa_memchunk *chunk);
void pa_sink_attach_within_thread(pa_sink *s); void pa_sink_attach_within_thread(pa_sink *s);

View file

@ -235,7 +235,7 @@ int pa_play_file(
pa_sample_spec ss; pa_sample_spec ss;
pa_sink_input_new_data data; pa_sink_input_new_data data;
int fd; int fd;
pa_memblock *silence; pa_memchunk silence;
pa_assert(sink); pa_assert(sink);
pa_assert(fname); pa_assert(fname);
@ -336,13 +336,9 @@ int pa_play_file(
u->sink_input->kill = sink_input_kill_cb; u->sink_input->kill = sink_input_kill_cb;
u->sink_input->userdata = u; u->sink_input->userdata = u;
silence = pa_silence_memblock_new( pa_sink_input_get_silence(u->sink_input, &silence);
u->core->mempool, u->memblockq = pa_memblockq_new(0, MEMBLOCKQ_MAXLENGTH, 0, pa_frame_size(&u->sink_input->sample_spec), 1, 1, 0, &silence);
&u->sink_input->sample_spec, pa_memblock_unref(silence.memblock);
u->sink_input->thread_info.resampler ? pa_resampler_max_block_size(u->sink_input->thread_info.resampler) : 0);
u->memblockq = pa_memblockq_new(0, MEMBLOCKQ_MAXLENGTH, 0, pa_frame_size(&u->sink_input->sample_spec), 1, 1, 0, silence);
pa_memblock_unref(silence);
pa_sink_input_put(u->sink_input); pa_sink_input_put(u->sink_input);

View file

@ -40,6 +40,8 @@
#include "source-output.h" #include "source-output.h"
#define MEMBLOCKQ_MAXLENGTH (32*1024*1024)
static PA_DEFINE_CHECK_TYPE(pa_source_output, pa_msgobject); static PA_DEFINE_CHECK_TYPE(pa_source_output, pa_msgobject);
static void source_output_free(pa_object* mo); static void source_output_free(pa_object* mo);
@ -74,6 +76,20 @@ void pa_source_output_new_data_done(pa_source_output_new_data *data) {
pa_proplist_free(data->proplist); pa_proplist_free(data->proplist);
} }
static void reset_callbacks(pa_source_output *o) {
pa_assert(o);
o->push = NULL;
o->rewind = NULL;
o->set_max_rewind = NULL;
o->attach = NULL;
o->detach = NULL;
o->suspend = NULL;
o->moved = NULL;
o->kill = NULL;
o->get_latency = NULL;
}
pa_source_output* pa_source_output_new( pa_source_output* pa_source_output_new(
pa_core *core, pa_core *core,
pa_source_output_new_data *data, pa_source_output_new_data *data,
@ -175,13 +191,7 @@ pa_source_output* pa_source_output_new(
o->sample_spec = data->sample_spec; o->sample_spec = data->sample_spec;
o->channel_map = data->channel_map; o->channel_map = data->channel_map;
o->push = NULL; reset_callbacks(o);
o->kill = NULL;
o->get_latency = NULL;
o->detach = NULL;
o->attach = NULL;
o->suspend = NULL;
o->moved = NULL;
o->userdata = NULL; o->userdata = NULL;
o->thread_info.state = o->state; o->thread_info.state = o->state;
@ -190,6 +200,16 @@ pa_source_output* pa_source_output_new(
o->thread_info.resampler = resampler; o->thread_info.resampler = resampler;
o->thread_info.requested_source_latency = 0; o->thread_info.requested_source_latency = 0;
o->thread_info.delay_memblockq = pa_memblockq_new(
0,
MEMBLOCKQ_MAXLENGTH,
0,
pa_frame_size(&o->source->sample_spec),
0,
1,
0,
&o->source->silence);
pa_assert_se(pa_idxset_put(core->source_outputs, o, &o->index) == 0); pa_assert_se(pa_idxset_put(core->source_outputs, o, &o->index) == 0);
pa_assert_se(pa_idxset_put(o->source->outputs, pa_source_output_ref(o), NULL) == 0); pa_assert_se(pa_idxset_put(o->source->outputs, pa_source_output_ref(o), NULL) == 0);
@ -205,6 +225,17 @@ pa_source_output* pa_source_output_new(
return o; return o;
} }
static void update_n_corked(pa_source_output *o, pa_source_output_state_t state) {
pa_assert(o);
if (o->state == PA_SOURCE_OUTPUT_CORKED && state != PA_SOURCE_OUTPUT_CORKED)
pa_assert_se(o->source->n_corked -- >= 1);
else if (o->state != PA_SOURCE_OUTPUT_CORKED && state == PA_SOURCE_OUTPUT_CORKED)
o->source->n_corked++;
pa_source_update_status(o->source);
}
static int source_output_set_state(pa_source_output *o, pa_source_output_state_t state) { static int source_output_set_state(pa_source_output *o, pa_source_output_state_t state) {
pa_assert(o); pa_assert(o);
@ -214,12 +245,7 @@ static int source_output_set_state(pa_source_output *o, pa_source_output_state_t
if (pa_asyncmsgq_send(o->source->asyncmsgq, PA_MSGOBJECT(o), PA_SOURCE_OUTPUT_MESSAGE_SET_STATE, PA_UINT_TO_PTR(state), 0, NULL) < 0) if (pa_asyncmsgq_send(o->source->asyncmsgq, PA_MSGOBJECT(o), PA_SOURCE_OUTPUT_MESSAGE_SET_STATE, PA_UINT_TO_PTR(state), 0, NULL) < 0)
return -1; return -1;
if (o->state == PA_SOURCE_OUTPUT_CORKED && state != PA_SOURCE_OUTPUT_CORKED) update_n_corked(o, state);
pa_assert_se(o->source->n_corked -- >= 1);
else if (o->state != PA_SOURCE_OUTPUT_CORKED && state == PA_SOURCE_OUTPUT_CORKED)
o->source->n_corked++;
pa_source_update_status(o->source);
o->state = state; o->state = state;
if (state != PA_SOURCE_OUTPUT_UNLINKED) if (state != PA_SOURCE_OUTPUT_UNLINKED)
@ -253,13 +279,7 @@ void pa_source_output_unlink(pa_source_output*o) {
} else } else
o->state = PA_SOURCE_OUTPUT_UNLINKED; o->state = PA_SOURCE_OUTPUT_UNLINKED;
o->push = NULL; reset_callbacks(o);
o->kill = NULL;
o->get_latency = NULL;
o->attach = NULL;
o->detach = NULL;
o->suspend = NULL;
o->moved = NULL;
if (linked) { if (linked) {
pa_subscription_post(o->source->core, PA_SUBSCRIPTION_EVENT_SOURCE_OUTPUT|PA_SUBSCRIPTION_EVENT_REMOVE, o->index); pa_subscription_post(o->source->core, PA_SUBSCRIPTION_EVENT_SOURCE_OUTPUT|PA_SUBSCRIPTION_EVENT_REMOVE, o->index);
@ -282,6 +302,9 @@ static void source_output_free(pa_object* mo) {
pa_assert(!o->thread_info.attached); pa_assert(!o->thread_info.attached);
if (o->thread_info.delay_memblockq)
pa_memblockq_free(o->thread_info.delay_memblockq);
if (o->thread_info.resampler) if (o->thread_info.resampler)
pa_resampler_free(o->thread_info.resampler); pa_resampler_free(o->thread_info.resampler);
@ -293,17 +316,17 @@ static void source_output_free(pa_object* mo) {
} }
void pa_source_output_put(pa_source_output *o) { void pa_source_output_put(pa_source_output *o) {
pa_source_output_state_t state;
pa_source_output_assert_ref(o); pa_source_output_assert_ref(o);
pa_assert(o->state == PA_SOURCE_OUTPUT_INIT); pa_assert(o->state == PA_SOURCE_OUTPUT_INIT);
pa_assert(o->push); pa_assert(o->push);
o->thread_info.state = o->state = o->flags & PA_SOURCE_OUTPUT_START_CORKED ? PA_SOURCE_OUTPUT_CORKED : PA_SOURCE_OUTPUT_RUNNING; state = o->flags & PA_SOURCE_OUTPUT_START_CORKED ? PA_SOURCE_OUTPUT_CORKED : PA_SOURCE_OUTPUT_RUNNING;
if (o->state == PA_SOURCE_OUTPUT_CORKED) update_n_corked(o, state);
o->source->n_corked++; o->thread_info.state = o->state = state;
pa_source_update_status(o->source);
pa_asyncmsgq_send(o->source->asyncmsgq, PA_MSGOBJECT(o->source), PA_SOURCE_MESSAGE_ADD_OUTPUT, o, 0, NULL); pa_asyncmsgq_send(o->source->asyncmsgq, PA_MSGOBJECT(o->source), PA_SOURCE_MESSAGE_ADD_OUTPUT, o, 0, NULL);
pa_subscription_post(o->source->core, PA_SUBSCRIPTION_EVENT_SOURCE_OUTPUT|PA_SUBSCRIPTION_EVENT_NEW, o->index); pa_subscription_post(o->source->core, PA_SUBSCRIPTION_EVENT_SOURCE_OUTPUT|PA_SUBSCRIPTION_EVENT_NEW, o->index);
@ -335,7 +358,8 @@ pa_usec_t pa_source_output_get_latency(pa_source_output *o) {
/* Called from thread context */ /* Called from thread context */
void pa_source_output_push(pa_source_output *o, const pa_memchunk *chunk) { void pa_source_output_push(pa_source_output *o, const pa_memchunk *chunk) {
pa_memchunk rchunk; size_t length;
size_t limit;
pa_source_output_assert_ref(o); pa_source_output_assert_ref(o);
pa_assert(PA_SOURCE_OUTPUT_LINKED(o->thread_info.state)); pa_assert(PA_SOURCE_OUTPUT_LINKED(o->thread_info.state));
@ -347,23 +371,83 @@ void pa_source_output_push(pa_source_output *o, const pa_memchunk *chunk) {
pa_assert(o->state == PA_SOURCE_OUTPUT_RUNNING); pa_assert(o->state == PA_SOURCE_OUTPUT_RUNNING);
if (!o->thread_info.resampler) { if (pa_memblockq_push(o->thread_info.delay_memblockq, chunk) < 0) {
o->push(o, chunk); pa_log_debug("Delay queue overflow!");
return; pa_memblockq_seek(o->thread_info.delay_memblockq, chunk->length, PA_SEEK_RELATIVE);
} }
pa_resampler_run(o->thread_info.resampler, chunk, &rchunk); limit = o->rewind ? 0 : o->source->thread_info.max_rewind;
if (!rchunk.length)
/* Implement the delay queue */
while ((length = pa_memblockq_get_length(o->thread_info.delay_memblockq)) > limit) {
pa_memchunk qchunk;
length -= limit;
pa_assert_se(pa_memblockq_peek(o->thread_info.delay_memblockq, &qchunk) >= 0);
if (qchunk.length > length)
qchunk.length = length;
pa_assert(qchunk.length > 0);
if (!o->thread_info.resampler)
o->push(o, &qchunk);
else {
pa_memchunk rchunk;
pa_resampler_run(o->thread_info.resampler, &qchunk, &rchunk);
if (rchunk.length > 0)
o->push(o, &rchunk);
pa_memblock_unref(rchunk.memblock);
}
pa_memblock_unref(qchunk.memblock);
}
}
/* Called from thread context */
void pa_source_output_process_rewind(pa_source_output *o, size_t nbytes /* in sink sample spec */) {
pa_source_output_assert_ref(o);
pa_assert(PA_SOURCE_OUTPUT_LINKED(o->state));
pa_assert(pa_frame_aligned(nbytes, &o->source->sample_spec));
if (nbytes <= 0)
return; return;
pa_assert(rchunk.memblock); if (o->rewind) {
o->push(o, &rchunk); pa_assert(pa_memblockq_get_length(o->thread_info.delay_memblockq) == 0);
pa_memblock_unref(rchunk.memblock);
if (o->thread_info.resampler)
nbytes = pa_resampler_result(o->thread_info.resampler, nbytes);
pa_log_debug("Have to rewind %lu bytes on implementor.", (unsigned long) nbytes);
if (nbytes > 0)
o->rewind(o, nbytes);
if (o->thread_info.resampler)
pa_resampler_reset(o->thread_info.resampler);
} else
pa_memblockq_rewind(o->thread_info.delay_memblockq, nbytes);
}
/* Called from thread context */
void pa_source_output_set_max_rewind(pa_source_output *o, size_t nbytes /* in the source's sample spec */) {
pa_source_output_assert_ref(o);
pa_assert(PA_SOURCE_OUTPUT_LINKED(o->thread_info.state));
pa_assert(pa_frame_aligned(nbytes, &o->source->sample_spec));
if (o->set_max_rewind)
o->set_max_rewind(o, o->thread_info.resampler ? pa_resampler_result(o->thread_info.resampler, nbytes) : nbytes);
} }
pa_usec_t pa_source_output_set_requested_latency(pa_source_output *o, pa_usec_t usec) { pa_usec_t pa_source_output_set_requested_latency(pa_source_output *o, pa_usec_t usec) {
pa_source_output_assert_ref(o); pa_source_output_assert_ref(o);
pa_assert(PA_SOURCE_OUTPUT_LINKED(o->state));
if (usec > 0) { if (usec > 0) {
@ -372,7 +456,6 @@ pa_usec_t pa_source_output_set_requested_latency(pa_source_output *o, pa_usec_t
if (o->source->min_latency > 0 && usec < o->source->min_latency) if (o->source->min_latency > 0 && usec < o->source->min_latency)
usec = o->source->min_latency; usec = o->source->min_latency;
} }
if (PA_SOURCE_OUTPUT_LINKED(o->state)) if (PA_SOURCE_OUTPUT_LINKED(o->state))
@ -535,6 +618,14 @@ int pa_source_output_process_msg(pa_msgobject *mo, int code, void *userdata, int
switch (code) { switch (code) {
case PA_SOURCE_OUTPUT_MESSAGE_GET_LATENCY: {
pa_usec_t *r = userdata;
*r += pa_bytes_to_usec(pa_memblockq_get_length(o->thread_info.delay_memblockq), &o->source->sample_spec);
return 0;
}
case PA_SOURCE_OUTPUT_MESSAGE_SET_RATE: { case PA_SOURCE_OUTPUT_MESSAGE_SET_RATE: {
o->thread_info.sample_spec.rate = PA_PTR_TO_UINT(userdata); o->thread_info.sample_spec.rate = PA_PTR_TO_UINT(userdata);

View file

@ -82,6 +82,14 @@ struct pa_source_output {
* context. */ * context. */
void (*push)(pa_source_output *o, const pa_memchunk *chunk); void (*push)(pa_source_output *o, const pa_memchunk *chunk);
/* Only relevant for monitor sources right now: called when the
* recorded stream is rewound. */
void (*rewind)(pa_source_output *o, size_t nbytes);
/* Called whenever the maximum rewindable size of the source
* changes. Called from RT context. */
void (*set_max_rewind) (pa_source_output *o, size_t nbytes); /* may be NULL */
/* If non-NULL this function is called when the output is first /* If non-NULL this function is called when the output is first
* connected to a source. Called from IO thread context */ * connected to a source. Called from IO thread context */
void (*attach) (pa_source_output *o); /* may be NULL */ void (*attach) (pa_source_output *o); /* may be NULL */
@ -117,6 +125,10 @@ struct pa_source_output {
pa_resampler* resampler; /* may be NULL */ pa_resampler* resampler; /* may be NULL */
/* We maintain a delay memblockq here for source outputs that
* don't implement rewind() */
pa_memblockq *delay_memblockq;
/* The requested latency for the source */ /* The requested latency for the source */
pa_usec_t requested_source_latency; pa_usec_t requested_source_latency;
} thread_info; } thread_info;
@ -196,6 +208,8 @@ int pa_source_output_move_to(pa_source_output *o, pa_source *dest);
/* To be used exclusively by the source driver thread */ /* To be used exclusively by the source driver thread */
void pa_source_output_push(pa_source_output *o, const pa_memchunk *chunk); void pa_source_output_push(pa_source_output *o, const pa_memchunk *chunk);
void pa_source_output_process_rewind(pa_source_output *o, size_t nbytes);
void pa_source_output_set_max_rewind(pa_source_output *o, size_t nbytes);
int pa_source_output_process_msg(pa_msgobject *mo, int code, void *userdata, int64_t offset, pa_memchunk *chunk); int pa_source_output_process_msg(pa_msgobject *mo, int code, void *userdata, int64_t offset, pa_memchunk *chunk);

View file

@ -99,6 +99,18 @@ void pa_source_new_data_done(pa_source_new_data *data) {
pa_proplist_free(data->proplist); pa_proplist_free(data->proplist);
} }
static void reset_callbacks(pa_source *s) {
pa_assert(s);
s->set_state = NULL;
s->get_volume = NULL;
s->set_volume = NULL;
s->get_mute = NULL;
s->set_mute = NULL;
s->get_latency = NULL;
s->update_requested_latency = NULL;
}
pa_source* pa_source_new( pa_source* pa_source_new(
pa_core *core, pa_core *core,
pa_source_new_data *data, pa_source_new_data *data,
@ -171,25 +183,27 @@ pa_source* pa_source_new(
s->muted = data->muted; s->muted = data->muted;
s->refresh_volume = s->refresh_muted = FALSE; s->refresh_volume = s->refresh_muted = FALSE;
s->min_latency = DEFAULT_MIN_LATENCY; reset_callbacks(s);
s->max_latency = s->min_latency;
s->get_latency = NULL;
s->set_volume = NULL;
s->get_volume = NULL;
s->set_mute = NULL;
s->get_mute = NULL;
s->set_state = NULL;
s->update_requested_latency = NULL;
s->userdata = NULL; s->userdata = NULL;
s->asyncmsgq = NULL; s->asyncmsgq = NULL;
s->rtpoll = NULL; s->rtpoll = NULL;
pa_silence_memchunk_get(
&core->silence_cache,
core->mempool,
&s->silence,
&s->sample_spec,
0);
s->min_latency = DEFAULT_MIN_LATENCY;
s->max_latency = s->min_latency;
s->thread_info.outputs = pa_hashmap_new(pa_idxset_trivial_hash_func, pa_idxset_trivial_compare_func); s->thread_info.outputs = pa_hashmap_new(pa_idxset_trivial_hash_func, pa_idxset_trivial_compare_func);
s->thread_info.soft_volume = s->volume; s->thread_info.soft_volume = s->volume;
s->thread_info.soft_muted = s->muted; s->thread_info.soft_muted = s->muted;
s->thread_info.state = s->state; s->thread_info.state = s->state;
s->thread_info.max_rewind = 0;
s->thread_info.requested_latency_valid = TRUE; s->thread_info.requested_latency_valid = TRUE;
s->thread_info.requested_latency = 0; s->thread_info.requested_latency = 0;
@ -247,8 +261,8 @@ void pa_source_put(pa_source *s) {
pa_source_assert_ref(s); pa_source_assert_ref(s);
pa_assert(s->state == PA_SINK_INIT); pa_assert(s->state == PA_SINK_INIT);
pa_assert(s->rtpoll);
pa_assert(s->asyncmsgq); pa_assert(s->asyncmsgq);
pa_assert(s->rtpoll);
pa_assert(!s->min_latency || !s->max_latency || s->min_latency <= s->max_latency); pa_assert(!s->min_latency || !s->max_latency || s->min_latency <= s->max_latency);
@ -290,13 +304,7 @@ void pa_source_unlink(pa_source *s) {
else else
s->state = PA_SOURCE_UNLINKED; s->state = PA_SOURCE_UNLINKED;
s->get_latency = NULL; reset_callbacks(s);
s->get_volume = NULL;
s->set_volume = NULL;
s->set_mute = NULL;
s->get_mute = NULL;
s->set_state = NULL;
s->update_requested_latency = NULL;
if (linked) { if (linked) {
pa_subscription_post(s->core, PA_SUBSCRIPTION_EVENT_SOURCE | PA_SUBSCRIPTION_EVENT_REMOVE, s->index); pa_subscription_post(s->core, PA_SUBSCRIPTION_EVENT_SOURCE | PA_SUBSCRIPTION_EVENT_REMOVE, s->index);
@ -323,6 +331,9 @@ static void source_free(pa_object *o) {
pa_hashmap_free(s->thread_info.outputs, NULL, NULL); pa_hashmap_free(s->thread_info.outputs, NULL, NULL);
if (s->silence.memblock)
pa_memblock_unref(s->silence.memblock);
pa_xfree(s->name); pa_xfree(s->name);
pa_xfree(s->driver); pa_xfree(s->driver);
@ -332,6 +343,20 @@ static void source_free(pa_object *o) {
pa_xfree(s); pa_xfree(s);
} }
void pa_source_set_asyncmsgq(pa_source *s, pa_asyncmsgq *q) {
pa_source_assert_ref(s);
pa_assert(q);
s->asyncmsgq = q;
}
void pa_source_set_rtpoll(pa_source *s, pa_rtpoll *p) {
pa_source_assert_ref(s);
pa_assert(p);
s->rtpoll = p;
}
int pa_source_update_status(pa_source*s) { int pa_source_update_status(pa_source*s) {
pa_source_assert_ref(s); pa_source_assert_ref(s);
pa_assert(PA_SOURCE_LINKED(s->state)); pa_assert(PA_SOURCE_LINKED(s->state));
@ -352,11 +377,22 @@ int pa_source_suspend(pa_source *s, pa_bool_t suspend) {
return source_set_state(s, pa_source_used_by(s) ? PA_SOURCE_RUNNING : PA_SOURCE_IDLE); return source_set_state(s, pa_source_used_by(s) ? PA_SOURCE_RUNNING : PA_SOURCE_IDLE);
} }
void pa_source_ping(pa_source *s) { void pa_source_process_rewind(pa_source *s, size_t nbytes) {
pa_source_assert_ref(s); pa_source_output *o;
pa_assert(PA_SOURCE_LINKED(s->state)); void *state = NULL;
pa_asyncmsgq_post(s->asyncmsgq, PA_MSGOBJECT(s), PA_SOURCE_MESSAGE_PING, NULL, 0, NULL, NULL); pa_source_assert_ref(s);
pa_assert(PA_SOURCE_OPENED(s->thread_info.state));
if (nbytes <= 0)
return;
pa_log_debug("Processing rewind...");
while ((o = pa_hashmap_iterate(s->thread_info.outputs, &state, NULL))) {
pa_source_output_assert_ref(o);
pa_source_output_process_rewind(o, nbytes);
}
} }
void pa_source_post(pa_source*s, const pa_memchunk *chunk) { void pa_source_post(pa_source*s, const pa_memchunk *chunk) {
@ -381,15 +417,19 @@ void pa_source_post(pa_source*s, const pa_memchunk *chunk) {
else else
pa_volume_memchunk(&vchunk, &s->sample_spec, &s->thread_info.soft_volume); pa_volume_memchunk(&vchunk, &s->sample_spec, &s->thread_info.soft_volume);
while ((o = pa_hashmap_iterate(s->thread_info.outputs, &state, NULL))) while ((o = pa_hashmap_iterate(s->thread_info.outputs, &state, NULL))) {
pa_source_output_assert_ref(o);
pa_source_output_push(o, &vchunk); pa_source_output_push(o, &vchunk);
}
pa_memblock_unref(vchunk.memblock); pa_memblock_unref(vchunk.memblock);
} else { } else {
while ((o = pa_hashmap_iterate(s->thread_info.outputs, &state, NULL))) while ((o = pa_hashmap_iterate(s->thread_info.outputs, &state, NULL))) {
pa_source_output_assert_ref(o);
pa_source_output_push(o, chunk); pa_source_output_push(o, chunk);
} }
}
} }
pa_usec_t pa_source_get_latency(pa_source *s) { pa_usec_t pa_source_get_latency(pa_source *s) {
@ -497,6 +537,7 @@ void pa_source_set_description(pa_source *s, const char *description) {
return; return;
old = pa_proplist_gets(s->proplist, PA_PROP_DEVICE_DESCRIPTION); old = pa_proplist_gets(s->proplist, PA_PROP_DEVICE_DESCRIPTION);
if (old && description && !strcmp(old, description)) if (old && description && !strcmp(old, description))
return; return;
@ -511,20 +552,6 @@ void pa_source_set_description(pa_source *s, const char *description) {
} }
} }
void pa_source_set_asyncmsgq(pa_source *s, pa_asyncmsgq *q) {
pa_source_assert_ref(s);
pa_assert(q);
s->asyncmsgq = q;
}
void pa_source_set_rtpoll(pa_source *s, pa_rtpoll *p) {
pa_source_assert_ref(s);
pa_assert(p);
s->rtpoll = p;
}
unsigned pa_source_linked_by(pa_source *s) { unsigned pa_source_linked_by(pa_source *s) {
pa_source_assert_ref(s); pa_source_assert_ref(s);
pa_assert(PA_SOURCE_LINKED(s->state)); pa_assert(PA_SOURCE_LINKED(s->state));
@ -555,6 +582,8 @@ int pa_source_process_msg(pa_msgobject *object, int code, void *userdata, int64_
pa_hashmap_put(s->thread_info.outputs, PA_UINT32_TO_PTR(o->index), pa_source_output_ref(o)); pa_hashmap_put(s->thread_info.outputs, PA_UINT32_TO_PTR(o->index), pa_source_output_ref(o));
pa_source_output_set_max_rewind(o, s->thread_info.max_rewind);
pa_assert(!o->thread_info.attached); pa_assert(!o->thread_info.attached);
o->thread_info.attached = TRUE; o->thread_info.attached = TRUE;
@ -599,9 +628,6 @@ int pa_source_process_msg(pa_msgobject *object, int code, void *userdata, int64_
*((pa_bool_t*) userdata) = s->thread_info.soft_muted; *((pa_bool_t*) userdata) = s->thread_info.soft_muted;
return 0; return 0;
case PA_SOURCE_MESSAGE_PING:
return 0;
case PA_SOURCE_MESSAGE_SET_STATE: case PA_SOURCE_MESSAGE_SET_STATE:
s->thread_info.state = PA_PTR_TO_UINT(userdata); s->thread_info.state = PA_PTR_TO_UINT(userdata);
return 0; return 0;
@ -731,6 +757,21 @@ pa_usec_t pa_source_get_requested_latency(pa_source *s) {
return usec; return usec;
} }
void pa_source_set_max_rewind(pa_source *s, size_t max_rewind) {
pa_source_output *o;
void *state = NULL;
pa_source_assert_ref(s);
if (max_rewind == s->thread_info.max_rewind)
return;
s->thread_info.max_rewind = max_rewind;
while ((o = pa_hashmap_iterate(s->thread_info.outputs, &state, NULL)))
pa_source_output_set_max_rewind(o, s->thread_info.max_rewind);
}
void pa_source_invalidate_requested_latency(pa_source *s) { void pa_source_invalidate_requested_latency(pa_source *s) {
pa_source_assert_ref(s); pa_source_assert_ref(s);

View file

@ -91,6 +91,8 @@ struct pa_source {
pa_asyncmsgq *asyncmsgq; pa_asyncmsgq *asyncmsgq;
pa_rtpoll *rtpoll; pa_rtpoll *rtpoll;
pa_memchunk silence;
pa_usec_t min_latency; /* we won't go below this latency setting */ pa_usec_t min_latency; /* we won't go below this latency setting */
pa_usec_t max_latency; /* An upper limit for the latencies */ pa_usec_t max_latency; /* An upper limit for the latencies */
@ -112,6 +114,10 @@ struct pa_source {
pa_bool_t requested_latency_valid; pa_bool_t requested_latency_valid;
size_t requested_latency; size_t requested_latency;
/* Then number of bytes this source will be rewound for at
* max */
size_t max_rewind;
} thread_info; } thread_info;
void *userdata; void *userdata;
@ -130,7 +136,6 @@ typedef enum pa_source_message {
PA_SOURCE_MESSAGE_GET_LATENCY, PA_SOURCE_MESSAGE_GET_LATENCY,
PA_SOURCE_MESSAGE_GET_REQUESTED_LATENCY, PA_SOURCE_MESSAGE_GET_REQUESTED_LATENCY,
PA_SOURCE_MESSAGE_SET_STATE, PA_SOURCE_MESSAGE_SET_STATE,
PA_SOURCE_MESSAGE_PING,
PA_SOURCE_MESSAGE_ATTACH, PA_SOURCE_MESSAGE_ATTACH,
PA_SOURCE_MESSAGE_DETACH, PA_SOURCE_MESSAGE_DETACH,
PA_SOURCE_MESSAGE_MAX PA_SOURCE_MESSAGE_MAX
@ -189,8 +194,6 @@ int pa_source_update_status(pa_source*s);
int pa_source_suspend(pa_source *s, pa_bool_t suspend); int pa_source_suspend(pa_source *s, pa_bool_t suspend);
int pa_source_suspend_all(pa_core *c, pa_bool_t suspend); int pa_source_suspend_all(pa_core *c, pa_bool_t suspend);
void pa_source_ping(pa_source *s);
void pa_source_set_volume(pa_source *source, const pa_cvolume *volume); void pa_source_set_volume(pa_source *source, const pa_cvolume *volume);
const pa_cvolume *pa_source_get_volume(pa_source *source); const pa_cvolume *pa_source_get_volume(pa_source *source);
void pa_source_set_mute(pa_source *source, pa_bool_t mute); void pa_source_set_mute(pa_source *source, pa_bool_t mute);
@ -203,6 +206,7 @@ unsigned pa_source_used_by(pa_source *s); /* Number of connected streams that ar
/* To be called exclusively by the source driver, from IO context */ /* To be called exclusively by the source driver, from IO context */
void pa_source_post(pa_source*s, const pa_memchunk *b); void pa_source_post(pa_source*s, const pa_memchunk *b);
void pa_source_process_rewind(pa_source *s, size_t nbytes);
int pa_source_process_msg(pa_msgobject *o, int code, void *userdata, int64_t, pa_memchunk *chunk); int pa_source_process_msg(pa_msgobject *o, int code, void *userdata, int64_t, pa_memchunk *chunk);
@ -211,6 +215,8 @@ void pa_source_detach_within_thread(pa_source *s);
pa_usec_t pa_source_get_requested_latency_within_thread(pa_source *s); pa_usec_t pa_source_get_requested_latency_within_thread(pa_source *s);
void pa_source_set_max_rewind(pa_source *s, size_t max_rewind);
/* To be called exclusively by source output drivers, from IO context */ /* To be called exclusively by source output drivers, from IO context */
void pa_source_invalidate_requested_latency(pa_source *s); void pa_source_invalidate_requested_latency(pa_source *s);

View file

@ -61,16 +61,18 @@ int main(int argc, char *argv[]) {
pa_mempool *p; pa_mempool *p;
pa_memblockq *bq; pa_memblockq *bq;
pa_memchunk chunk1, chunk2, chunk3, chunk4; pa_memchunk chunk1, chunk2, chunk3, chunk4;
pa_memblock *silence; pa_memchunk silence;
pa_log_set_maximal_level(PA_LOG_DEBUG); pa_log_set_maximal_level(PA_LOG_DEBUG);
p = pa_mempool_new(0); p = pa_mempool_new(0);
silence = pa_memblock_new_fixed(p, (char*) "__", 2, 1); silence.memblock = pa_memblock_new_fixed(p, (char*) "__", 2, 1);
assert(silence); assert(silence.memblock);
silence.index = 0;
silence.length = pa_memblock_get_length(silence.memblock);
bq = pa_memblockq_new(0, 40, 10, 2, 4, 4, 40, silence); bq = pa_memblockq_new(0, 40, 10, 2, 4, 4, 40, &silence);
assert(bq); assert(bq);
chunk1.memblock = pa_memblock_new_fixed(p, (char*) "11", 2, 1); chunk1.memblock = pa_memblock_new_fixed(p, (char*) "11", 2, 1);
@ -152,7 +154,7 @@ int main(int argc, char *argv[]) {
dump(bq); dump(bq);
pa_memblockq_free(bq); pa_memblockq_free(bq);
pa_memblock_unref(silence); pa_memblock_unref(silence.memblock);
pa_memblock_unref(chunk1.memblock); pa_memblock_unref(chunk1.memblock);
pa_memblock_unref(chunk2.memblock); pa_memblock_unref(chunk2.memblock);
pa_memblock_unref(chunk3.memblock); pa_memblock_unref(chunk3.memblock);