diff --git a/Cargo.lock b/Cargo.lock index a20a71803777cf0a5f89a8d6776797b27800be8c..bb3b3d31e5d13e2f59d1adbda927efc6e6884b92 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1411,6 +1411,7 @@ dependencies = [ "log", "parking_lot", "rodio", + "rubato", "serde", "settings", "smol", @@ -13511,6 +13512,18 @@ version = "0.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ad8388ea1a9e0ea807e442e8263a699e7edcb320ecbcd21b4fa8ff859acce3ba" +[[package]] +name = "rubato" +version = "0.16.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5258099699851cfd0082aeb645feb9c084d9a5e1f1b8d5372086b989fc5e56a1" +dependencies = [ + "num-complex", + "num-integer", + "num-traits", + "realfft", +] + [[package]] name = "rules_library" version = "0.1.0" diff --git a/crates/audio/Cargo.toml b/crates/audio/Cargo.toml index 7f2fed80e2315e51fca7d8477b04885998336632..671439855189839fedd60313224e113a600e27dc 100644 --- a/crates/audio/Cargo.toml +++ b/crates/audio/Cargo.toml @@ -22,6 +22,7 @@ denoise = { path = "../denoise" } log.workspace = true parking_lot.workspace = true rodio = { workspace = true, features = [ "wav", "playback", "wav_output" ] } +rubato = "0.16.2" serde.workspace = true settings.workspace = true smol.workspace = true diff --git a/crates/audio/src/rodio_ext.rs b/crates/audio/src/rodio_ext.rs index af4cc89252dfdc1498471ec7ac09b56d59b62eca..0fe5b449ad8a67630faf2211bc5aafd3f9ae4900 100644 --- a/crates/audio/src/rodio_ext.rs +++ b/crates/audio/src/rodio_ext.rs @@ -1,25 +1,16 @@ -use std::{ - num::NonZero, - sync::{ - Arc, Mutex, - atomic::{AtomicBool, Ordering}, - }, - time::Duration, -}; - -use crossbeam::queue::ArrayQueue; +use std::{num::NonZero, time::Duration}; + use denoise::{Denoiser, DenoiserError}; use log::warn; -use rodio::{ - ChannelCount, Sample, SampleRate, Source, conversions::SampleRateConverter, nz, - source::UniformSourceIterator, -}; +use rodio::{ChannelCount, Sample, SampleRate, Source, conversions::ChannelCountConverter, nz}; -const MAX_CHANNELS: usize = 8; +use crate::rodio_ext::resample::FixedResampler; +pub use replayable::{Replay, ReplayDurationTooShort, Replayable}; -#[derive(Debug, thiserror::Error)] -#[error("Replay duration is too short must be >= 100ms")] -pub struct ReplayDurationTooShort; +mod replayable; +mod resample; + +const MAX_CHANNELS: usize = 8; // These all require constant sources (so the span is infinitely long) // this is not guaranteed by rodio however we know it to be true in all our @@ -41,8 +32,8 @@ pub trait RodioExt: Source + Sized { self, channel_count: ChannelCount, sample_rate: SampleRate, - ) -> UniformSourceIterator; - fn constant_samplerate(self, sample_rate: SampleRate) -> ConstantSampleRate; + ) -> ConstantChannelCount>; + fn constant_samplerate(self, sample_rate: SampleRate) -> FixedResampler; fn possibly_disconnected_channels_to_mono(self) -> ToMono; } @@ -81,38 +72,7 @@ impl RodioExt for S { self, duration: Duration, ) -> Result<(Replay, Replayable), ReplayDurationTooShort> { - if duration < Duration::from_millis(100) { - return Err(ReplayDurationTooShort); - } - - let samples_per_second = self.sample_rate().get() as usize * self.channels().get() as usize; - let samples_to_queue = duration.as_secs_f64() * samples_per_second as f64; - let samples_to_queue = - (samples_to_queue as usize).next_multiple_of(self.channels().get().into()); - - let chunk_size = - (samples_per_second.div_ceil(10)).next_multiple_of(self.channels().get() as usize); - let chunks_to_queue = samples_to_queue.div_ceil(chunk_size); - - let is_active = Arc::new(AtomicBool::new(true)); - let queue = Arc::new(ReplayQueue::new(chunks_to_queue, chunk_size)); - Ok(( - Replay { - rx: Arc::clone(&queue), - buffer: Vec::new().into_iter(), - sleep_duration: duration / 2, - sample_rate: self.sample_rate(), - channel_count: self.channels(), - source_is_active: is_active.clone(), - }, - Replayable { - tx: queue, - inner: self, - buffer: Vec::with_capacity(chunk_size), - chunk_size, - is_active, - }, - )) + replayable::replayable(self, duration) } fn take_samples(self, n: usize) -> TakeSamples { TakeSamples { @@ -128,37 +88,37 @@ impl RodioExt for S { self, channel_count: ChannelCount, sample_rate: SampleRate, - ) -> UniformSourceIterator { - UniformSourceIterator::new(self, channel_count, sample_rate) + ) -> ConstantChannelCount> { + ConstantChannelCount::new(self.constant_samplerate(sample_rate), channel_count) } - fn constant_samplerate(self, sample_rate: SampleRate) -> ConstantSampleRate { - ConstantSampleRate::new(self, sample_rate) + fn constant_samplerate(self, sample_rate: SampleRate) -> FixedResampler { + FixedResampler::new(self, sample_rate) } fn possibly_disconnected_channels_to_mono(self) -> ToMono { ToMono::new(self) } } -pub struct ConstantSampleRate { - inner: SampleRateConverter, +pub struct ConstantChannelCount { + inner: ChannelCountConverter, channels: ChannelCount, sample_rate: SampleRate, } -impl ConstantSampleRate { - fn new(source: S, target_rate: SampleRate) -> Self { - let input_sample_rate = source.sample_rate(); - let channels = source.channels(); - let inner = SampleRateConverter::new(source, input_sample_rate, target_rate, channels); +impl ConstantChannelCount { + fn new(source: S, target_channels: ChannelCount) -> Self { + let input_channels = source.channels(); + let sample_rate = source.sample_rate(); + let inner = ChannelCountConverter::new(source, input_channels, target_channels); Self { + sample_rate, inner, - channels, - sample_rate: target_rate, + channels: target_channels, } } } -impl Iterator for ConstantSampleRate { +impl Iterator for ConstantChannelCount { type Item = rodio::Sample; fn next(&mut self) -> Option { @@ -170,7 +130,7 @@ impl Iterator for ConstantSampleRate { } } -impl Source for ConstantSampleRate { +impl Source for ConstantChannelCount { fn current_span_len(&self) -> Option { None } @@ -307,53 +267,6 @@ impl Source for TakeSamples { } } -/// constant source, only works on a single span -#[derive(Debug)] -struct ReplayQueue { - inner: ArrayQueue>, - normal_chunk_len: usize, - /// The last chunk in the queue may be smaller than - /// the normal chunk size. This is always equal to the - /// size of the last element in the queue. - /// (so normally chunk_size) - last_chunk: Mutex>, -} - -impl ReplayQueue { - fn new(queue_len: usize, chunk_size: usize) -> Self { - Self { - inner: ArrayQueue::new(queue_len), - normal_chunk_len: chunk_size, - last_chunk: Mutex::new(Vec::new()), - } - } - /// Returns the length in samples - fn len(&self) -> usize { - self.inner.len().saturating_sub(1) * self.normal_chunk_len - + self - .last_chunk - .lock() - .expect("Self::push_last can not poison this lock") - .len() - } - - fn pop(&self) -> Option> { - self.inner.pop() // removes element that was inserted first - } - - fn push_last(&self, mut samples: Vec) { - let mut last_chunk = self - .last_chunk - .lock() - .expect("Self::len can not poison this lock"); - std::mem::swap(&mut *last_chunk, &mut samples); - } - - fn push_normal(&self, samples: Vec) { - let _pushed_out_of_ringbuf = self.inner.force_push(samples); - } -} - /// constant source, only works on a single span pub struct ProcessBuffer where @@ -487,147 +400,15 @@ where } } -/// constant source, only works on a single span -#[derive(Debug)] -pub struct Replayable { - inner: S, - buffer: Vec, - chunk_size: usize, - tx: Arc, - is_active: Arc, -} - -impl Iterator for Replayable { - type Item = Sample; - - fn next(&mut self) -> Option { - if let Some(sample) = self.inner.next() { - self.buffer.push(sample); - // If the buffer is full send it - if self.buffer.len() == self.chunk_size { - self.tx.push_normal(std::mem::take(&mut self.buffer)); - } - Some(sample) - } else { - let last_chunk = std::mem::take(&mut self.buffer); - self.tx.push_last(last_chunk); - self.is_active.store(false, Ordering::Relaxed); - None - } - } - - fn size_hint(&self) -> (usize, Option) { - self.inner.size_hint() - } -} - -impl Source for Replayable { - fn current_span_len(&self) -> Option { - self.inner.current_span_len() - } - - fn channels(&self) -> ChannelCount { - self.inner.channels() - } - - fn sample_rate(&self) -> SampleRate { - self.inner.sample_rate() - } - - fn total_duration(&self) -> Option { - self.inner.total_duration() - } -} - -/// constant source, only works on a single span -#[derive(Debug)] -pub struct Replay { - rx: Arc, - buffer: std::vec::IntoIter, - sleep_duration: Duration, - sample_rate: SampleRate, - channel_count: ChannelCount, - source_is_active: Arc, -} - -impl Replay { - pub fn source_is_active(&self) -> bool { - // - source could return None and not drop - // - source could be dropped before returning None - self.source_is_active.load(Ordering::Relaxed) && Arc::strong_count(&self.rx) < 2 - } - - /// Duration of what is in the buffer and can be returned without blocking. - pub fn duration_ready(&self) -> Duration { - let samples_per_second = self.channels().get() as u32 * self.sample_rate().get(); - - let seconds_queued = self.samples_ready() as f64 / samples_per_second as f64; - Duration::from_secs_f64(seconds_queued) - } - - /// Number of samples in the buffer and can be returned without blocking. - pub fn samples_ready(&self) -> usize { - self.rx.len() + self.buffer.len() - } -} - -impl Iterator for Replay { - type Item = Sample; - - fn next(&mut self) -> Option { - if let Some(sample) = self.buffer.next() { - return Some(sample); - } - - loop { - if let Some(new_buffer) = self.rx.pop() { - self.buffer = new_buffer.into_iter(); - return self.buffer.next(); - } - - if !self.source_is_active() { - return None; - } - - // The queue does not support blocking on a next item. We want this queue as it - // is quite fast and provides a fixed size. We know how many samples are in a - // buffer so if we do not get one now we must be getting one after `sleep_duration`. - std::thread::sleep(self.sleep_duration); - } - } - - fn size_hint(&self) -> (usize, Option) { - ((self.rx.len() + self.buffer.len()), None) - } -} - -impl Source for Replay { - fn current_span_len(&self) -> Option { - None // source is not compatible with spans - } - - fn channels(&self) -> ChannelCount { - self.channel_count - } - - fn sample_rate(&self) -> SampleRate { - self.sample_rate - } - - fn total_duration(&self) -> Option { - None - } -} - #[cfg(test)] mod tests { use rodio::{nz, static_buffer::StaticSamplesBuffer}; use super::*; - const SAMPLES: [Sample; 5] = [0.0, 1.0, 2.0, 3.0, 4.0]; + pub const SAMPLES: [Sample; 5] = [0.0, 1.0, 2.0, 3.0, 4.0]; - fn test_source() -> StaticSamplesBuffer { + pub fn test_source() -> StaticSamplesBuffer { StaticSamplesBuffer::new(nz!(1), nz!(1), &SAMPLES) } @@ -690,74 +471,4 @@ mod tests { assert_eq!(yielded, SAMPLES.len()) } } - - mod instant_replay { - use super::*; - - #[test] - fn continues_after_history() { - let input = test_source(); - - let (mut replay, mut source) = input - .replayable(Duration::from_secs(3)) - .expect("longer than 100ms"); - - source.by_ref().take(3).count(); - let yielded: Vec = replay.by_ref().take(3).collect(); - assert_eq!(&yielded, &SAMPLES[0..3],); - - source.count(); - let yielded: Vec = replay.collect(); - assert_eq!(&yielded, &SAMPLES[3..5],); - } - - #[test] - fn keeps_only_latest() { - let input = test_source(); - - let (mut replay, mut source) = input - .replayable(Duration::from_secs(2)) - .expect("longer than 100ms"); - - source.by_ref().take(5).count(); // get all items but do not end the source - let yielded: Vec = replay.by_ref().take(2).collect(); - assert_eq!(&yielded, &SAMPLES[3..5]); - source.count(); // exhaust source - assert_eq!(replay.next(), None); - } - - #[test] - fn keeps_correct_amount_of_seconds() { - let input = StaticSamplesBuffer::new(nz!(1), nz!(16_000), &[0.0; 40_000]); - - let (replay, mut source) = input - .replayable(Duration::from_secs(2)) - .expect("longer than 100ms"); - - // exhaust but do not yet end source - source.by_ref().take(40_000).count(); - - // take all samples we can without blocking - let ready = replay.samples_ready(); - let n_yielded = replay.take_samples(ready).count(); - - let max = source.sample_rate().get() * source.channels().get() as u32 * 2; - let margin = 16_000 / 10; // 100ms - assert!(n_yielded as u32 >= max - margin); - } - - #[test] - fn samples_ready() { - let input = StaticSamplesBuffer::new(nz!(1), nz!(16_000), &[0.0; 40_000]); - let (mut replay, source) = input - .replayable(Duration::from_secs(2)) - .expect("longer than 100ms"); - assert_eq!(replay.by_ref().samples_ready(), 0); - - source.take(8000).count(); // half a second - let margin = 16_000 / 10; // 100ms - let ready = replay.samples_ready(); - assert!(ready >= 8000 - margin); - } - } } diff --git a/crates/audio/src/rodio_ext/replayable.rs b/crates/audio/src/rodio_ext/replayable.rs new file mode 100644 index 0000000000000000000000000000000000000000..564cd754f7f594ddd3c36c00081e11c10745b368 --- /dev/null +++ b/crates/audio/src/rodio_ext/replayable.rs @@ -0,0 +1,308 @@ +use std::{ + sync::{ + Arc, Mutex, + atomic::{AtomicBool, Ordering}, + }, + time::Duration, +}; + +use crossbeam::queue::ArrayQueue; +use rodio::{ChannelCount, Sample, SampleRate, Source}; + +#[derive(Debug, thiserror::Error)] +#[error("Replay duration is too short must be >= 100ms")] +pub struct ReplayDurationTooShort; + +pub fn replayable( + source: S, + duration: Duration, +) -> Result<(Replay, Replayable), ReplayDurationTooShort> { + if duration < Duration::from_millis(100) { + return Err(ReplayDurationTooShort); + } + + let samples_per_second = source.sample_rate().get() as usize * source.channels().get() as usize; + let samples_to_queue = duration.as_secs_f64() * samples_per_second as f64; + let samples_to_queue = + (samples_to_queue as usize).next_multiple_of(source.channels().get().into()); + + let chunk_size = + (samples_per_second.div_ceil(10)).next_multiple_of(source.channels().get() as usize); + let chunks_to_queue = samples_to_queue.div_ceil(chunk_size); + + let is_active = Arc::new(AtomicBool::new(true)); + let queue = Arc::new(ReplayQueue::new(chunks_to_queue, chunk_size)); + Ok(( + Replay { + rx: Arc::clone(&queue), + buffer: Vec::new().into_iter(), + sleep_duration: duration / 2, + sample_rate: source.sample_rate(), + channel_count: source.channels(), + source_is_active: is_active.clone(), + }, + Replayable { + tx: queue, + inner: source, + buffer: Vec::with_capacity(chunk_size), + chunk_size, + is_active, + }, + )) +} + +/// constant source, only works on a single span +#[derive(Debug)] +struct ReplayQueue { + inner: ArrayQueue>, + normal_chunk_len: usize, + /// The last chunk in the queue may be smaller than + /// the normal chunk size. This is always equal to the + /// size of the last element in the queue. + /// (so normally chunk_size) + last_chunk: Mutex>, +} + +impl ReplayQueue { + fn new(queue_len: usize, chunk_size: usize) -> Self { + Self { + inner: ArrayQueue::new(queue_len), + normal_chunk_len: chunk_size, + last_chunk: Mutex::new(Vec::new()), + } + } + /// Returns the length in samples + fn len(&self) -> usize { + self.inner.len().saturating_sub(1) * self.normal_chunk_len + + self + .last_chunk + .lock() + .expect("Self::push_last can not poison this lock") + .len() + } + + fn pop(&self) -> Option> { + self.inner.pop() // removes element that was inserted first + } + + fn push_last(&self, mut samples: Vec) { + let mut last_chunk = self + .last_chunk + .lock() + .expect("Self::len can not poison this lock"); + std::mem::swap(&mut *last_chunk, &mut samples); + } + + fn push_normal(&self, samples: Vec) { + let _pushed_out_of_ringbuf = self.inner.force_push(samples); + } +} + +/// constant source, only works on a single span +#[derive(Debug)] +pub struct Replayable { + inner: S, + buffer: Vec, + chunk_size: usize, + tx: Arc, + is_active: Arc, +} + +impl Iterator for Replayable { + type Item = Sample; + + fn next(&mut self) -> Option { + if let Some(sample) = self.inner.next() { + self.buffer.push(sample); + // If the buffer is full send it + if self.buffer.len() == self.chunk_size { + self.tx.push_normal(std::mem::take(&mut self.buffer)); + } + Some(sample) + } else { + let last_chunk = std::mem::take(&mut self.buffer); + self.tx.push_last(last_chunk); + self.is_active.store(false, Ordering::Relaxed); + None + } + } + + fn size_hint(&self) -> (usize, Option) { + self.inner.size_hint() + } +} + +impl Source for Replayable { + fn current_span_len(&self) -> Option { + self.inner.current_span_len() + } + + fn channels(&self) -> ChannelCount { + self.inner.channels() + } + + fn sample_rate(&self) -> SampleRate { + self.inner.sample_rate() + } + + fn total_duration(&self) -> Option { + self.inner.total_duration() + } +} + +/// constant source, only works on a single span +#[derive(Debug)] +pub struct Replay { + rx: Arc, + buffer: std::vec::IntoIter, + sleep_duration: Duration, + sample_rate: SampleRate, + channel_count: ChannelCount, + source_is_active: Arc, +} + +impl Replay { + pub fn source_is_active(&self) -> bool { + // - source could return None and not drop + // - source could be dropped before returning None + self.source_is_active.load(Ordering::Relaxed) && Arc::strong_count(&self.rx) < 2 + } + + /// Duration of what is in the buffer and can be returned without blocking. + pub fn duration_ready(&self) -> Duration { + let samples_per_second = self.channels().get() as u32 * self.sample_rate().get(); + + let seconds_queued = self.samples_ready() as f64 / samples_per_second as f64; + Duration::from_secs_f64(seconds_queued) + } + + /// Number of samples in the buffer and can be returned without blocking. + pub fn samples_ready(&self) -> usize { + self.rx.len() + self.buffer.len() + } +} + +impl Iterator for Replay { + type Item = Sample; + + fn next(&mut self) -> Option { + if let Some(sample) = self.buffer.next() { + return Some(sample); + } + + loop { + if let Some(new_buffer) = self.rx.pop() { + self.buffer = new_buffer.into_iter(); + return self.buffer.next(); + } + + if !self.source_is_active() { + return None; + } + + // The queue does not support blocking on a next item. We want this queue as it + // is quite fast and provides a fixed size. We know how many samples are in a + // buffer so if we do not get one now we must be getting one after `sleep_duration`. + std::thread::sleep(self.sleep_duration); + } + } + + fn size_hint(&self) -> (usize, Option) { + ((self.rx.len() + self.buffer.len()), None) + } +} + +impl Source for Replay { + fn current_span_len(&self) -> Option { + None // source is not compatible with spans + } + + fn channels(&self) -> ChannelCount { + self.channel_count + } + + fn sample_rate(&self) -> SampleRate { + self.sample_rate + } + + fn total_duration(&self) -> Option { + None + } +} + +#[cfg(test)] +mod tests { + use rodio::{nz, static_buffer::StaticSamplesBuffer}; + + use super::*; + use crate::{ + RodioExt, + rodio_ext::tests::{SAMPLES, test_source}, + }; + + #[test] + fn continues_after_history() { + let input = test_source(); + + let (mut replay, mut source) = input + .replayable(Duration::from_secs(3)) + .expect("longer than 100ms"); + + source.by_ref().take(3).count(); + let yielded: Vec = replay.by_ref().take(3).collect(); + assert_eq!(&yielded, &SAMPLES[0..3],); + + source.count(); + let yielded: Vec = replay.collect(); + assert_eq!(&yielded, &SAMPLES[3..5],); + } + + #[test] + fn keeps_only_latest() { + let input = test_source(); + + let (mut replay, mut source) = input + .replayable(Duration::from_secs(2)) + .expect("longer than 100ms"); + + source.by_ref().take(5).count(); // get all items but do not end the source + let yielded: Vec = replay.by_ref().take(2).collect(); + assert_eq!(&yielded, &SAMPLES[3..5]); + source.count(); // exhaust source + assert_eq!(replay.next(), None); + } + + #[test] + fn keeps_correct_amount_of_seconds() { + let input = StaticSamplesBuffer::new(nz!(1), nz!(16_000), &[0.0; 40_000]); + + let (replay, mut source) = input + .replayable(Duration::from_secs(2)) + .expect("longer than 100ms"); + + // exhaust but do not yet end source + source.by_ref().take(40_000).count(); + + // take all samples we can without blocking + let ready = replay.samples_ready(); + let n_yielded = replay.take_samples(ready).count(); + + let max = source.sample_rate().get() * source.channels().get() as u32 * 2; + let margin = 16_000 / 10; // 100ms + assert!(n_yielded as u32 >= max - margin); + } + + #[test] + fn samples_ready() { + let input = StaticSamplesBuffer::new(nz!(1), nz!(16_000), &[0.0; 40_000]); + let (mut replay, source) = input + .replayable(Duration::from_secs(2)) + .expect("longer than 100ms"); + assert_eq!(replay.by_ref().samples_ready(), 0); + + source.take(8000).count(); // half a second + let margin = 16_000 / 10; // 100ms + let ready = replay.samples_ready(); + assert!(ready >= 8000 - margin); + } +} diff --git a/crates/audio/src/rodio_ext/resample.rs b/crates/audio/src/rodio_ext/resample.rs new file mode 100644 index 0000000000000000000000000000000000000000..c435e011cbdd3f67469bc8e7b63bab09a201d66f --- /dev/null +++ b/crates/audio/src/rodio_ext/resample.rs @@ -0,0 +1,98 @@ +use std::time::Duration; + +use rodio::{Sample, SampleRate, Source}; +use rubato::{FftFixedInOut, Resampler}; + +pub struct FixedResampler { + input: S, + next_channel: usize, + next_frame: usize, + output_buffer: Vec>, + input_buffer: Vec>, + target_sample_rate: SampleRate, + resampler: FftFixedInOut, +} + +impl FixedResampler { + pub fn new(input: S, target_sample_rate: SampleRate) -> Self { + let chunk_size_in = + Duration::from_millis(50).as_secs_f32() * input.sample_rate().get() as f32; + let chunk_size_in = chunk_size_in.ceil() as usize; + + let resampler = FftFixedInOut::new( + input.sample_rate().get() as usize, + target_sample_rate.get() as usize, + chunk_size_in, + input.channels().get() as usize, + ) + .expect( + "sample rates are non zero, and we are not changing it so there is no resample ratio", + ); + + Self { + next_channel: 0, + next_frame: 0, + output_buffer: resampler.output_buffer_allocate(true), + input_buffer: resampler.input_buffer_allocate(false), + target_sample_rate, + resampler, + input, + } + } +} + +impl Source for FixedResampler { + fn current_span_len(&self) -> Option { + None + } + + fn channels(&self) -> rodio::ChannelCount { + self.input.channels() + } + + fn sample_rate(&self) -> rodio::SampleRate { + self.target_sample_rate + } + + fn total_duration(&self) -> Option { + self.input.total_duration() + } +} + +impl FixedResampler { + fn next_sample(&mut self) -> Option { + let sample = self.output_buffer[self.next_channel] + .get(self.next_frame) + .copied(); + self.next_channel = (self.next_channel + 1) % self.input.channels().get() as usize; + self.next_frame += 1; + + sample + } +} + +impl Iterator for FixedResampler { + type Item = Sample; + + fn next(&mut self) -> Option { + if let Some(sample) = self.next_sample() { + return Some(sample); + } + + for input_channel in &mut self.input_buffer { + input_channel.clear(); + } + + for _ in 0..self.resampler.input_frames_next() { + for input_channel in &mut self.input_buffer { + input_channel.push(self.input.next()?); + } + } + + self.resampler + .process_into_buffer(&mut self.input_buffer, &mut self.output_buffer, None).expect("Input and output buffer channels are correct as they have been set by the resampler. The buffer for each channel is the same length. The buffer length is what is requested the resampler."); + + self.next_frame = 0; + self.next_sample() + } +}