diff --git a/Cargo.lock b/Cargo.lock index 63561664da3f951657484e880416e4ace80ce0b8..0b283601d1c182d83f5f7164c7bd545e6f80eb9c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1385,14 +1385,19 @@ name = "audio" version = "0.1.0" dependencies = [ "anyhow", + "async-tar", "collections", + "crossbeam", + "futures 0.3.31", "gpui", "libwebrtc", + "log", "parking_lot", "rodio", "schemars", "serde", "settings", + "smol", "util", "workspace-hack", ] @@ -2676,7 +2681,7 @@ dependencies = [ "cap-primitives", "cap-std", "io-lifetimes", - "windows-sys 0.52.0", + "windows-sys 0.59.0", ] [[package]] @@ -2705,7 +2710,7 @@ dependencies = [ "maybe-owned", "rustix 1.0.7", "rustix-linux-procfs", - "windows-sys 0.52.0", + "windows-sys 0.59.0", "winx", ] @@ -4146,6 +4151,19 @@ dependencies = [ "itertools 0.10.5", ] +[[package]] +name = "crossbeam" +version = "0.8.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1137cd7e7fc0fb5d3c5a8678be38ec56e819125d8d7907411fe24ccb943faca8" +dependencies = [ + "crossbeam-channel", + "crossbeam-deque", + "crossbeam-epoch", + "crossbeam-queue", + "crossbeam-utils", +] + [[package]] name = "crossbeam-channel" version = "0.5.15" @@ -5296,7 +5314,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "976dd42dc7e85965fe702eb8164f21f450704bdde31faefd6471dba214cb594e" dependencies = [ "libc", - "windows-sys 0.52.0", + "windows-sys 0.59.0", ] [[package]] @@ -5683,7 +5701,7 @@ checksum = "0ce92ff622d6dadf7349484f42c93271a0d49b7cc4d466a936405bacbe10aa78" dependencies = [ "cfg-if", "rustix 1.0.7", - "windows-sys 0.52.0", + "windows-sys 0.59.0", ] [[package]] @@ -6061,7 +6079,7 @@ checksum = "94e7099f6313ecacbe1256e8ff9d617b75d1bcb16a6fddef94866d225a01a14a" dependencies = [ "io-lifetimes", "rustix 1.0.7", - "windows-sys 0.52.0", + "windows-sys 0.59.0", ] [[package]] @@ -8504,7 +8522,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2285ddfe3054097ef4b2fe909ef8c3bcd1ea52a8f0d274416caebeef39f04a65" dependencies = [ "io-lifetimes", - "windows-sys 0.52.0", + "windows-sys 0.59.0", ] [[package]] @@ -8577,7 +8595,7 @@ checksum = "e04d7f318608d35d4b61ddd75cbdaee86b023ebe2bd5a66ee0915f0bf93095a9" dependencies = [ "hermit-abi 0.5.0", "libc", - "windows-sys 0.52.0", + "windows-sys 0.59.0", ] [[package]] @@ -8659,7 +8677,7 @@ dependencies = [ "portable-atomic", "portable-atomic-util", "serde", - "windows-sys 0.52.0", + "windows-sys 0.59.0", ] [[package]] @@ -9400,7 +9418,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fc2f4eb4bc735547cfed7c0a4922cbd04a4655978c09b54f1f7b228750664c34" dependencies = [ "cfg-if", - "windows-targets 0.52.6", + "windows-targets 0.48.5", ] [[package]] @@ -9637,7 +9655,6 @@ dependencies = [ "core-video", "coreaudio-rs 0.12.1", "cpal", - "dasp_sample", "futures 0.3.31", "gpui", "gpui_tokio", @@ -13059,7 +13076,7 @@ dependencies = [ "once_cell", "socket2", "tracing", - "windows-sys 0.52.0", + "windows-sys 0.59.0", ] [[package]] @@ -13856,7 +13873,7 @@ dependencies = [ [[package]] name = "rodio" version = "0.21.1" -source = "git+https://github.com/RustAudio/rodio?branch=microphone#0e6e6436b3a97f4af72baafe11a02ade2b457b62" +source = "git+https://github.com/RustAudio/rodio?branch=better_wav_output#82514bd1f2c6cfd9a1a885019b26a8ffea75bc5c" dependencies = [ "cpal", "dasp_sample", @@ -14086,7 +14103,7 @@ dependencies = [ "itoa", "libc", "linux-raw-sys 0.4.15", - "windows-sys 0.52.0", + "windows-sys 0.59.0", ] [[package]] @@ -14099,7 +14116,7 @@ dependencies = [ "errno 0.3.11", "libc", "linux-raw-sys 0.9.4", - "windows-sys 0.52.0", + "windows-sys 0.59.0", ] [[package]] @@ -14221,7 +14238,7 @@ dependencies = [ "security-framework 3.2.0", "security-framework-sys", "webpki-root-certs", - "windows-sys 0.52.0", + "windows-sys 0.59.0", ] [[package]] @@ -15535,7 +15552,7 @@ dependencies = [ "cfg-if", "libc", "psm", - "windows-sys 0.52.0", + "windows-sys 0.59.0", ] [[package]] @@ -16193,7 +16210,7 @@ dependencies = [ "fd-lock", "io-lifetimes", "rustix 0.38.44", - "windows-sys 0.52.0", + "windows-sys 0.59.0", "winx", ] @@ -16375,7 +16392,7 @@ dependencies = [ "getrandom 0.3.2", "once_cell", "rustix 1.0.7", - "windows-sys 0.52.0", + "windows-sys 0.59.0", ] [[package]] @@ -18943,7 +18960,7 @@ version = "0.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cf221c93e13a30d793f7645a0e7762c55d169dbb0a49671918a2319d289b10bb" dependencies = [ - "windows-sys 0.52.0", + "windows-sys 0.48.0", ] [[package]] @@ -19602,7 +19619,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3f3fd376f71958b862e7afb20cfe5a22830e1963462f3a17f49d82a6c1d1f42d" dependencies = [ "bitflags 2.9.0", - "windows-sys 0.52.0", + "windows-sys 0.59.0", ] [[package]] @@ -19936,6 +19953,7 @@ dependencies = [ "core-foundation-sys", "cranelift-codegen", "crc32fast", + "crossbeam-channel", "crossbeam-epoch", "crossbeam-utils", "crypto-common", diff --git a/Cargo.toml b/Cargo.toml index 0f30712efc21cec9d9cfc46ec75e6655e43665e9..0cf695a3294ea01d7529d22a7638059f5beaeded 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -276,6 +276,7 @@ context_server = { path = "crates/context_server" } copilot = { path = "crates/copilot" } crashes = { path = "crates/crashes" } credentials_provider = { path = "crates/credentials_provider" } +crossbeam = "0.8.4" dap = { path = "crates/dap" } dap_adapters = { path = "crates/dap_adapters" } db = { path = "crates/db" } @@ -367,7 +368,7 @@ remote_server = { path = "crates/remote_server" } repl = { path = "crates/repl" } reqwest_client = { path = "crates/reqwest_client" } rich_text = { path = "crates/rich_text" } -rodio = { git = "https://github.com/RustAudio/rodio", branch = "microphone"} +rodio = { git = "https://github.com/RustAudio/rodio", branch = "better_wav_output"} rope = { path = "crates/rope" } rpc = { path = "crates/rpc" } rules_library = { path = "crates/rules_library" } diff --git a/crates/audio/Cargo.toml b/crates/audio/Cargo.toml index 8826b5f49c263170c41a5354cd076c905e014943..ce5f7da0f93a4c2dd40a70f33af3ba9cefcdb970 100644 --- a/crates/audio/Cargo.toml +++ b/crates/audio/Cargo.toml @@ -14,13 +14,18 @@ doctest = false [dependencies] anyhow.workspace = true +async-tar.workspace = true collections.workspace = true +crossbeam.workspace = true gpui.workspace = true -settings.workspace = true +log.workspace = true +futures.workspace = true +parking_lot.workspace = true +rodio = { workspace = true, features = [ "wav", "playback" ] } schemars.workspace = true serde.workspace = true -parking_lot.workspace = true -rodio = { workspace = true, features = [ "wav", "playback", "tracing" ] } +settings.workspace = true +smol.workspace = true util.workspace = true workspace-hack.workspace = true diff --git a/crates/audio/src/audio.rs b/crates/audio/src/audio.rs index ee046e6fde3a791ca7bdfde138e6cacda75d0576..ae5489b6cb11169ca717c2a18591856bc04745b9 100644 --- a/crates/audio/src/audio.rs +++ b/crates/audio/src/audio.rs @@ -1,37 +1,57 @@ -use anyhow::{Context as _, Result, anyhow}; +use anyhow::{Context as _, Result}; use collections::HashMap; -use gpui::{App, BorrowAppContext, Global}; -use libwebrtc::native::apm; +use futures::channel::mpsc::UnboundedSender; +use gpui::{App, AsyncApp, BackgroundExecutor, BorrowAppContext, Global}; +use libwebrtc::{native::apm, prelude::AudioFrame}; +use log::info; use parking_lot::Mutex; use rodio::{ - Decoder, OutputStream, OutputStreamBuilder, Source, cpal::Sample, mixer::Mixer, - source::Buffered, + Decoder, OutputStream, OutputStreamBuilder, Source, + cpal::Sample, + mixer::Mixer, + nz, + source::{Buffered, LimitSettings, UniformSourceIterator}, }; use settings::Settings; -use std::{io::Cursor, num::NonZero, sync::Arc}; -use util::ResultExt; +use std::{ + borrow::Cow, + io::Cursor, + num::NonZero, + path::PathBuf, + sync::{ + Arc, + mpsc::{TryRecvError, channel}, + }, + thread, + time::Duration, +}; +use util::{ResultExt, debug_panic}; mod audio_settings; +mod replays; mod rodio_ext; pub use audio_settings::AudioSettings; pub use rodio_ext::RodioExt; -// NOTE: We use WebRTC's mixer which only supports +// NOTE: We used to use WebRTC's mixer which only supported // 16kHz, 32kHz and 48kHz. As 48 is the most common "next step up" // for audio output devices like speakers/bluetooth, we just hard-code // this; and downsample when we need to. // // Since most noise cancelling requires 16kHz we will move to -// that in the future. Same for channel count. That should be input -// channels and fixed to 1. -pub const SAMPLE_RATE: NonZero = NonZero::new(48000).expect("not zero"); -pub const CHANNEL_COUNT: NonZero = NonZero::new(2).expect("not zero"); +// that in the future. +pub const SAMPLE_RATE: NonZero = nz!(48000); +pub const CHANNEL_COUNT: NonZero = nz!(2); +const BUFFER_SIZE: usize = // echo canceller and livekit want 10ms of audio + (SAMPLE_RATE.get() as usize / 100) * CHANNEL_COUNT.get() as usize; + +pub const REPLAY_DURATION: Duration = Duration::from_secs(30); pub fn init(cx: &mut App) { AudioSettings::register(cx); } -#[derive(Copy, Clone, Eq, Hash, PartialEq)] +#[derive(Debug, Copy, Clone, Eq, Hash, PartialEq)] pub enum Sound { Joined, Leave, @@ -61,6 +81,7 @@ pub struct Audio { output_mixer: Option, pub echo_canceller: Arc>, source_cache: HashMap>>>>, + replays: replays::Replays, } impl Default for Audio { @@ -72,6 +93,7 @@ impl Default for Audio { true, false, false, false, ))), source_cache: Default::default(), + replays: Default::default(), } } } @@ -79,16 +101,19 @@ impl Default for Audio { impl Global for Audio {} impl Audio { - fn ensure_output_exists(&mut self) -> Option<&Mixer> { + fn ensure_output_exists(&mut self) -> Result<&Mixer> { if self.output_handle.is_none() { - self.output_handle = OutputStreamBuilder::open_default_stream().log_err(); + self.output_handle = Some( + OutputStreamBuilder::open_default_stream() + .context("Could not open default output stream")?, + ); if let Some(output_handle) = &self.output_handle { let (mixer, source) = rodio::mixer::mixer(CHANNEL_COUNT, SAMPLE_RATE); + // or the mixer will end immediately as its empty. + mixer.add(rodio::source::Zero::new(CHANNEL_COUNT, SAMPLE_RATE)); self.output_mixer = Some(mixer); let echo_canceller = Arc::clone(&self.echo_canceller); - const BUFFER_SIZE: usize = // echo canceller wants 10ms of audio - (SAMPLE_RATE.get() as usize / 100) * CHANNEL_COUNT.get() as usize; let source = source.inspect_buffer::(move |buffer| { let mut buf: [i16; _] = buffer.map(|s| s.to_sample()); echo_canceller @@ -104,18 +129,109 @@ impl Audio { } } - self.output_mixer.as_ref() + Ok(self + .output_mixer + .as_ref() + .expect("we only get here if opening the outputstream succeeded")) + } + + pub fn save_replays( + &self, + executor: BackgroundExecutor, + ) -> gpui::Task> { + self.replays.replays_to_tar(executor) } - pub fn play_source( - source: impl rodio::Source + Send + 'static, + pub fn open_microphone( + cx: AsyncApp, + frame_tx: UnboundedSender>, + ) -> anyhow::Result<()> { + let (apm, mut replays) = cx.try_read_default_global::(|audio, _| { + (Arc::clone(&audio.echo_canceller), audio.replays.clone()) + })?; + + let (stream_error_tx, stream_error_rx) = channel(); + thread::spawn(move || { + let stream = rodio::microphone::MicrophoneBuilder::new() + .default_device()? + .default_config()? + .prefer_sample_rates([SAMPLE_RATE, SAMPLE_RATE.saturating_mul(nz!(2))]) + .prefer_channel_counts([nz!(1), nz!(2)]) + .prefer_buffer_sizes(512..) + .open_stream()?; + info!("Opened microphone: {:?}", stream.config()); + + let stream = UniformSourceIterator::new(stream, CHANNEL_COUNT, SAMPLE_RATE) + .limit(LimitSettings::live_performance()) + .process_buffer::(|buffer| { + let mut int_buffer: [i16; _] = buffer.map(|s| s.to_sample()); + if let Err(e) = apm + .lock() + .process_stream( + &mut int_buffer, + SAMPLE_RATE.get() as i32, + CHANNEL_COUNT.get() as i32, + ) + .context("livekit audio processor error") + { + let _ = stream_error_tx.send(e); + } else { + for (sample, processed) in buffer.iter_mut().zip(&int_buffer) { + *sample = (*processed).to_sample(); + } + } + }) + .automatic_gain_control(1.0, 4.0, 0.0, 5.0) + .periodic_access(Duration::from_millis(100), move |agc_source| { + agc_source.set_enabled(true); // todo dvdsk how to get settings in here? + }); + + // todo dvdsk keep the above here, move the rest back to livekit? + let (replay, mut stream) = stream.replayable(REPLAY_DURATION); + replays.add_voip_stream("local microphone".to_string(), replay); + + loop { + let sampled: Vec<_> = stream + .by_ref() + .take(BUFFER_SIZE) + .map(|s| s.to_sample()) + .collect(); + + match stream_error_rx.try_recv() { + Ok(apm_error) => return Err::<(), _>(apm_error), + Err(TryRecvError::Disconnected) => { + debug_panic!("Stream should end on its own without sending an error") + } + Err(TryRecvError::Empty) => (), + } + + frame_tx + .unbounded_send(AudioFrame { + sample_rate: SAMPLE_RATE.get(), + num_channels: CHANNEL_COUNT.get() as u32, + samples_per_channel: sampled.len() as u32 / CHANNEL_COUNT.get() as u32, + data: Cow::Owned(sampled), + }) + .context("Failed to send audio frame")? + } + }); + + Ok(()) + } + + pub fn play_voip_stream( + stream_source: impl rodio::Source + Send + 'static, + stream_name: String, cx: &mut App, ) -> anyhow::Result<()> { + let (replay_source, source) = stream_source.replayable(REPLAY_DURATION); + cx.update_default_global(|this: &mut Self, _cx| { let output_mixer = this .ensure_output_exists() - .ok_or_else(|| anyhow!("Could not open audio output"))?; + .context("Could not get output mixer")?; output_mixer.add(source); + this.replays.add_voip_stream(stream_name, replay_source); Ok(()) }) } @@ -123,7 +239,10 @@ impl Audio { pub fn play_sound(sound: Sound, cx: &mut App) { cx.update_default_global(|this: &mut Self, cx| { let source = this.sound_source(sound, cx).log_err()?; - let output_mixer = this.ensure_output_exists()?; + let output_mixer = this + .ensure_output_exists() + .context("Could not get output mixer") + .log_err()?; output_mixer.add(source); Some(()) diff --git a/crates/audio/src/audio_settings.rs b/crates/audio/src/audio_settings.rs index 168519030bcbd4a422965580ddbe01121934278d..08ac40c96092b829c270f09fcea892147c4e59ef 100644 --- a/crates/audio/src/audio_settings.rs +++ b/crates/audio/src/audio_settings.rs @@ -9,6 +9,9 @@ pub struct AudioSettings { /// Opt into the new audio system. #[serde(rename = "experimental.rodio_audio", default)] pub rodio_audio: bool, // default is false + /// Opt into the new audio systems automatic gain control + #[serde(rename = "experimental.automatic_volume", default)] + pub automatic_volume: bool, } /// Configuration of audio in Zed. @@ -19,6 +22,10 @@ pub struct AudioSettingsContent { /// Whether to use the experimental audio system #[serde(rename = "experimental.rodio_audio", default)] pub rodio_audio: bool, + /// Whether the experimental audio systems should automatically + /// manage the volume of calls + #[serde(rename = "experimental.automatic_volume", default)] + pub automatic_volume: bool, } impl Settings for AudioSettings { diff --git a/crates/audio/src/replays.rs b/crates/audio/src/replays.rs new file mode 100644 index 0000000000000000000000000000000000000000..ae32696bfa9d9cafb96ed810278d442915a672da --- /dev/null +++ b/crates/audio/src/replays.rs @@ -0,0 +1,80 @@ +use anyhow::{Context, anyhow}; +use async_tar::{Builder, Header}; +use gpui::{BackgroundExecutor, Task}; + +use collections::HashMap; +use parking_lot::Mutex; +use rodio::Source; +use smol::fs::File; +use std::{io, path::PathBuf, sync::Arc, time::Duration}; + +use crate::{REPLAY_DURATION, rodio_ext::Replay}; + +#[derive(Default, Clone)] +pub(crate) struct Replays(Arc>>); + +impl Replays { + pub(crate) fn add_voip_stream(&mut self, stream_name: String, source: Replay) { + let mut map = self.0.lock(); + map.retain(|_, replay| replay.source_is_active()); + // on the old pipeline all the streams are named microphone + // make sure names dont collide in that case by adding a number. + let stream_name = stream_name + &map.len().to_string(); + map.insert(stream_name, source); + } + + pub(crate) fn replays_to_tar( + &self, + executor: BackgroundExecutor, + ) -> Task> { + let map = Arc::clone(&self.0); + executor.spawn(async move { + let recordings: Vec<_> = map + .lock() + .iter_mut() + .map(|(name, replay)| { + let queued = REPLAY_DURATION.min(replay.duration_ready()); + (name.clone(), replay.take_duration(queued).record()) + }) + .collect(); + let longest = recordings + .iter() + .map(|(_, r)| { + r.total_duration() + .expect("SamplesBuffer always returns a total duration") + }) + .max() + .ok_or(anyhow!("There is no audio to capture"))?; + + let path = std::env::current_dir() + .context("Could not get current dir")? + .join("replays.tar"); + let tar = File::create(&path) + .await + .context("Could not create file for tar")?; + + let mut tar = Builder::new(tar); + + for (name, recording) in recordings { + let mut writer = io::Cursor::new(Vec::new()); + rodio::wav_to_writer(recording, &mut writer).context("failed to encode wav")?; + let wav_data = writer.into_inner(); + let path = name.replace(' ', "_") + ".wav"; + let mut header = Header::new_gnu(); + // rw permissions for everyone + header.set_mode(0o666); + header.set_size(wav_data.len() as u64); + tar.append_data(&mut header, path, wav_data.as_slice()) + .await + .context("failed to apped wav to tar")?; + } + tar.into_inner() + .await + .context("Could not finish writing tar")? + .sync_all() + .await + .context("Could not flush tar file to disk")?; + Ok((path, longest)) + }) + } +} diff --git a/crates/audio/src/rodio_ext.rs b/crates/audio/src/rodio_ext.rs index 23e85a4163ad64a8f03e7bbea8a5cdd6f1918d8d..c23e48a5592cd97cfc19bc72f84c7f34954acafc 100644 --- a/crates/audio/src/rodio_ext.rs +++ b/crates/audio/src/rodio_ext.rs @@ -1,18 +1,28 @@ -use rodio::Source; +use std::{ + sync::{ + Arc, + atomic::{AtomicUsize, Ordering}, + }, + time::Duration, +}; + +use crossbeam::queue::ArrayQueue; +use rodio::{ChannelCount, Sample, SampleRate, Source}; pub trait RodioExt: Source + Sized { fn process_buffer(self, callback: F) -> ProcessBuffer where - F: FnMut(&mut [rodio::Sample; N]); + F: FnMut(&mut [Sample; N]); fn inspect_buffer(self, callback: F) -> InspectBuffer where - F: FnMut(&[rodio::Sample; N]); + F: FnMut(&[Sample; N]); + fn replayable(self, duration: Duration) -> (Replay, Replayable); } impl RodioExt for S { fn process_buffer(self, callback: F) -> ProcessBuffer where - F: FnMut(&mut [rodio::Sample; N]), + F: FnMut(&mut [Sample; N]), { ProcessBuffer { inner: self, @@ -23,7 +33,7 @@ impl RodioExt for S { } fn inspect_buffer(self, callback: F) -> InspectBuffer where - F: FnMut(&[rodio::Sample; N]), + F: FnMut(&[Sample; N]), { InspectBuffer { inner: self, @@ -32,17 +42,82 @@ impl RodioExt for S { free: 0, } } + fn replayable(self, duration: Duration) -> (Replay, Replayable) { + let samples_per_second = self.sample_rate().get() * self.channels().get() as u32; + let samples_to_queue = duration.as_secs_f64() * samples_per_second as f64; + let samples_to_queue = + (samples_to_queue as usize).next_multiple_of(self.channels().get().into()); + + let chunk_size = + samples_to_queue.min(1000usize.next_multiple_of(self.channels().get().into())); + let chunks_to_queue = samples_to_queue.div_ceil(chunk_size); + + let queue = Arc::new(ReplayQueue::new(chunks_to_queue, chunk_size)); + ( + Replay { + rx: Arc::clone(&queue), + buffer: Vec::new().into_iter(), + sleep_duration: duration / 2, + sample_rate: self.sample_rate(), + channel_count: self.channels(), + }, + Replayable { + tx: queue, + inner: self, + buffer: Vec::with_capacity(chunk_size), + chunk_size, + }, + ) + } +} + +#[derive(Debug)] +struct ReplayQueue { + inner: ArrayQueue>, + normal_chunk_len: usize, + /// The last chunk in the queue may be smaller then + /// the normal chunk size. This is always equal to the + /// size of the last element in the queue. + /// (so normally chunk_size) + last_chunk_len: AtomicUsize, +} + +impl ReplayQueue { + fn new(queue_len: usize, chunk_size: usize) -> Self { + Self { + inner: ArrayQueue::new(queue_len), + normal_chunk_len: chunk_size, + last_chunk_len: AtomicUsize::new(chunk_size), + } + } + fn len(&self) -> usize { + self.inner.len().saturating_sub(1) * self.normal_chunk_len + + self.last_chunk_len.load(Ordering::Acquire) + } + + fn pop(&self) -> Option> { + self.inner.pop() + } + + fn push_last(&self, samples: Vec) { + self.last_chunk_len.store(samples.len(), Ordering::Release); + let _pushed_out_of_ringbuf = self.inner.force_push(samples); + } + + fn push_normal(&self, samples: Vec) { + let _pushed_out_of_ringbuf = self.inner.force_push(samples); + } } pub struct ProcessBuffer where S: Source + Sized, - F: FnMut(&mut [rodio::Sample; N]), + F: FnMut(&mut [Sample; N]), { inner: S, callback: F, /// Buffer used for both input and output. - buffer: [rodio::Sample; N], + buffer: [Sample; N], /// Next already processed sample is at this index /// in buffer. /// @@ -54,9 +129,9 @@ where impl Iterator for ProcessBuffer where S: Source + Sized, - F: FnMut(&mut [rodio::Sample; N]), + F: FnMut(&mut [Sample; N]), { - type Item = rodio::Sample; + type Item = Sample; fn next(&mut self) -> Option { self.next += 1; @@ -78,7 +153,7 @@ where impl Source for ProcessBuffer where S: Source + Sized, - F: FnMut(&mut [rodio::Sample; N]), + F: FnMut(&mut [Sample; N]), { fn current_span_len(&self) -> Option { // TODO dvdsk this should be a spanless Source @@ -101,12 +176,12 @@ where pub struct InspectBuffer where S: Source + Sized, - F: FnMut(&[rodio::Sample; N]), + F: FnMut(&[Sample; N]), { inner: S, callback: F, /// Stores already emitted samples, once its full we call the callback. - buffer: [rodio::Sample; N], + buffer: [Sample; N], /// Next free element in buffer. If this is equal to the buffer length /// we have no more free lements. free: usize, @@ -115,9 +190,9 @@ where impl Iterator for InspectBuffer where S: Source + Sized, - F: FnMut(&[rodio::Sample; N]), + F: FnMut(&[Sample; N]), { - type Item = rodio::Sample; + type Item = Sample; fn next(&mut self) -> Option { let Some(sample) = self.inner.next() else { @@ -139,7 +214,7 @@ where impl Source for InspectBuffer where S: Source + Sized, - F: FnMut(&[rodio::Sample; N]), + F: FnMut(&[Sample; N]), { fn current_span_len(&self) -> Option { // TODO dvdsk this should be a spanless Source @@ -159,21 +234,135 @@ where } } +#[derive(Debug)] +pub struct Replayable { + inner: S, + buffer: Vec, + chunk_size: usize, + tx: Arc, +} + +impl Iterator for Replayable { + type Item = Sample; + + fn next(&mut self) -> Option { + if let Some(sample) = self.inner.next() { + self.buffer.push(sample); + if self.buffer.len() == self.chunk_size { + self.tx.push_normal(std::mem::take(&mut self.buffer)); + } + Some(sample) + } else { + let last_chunk = std::mem::take(&mut self.buffer); + self.tx.push_last(last_chunk); + None + } + } +} + +impl Source for Replayable { + fn current_span_len(&self) -> Option { + // Todo dvdsk should be spanless too + self.inner.current_span_len() + } + + fn channels(&self) -> ChannelCount { + self.inner.channels() + } + + fn sample_rate(&self) -> SampleRate { + self.inner.sample_rate() + } + + fn total_duration(&self) -> Option { + self.inner.total_duration() + } +} + +#[derive(Debug)] +pub struct Replay { + rx: Arc, + buffer: std::vec::IntoIter, + sleep_duration: Duration, + sample_rate: SampleRate, + channel_count: ChannelCount, +} + +impl Replay { + pub fn source_is_active(&self) -> bool { + Arc::strong_count(&self.rx) == 2 + } + + /// Returns duration of what is in the buffer and + /// can be returned without blocking. + pub fn duration_ready(&self) -> Duration { + let samples_per_second = self.channels().get() as u32 * self.sample_rate().get(); + let samples_queued = self.rx.len() + self.buffer.len(); + + let seconds_queued = samples_queued as f64 / samples_per_second as f64; + Duration::from_secs_f64(seconds_queued) + } +} + +impl Iterator for Replay { + type Item = Sample; + + fn next(&mut self) -> Option { + if let Some(sample) = self.buffer.next() { + return Some(sample); + } + + loop { + if let Some(new_buffer) = self.rx.pop() { + self.buffer = new_buffer.into_iter(); + return self.buffer.next(); + } + + if !self.source_is_active() { + return None; + } + + std::thread::sleep(self.sleep_duration); + } + } +} + +impl Source for Replay { + fn current_span_len(&self) -> Option { + None // source is not compatible with spans + } + + fn channels(&self) -> ChannelCount { + self.channel_count + } + + fn sample_rate(&self) -> SampleRate { + self.sample_rate + } + + fn total_duration(&self) -> Option { + None + } +} + #[cfg(test)] mod tests { - use rodio::static_buffer::StaticSamplesBuffer; + use rodio::{nz, static_buffer::StaticSamplesBuffer}; use super::*; - #[cfg(test)] + const SAMPLES: [Sample; 5] = [0.0, 1.0, 2.0, 3.0, 4.0]; + + fn test_source() -> StaticSamplesBuffer { + StaticSamplesBuffer::new(nz!(1), nz!(1), &SAMPLES) + } + mod process_buffer { use super::*; #[test] fn callback_gets_all_samples() { - const SAMPLES: [f32; 5] = [0.0, 1.0, 2.0, 3.0, 4.0]; - let input = - StaticSamplesBuffer::new(1.try_into().unwrap(), 1.try_into().unwrap(), &SAMPLES); + let input = test_source(); let _ = input .process_buffer::<{ SAMPLES.len() }, _>(|buffer| assert_eq!(*buffer, SAMPLES)) @@ -181,9 +370,7 @@ mod tests { } #[test] fn callback_modifies_yielded() { - const SAMPLES: [f32; 5] = [0.0, 1.0, 2.0, 3.0, 4.0]; - let input = - StaticSamplesBuffer::new(1.try_into().unwrap(), 1.try_into().unwrap(), &SAMPLES); + let input = test_source(); let yielded: Vec<_> = input .process_buffer::<{ SAMPLES.len() }, _>(|buffer| { @@ -199,9 +386,7 @@ mod tests { } #[test] fn source_truncates_to_whole_buffers() { - const SAMPLES: [f32; 5] = [0.0, 1.0, 2.0, 3.0, 4.0]; - let input = - StaticSamplesBuffer::new(1.try_into().unwrap(), 1.try_into().unwrap(), &SAMPLES); + let input = test_source(); let yielded = input .process_buffer::<3, _>(|buffer| assert_eq!(buffer, &SAMPLES[..3])) @@ -210,15 +395,12 @@ mod tests { } } - #[cfg(test)] mod inspect_buffer { use super::*; #[test] fn callback_gets_all_samples() { - const SAMPLES: [f32; 5] = [0.0, 1.0, 2.0, 3.0, 4.0]; - let input = - StaticSamplesBuffer::new(1.try_into().unwrap(), 1.try_into().unwrap(), &SAMPLES); + let input = test_source(); let _ = input .inspect_buffer::<{ SAMPLES.len() }, _>(|buffer| assert_eq!(*buffer, SAMPLES)) @@ -226,9 +408,7 @@ mod tests { } #[test] fn source_does_not_truncate() { - const SAMPLES: [f32; 5] = [0.0, 1.0, 2.0, 3.0, 4.0]; - let input = - StaticSamplesBuffer::new(1.try_into().unwrap(), 1.try_into().unwrap(), &SAMPLES); + let input = test_source(); let yielded = input .inspect_buffer::<3, _>(|buffer| assert_eq!(buffer, &SAMPLES[..3])) @@ -236,4 +416,54 @@ mod tests { assert_eq!(yielded, SAMPLES.len()) } } + + mod instant_replay { + use super::*; + + #[test] + fn continues_after_history() { + let input = test_source(); + + let (mut replay, mut source) = input.replayable(Duration::from_secs(3)); + + source.by_ref().take(3).count(); + let yielded: Vec = replay.by_ref().take(3).collect(); + assert_eq!(&yielded, &SAMPLES[0..3],); + + source.count(); + let yielded: Vec = replay.collect(); + assert_eq!(&yielded, &SAMPLES[3..5],); + } + + #[test] + fn keeps_only_latest() { + let input = test_source(); + + let (mut replay, mut source) = input.replayable(Duration::from_secs(2)); + + source.by_ref().take(5).count(); // get all items but do not end the source + let yielded: Vec = replay.by_ref().take(2).collect(); + // Note we do not get the last element, it has not been send yet + // due to buffering. + assert_eq!(&yielded, &SAMPLES[2..4]); + + source.count(); // exhaust source + let yielded: Vec = replay.collect(); + assert_eq!(&yielded, &[SAMPLES[4]]); + } + + #[test] + fn keeps_correct_amount_of_seconds() { + let input = StaticSamplesBuffer::new(nz!(16_000), nz!(1), &[0.0; 40_000]); + + let (replay, mut source) = input.replayable(Duration::from_secs(2)); + + source.by_ref().count(); + let n_yielded = replay.count(); + assert_eq!( + n_yielded as u32, + source.sample_rate().get() * source.channels().get() as u32 * 2 + ); + } + } } diff --git a/crates/call/src/call_impl/room.rs b/crates/call/src/call_impl/room.rs index c31a458c64124c266c56a7004746d7b6a0a4adc6..cb4802ce2699051916e6448f62ad7bc664755a6d 100644 --- a/crates/call/src/call_impl/room.rs +++ b/crates/call/src/call_impl/room.rs @@ -1322,8 +1322,16 @@ impl Room { return Task::ready(Err(anyhow!("live-kit was not initialized"))); }; + let user_name = self + .user_store + .read(cx) + .current_user() + .map(|user| user.name.clone()) + .flatten() + .unwrap_or_else(|| "unknown".to_string()); + cx.spawn(async move |this, cx| { - let publication = room.publish_local_microphone_track(cx).await; + let publication = room.publish_local_microphone_track(&user_name, cx).await; this.update(cx, |this, cx| { let live_kit = this .live_kit diff --git a/crates/livekit_client/Cargo.toml b/crates/livekit_client/Cargo.toml index 467c2f7437e7c4b92776020dc42deca47e0f61d8..921918fdac076caa10828045f69d6a2e17092446 100644 --- a/crates/livekit_client/Cargo.toml +++ b/crates/livekit_client/Cargo.toml @@ -42,7 +42,6 @@ util.workspace = true workspace-hack.workspace = true rodio = { workspace = true, features = ["wav_output", "recording"] } -dasp_sample = "0.11" [target.'cfg(not(any(all(target_os = "windows", target_env = "gnu"), target_os = "freebsd")))'.dependencies] libwebrtc = { rev = "5f04705ac3f356350ae31534ffbc476abc9ea83d", git = "https://github.com/zed-industries/livekit-rust-sdks" } diff --git a/crates/livekit_client/examples/test_app.rs b/crates/livekit_client/examples/test_app.rs index 75806429905e6bcbfcc17f25a29007f18e78757b..58aa80ead462cc57e91b90ccde2139e235500e55 100644 --- a/crates/livekit_client/examples/test_app.rs +++ b/crates/livekit_client/examples/test_app.rs @@ -255,7 +255,10 @@ impl LivekitWindow { } else { let room = self.room.clone(); cx.spawn_in(window, async move |this, cx| { - let (publication, stream) = room.publish_local_microphone_track(cx).await.unwrap(); + let (publication, stream) = room + .publish_local_microphone_track("test_user", cx) + .await + .unwrap(); this.update(cx, |this, cx| { this.microphone_track = Some(publication); this.microphone_stream = Some(stream); diff --git a/crates/livekit_client/src/livekit_client.rs b/crates/livekit_client/src/livekit_client.rs index 50035d6eb5a9f11d22d44b223d52c1112f3b28d0..b8ecc6f771ef1840d3a34c61178e1fa463715888 100644 --- a/crates/livekit_client/src/livekit_client.rs +++ b/crates/livekit_client/src/livekit_client.rs @@ -97,9 +97,12 @@ impl Room { pub async fn publish_local_microphone_track( &self, + user_name: &str, cx: &mut AsyncApp, ) -> Result<(LocalTrackPublication, playback::AudioStream)> { - let (track, stream) = self.playback.capture_local_microphone_track(&cx)?; + let (track, stream) = self + .playback + .capture_local_microphone_track(user_name, &cx)?; let publication = self .local_participant() .publish_track( diff --git a/crates/livekit_client/src/livekit_client/playback.rs b/crates/livekit_client/src/livekit_client/playback.rs index f28f363d48b1719212c703513fe7c6c332ee9f91..8b1105e16bd5daf2630ea964b98a0e13db9c128a 100644 --- a/crates/livekit_client/src/livekit_client/playback.rs +++ b/crates/livekit_client/src/livekit_client/playback.rs @@ -1,9 +1,7 @@ use anyhow::{Context as _, Result}; use audio::{AudioSettings, CHANNEL_COUNT, SAMPLE_RATE}; -use cpal::Sample; use cpal::traits::{DeviceTrait, StreamTrait as _}; -use dasp_sample::ToSample; use futures::channel::mpsc::UnboundedSender; use futures::{Stream, StreamExt as _}; use gpui::{ @@ -24,16 +22,13 @@ use livekit::webrtc::{ use log::info; use parking_lot::Mutex; use rodio::Source; -use rodio::source::{LimitSettings, UniformSourceIterator}; use settings::Settings; use std::cell::RefCell; -use std::num::NonZero; use std::sync::Weak; use std::sync::atomic::{AtomicBool, AtomicI32, Ordering}; -use std::sync::mpsc::{TryRecvError, channel}; use std::time::Duration; use std::{borrow::Cow, collections::VecDeque, sync::Arc, thread}; -use util::{ResultExt as _, debug_panic, maybe}; +use util::{ResultExt as _, maybe}; mod source; @@ -51,14 +46,16 @@ pub(crate) fn play_remote_audio_track( ) -> Result { let stop_handle = Arc::new(AtomicBool::new(false)); let stop_handle_clone = stop_handle.clone(); - let stream = source::LiveKitStream::new(cx.background_executor(), track) + let stream = source::LiveKitStream::new(cx.background_executor(), track); + + let stream = stream .stoppable() .periodic_access(Duration::from_millis(50), move |s| { if stop_handle.load(Ordering::Relaxed) { s.stop(); } }); - audio::Audio::play_source(stream, cx).context("Could not play audio")?; + audio::Audio::play_voip_stream(stream, track.name(), cx).context("Could not play audio")?; let on_drop = util::defer(move || { stop_handle_clone.store(true, Ordering::Relaxed); @@ -144,6 +141,7 @@ impl AudioStack { pub(crate) fn capture_local_microphone_track( &self, + user_name: &str, cx: &AsyncApp, ) -> Result<(crate::LocalAudioTrack, AudioStream)> { let source = NativeAudioSource::new( @@ -155,7 +153,7 @@ impl AudioStack { ); let track = track::LocalAudioTrack::create_audio_track( - "microphone", + user_name, RtcAudioSource::Native(source.clone()), ); @@ -172,13 +170,10 @@ impl AudioStack { let rodio_pipeline = AudioSettings::try_read_global(cx, |setting| setting.rodio_audio).unwrap_or_default(); let capture_task = if rodio_pipeline { - let apm = cx - .try_read_global::(|audio, _| Arc::clone(&audio.echo_canceller)) - .unwrap(); // TODO fixme - self.executor.spawn(async move { - info!("Using experimental.rodio_audio audio pipeline"); - Self::capture_input_rodio(apm, frame_tx).await - }) + // TODO global might not yet have been initialized + info!("Using experimental.rodio_audio audio pipeline"); + audio::Audio::open_microphone(cx.clone(), frame_tx)?; + Task::ready(Ok(())) } else { self.executor.spawn(async move { Self::capture_input(apm, frame_tx, SAMPLE_RATE.get(), CHANNEL_COUNT.get().into()) @@ -270,84 +265,6 @@ impl AudioStack { } } - async fn capture_input_rodio( - apm: Arc>, - frame_tx: UnboundedSender>, - ) -> Result<()> { - use audio::RodioExt; - const LIVEKIT_BUFFER_SIZE: usize = - (audio::SAMPLE_RATE.get() as usize / 100) * audio::CHANNEL_COUNT.get() as usize; - - let (stream_error_tx, stream_error_rx) = channel(); - - thread::spawn(move || { - let stream = rodio::microphone::MicrophoneBuilder::new() - .default_device()? - .default_config()? - .prefer_sample_rates([ - SAMPLE_RATE, - SAMPLE_RATE.saturating_mul(NonZero::new(2).expect("not zero")), - ]) - .prefer_channel_counts([ - NonZero::new(1).expect("not zero"), - NonZero::new(2).expect("not zero"), - ]) - .prefer_buffer_sizes(512..) - .open_stream()?; - info!("Opened microphone: {:?}", stream.config()); - let mut stream = - UniformSourceIterator::new(stream, audio::CHANNEL_COUNT, audio::SAMPLE_RATE) - .limit(LimitSettings::live_performance()) - .process_buffer::(|buffer| { - let mut int_buffer: [i16; _] = buffer.map(|s| s.to_sample()); - if let Err(e) = apm - .lock() - .process_stream( - &mut int_buffer, - audio::SAMPLE_RATE.get() as i32, - audio::CHANNEL_COUNT.get() as i32, - ) - .context("livekit audio processor error") - { - let _ = stream_error_tx.send(e); - } else { - for (sample, processed) in buffer.iter_mut().zip(&int_buffer) { - *sample = (*processed).to_sample_(); - } - } - }) - .automatic_gain_control(1.0, 4.0, 0.0, 5.0); - - loop { - let sampled: Vec<_> = stream - .by_ref() - .take(LIVEKIT_BUFFER_SIZE) - .map(|s| s.to_sample()) - .collect(); - - match stream_error_rx.try_recv() { - Ok(apm_error) => return Err::<(), _>(apm_error), - Err(TryRecvError::Disconnected) => { - debug_panic!("Stream should end on its own without sending an error") - } - Err(TryRecvError::Empty) => (), - } - - frame_tx - .unbounded_send(AudioFrame { - sample_rate: SAMPLE_RATE.get(), - num_channels: audio::CHANNEL_COUNT.get() as u32, - samples_per_channel: sampled.len() as u32 - / audio::CHANNEL_COUNT.get() as u32, - data: Cow::Owned(sampled), - }) - .context("Failed to send audio frame")? - } - }); - - Ok(()) - } - async fn capture_input( apm: Arc>, frame_tx: UnboundedSender>, diff --git a/crates/livekit_client/src/livekit_client/playback/source.rs b/crates/livekit_client/src/livekit_client/playback/source.rs index 51ceb081d78ecd47f898e025ee8e0cb8235e2f13..67bfe793902da94a114ca617ce5bfa33c68d02e7 100644 --- a/crates/livekit_client/src/livekit_client/playback/source.rs +++ b/crates/livekit_client/src/livekit_client/playback/source.rs @@ -3,16 +3,18 @@ use std::num::NonZero; use futures::StreamExt; use libwebrtc::{audio_stream::native::NativeAudioStream, prelude::AudioFrame}; use livekit::track::RemoteAudioTrack; -use rodio::{Source, buffer::SamplesBuffer, conversions::SampleTypeConverter}; +use rodio::{Source, buffer::SamplesBuffer, conversions::SampleTypeConverter, nz}; -use crate::livekit_client::playback::{CHANNEL_COUNT, SAMPLE_RATE}; +use audio::{CHANNEL_COUNT, SAMPLE_RATE}; fn frame_to_samplesbuffer(frame: AudioFrame) -> SamplesBuffer { let samples = frame.data.iter().copied(); let samples = SampleTypeConverter::<_, _>::new(samples); let samples: Vec = samples.collect(); SamplesBuffer::new( - NonZero::new(frame.num_channels as u16).expect("audio frame channels is nonzero"), + // here be dragons + // NonZero::new(frame.num_channels as u16).expect("audio frame channels is nonzero"), + nz!(2), NonZero::new(frame.sample_rate).expect("audio frame sample rate is nonzero"), samples, ) @@ -63,11 +65,17 @@ impl Source for LiveKitStream { } fn channels(&self) -> rodio::ChannelCount { - self.inner.channels() + // This must be hardcoded because the playback source assumes constant + // sample rate and channel count. The queue upon which this is build + // will however report different counts and rates. Even though we put in + // only items with our (constant) CHANNEL_COUNT & SAMPLE_RATE this will + // play silence on one channel and at 44100 which is not what our + // constants are. + CHANNEL_COUNT } fn sample_rate(&self) -> rodio::SampleRate { - self.inner.sample_rate() + SAMPLE_RATE // see comment on channels } fn total_duration(&self) -> Option { diff --git a/crates/livekit_client/src/record.rs b/crates/livekit_client/src/record.rs index ad9d9435d9d029b7725b8858eb569d0794512dea..24e260e71665704c1010d07e082a03fbe6306a30 100644 --- a/crates/livekit_client/src/record.rs +++ b/crates/livekit_client/src/record.rs @@ -89,7 +89,7 @@ fn write_out( NonZero::new(config.sample_rate().0).expect("config sample_rate is never zero"), samples, ); - match rodio::output_to_wav(&mut samples, path) { + match rodio::wav_to_file(&mut samples, path) { Ok(_) => Ok(()), Err(e) => Err(anyhow::anyhow!("Failed to write wav file: {}", e)), } diff --git a/crates/livekit_client/src/test.rs b/crates/livekit_client/src/test.rs index 873e0222d013c20c5f4aff2a263b925c7d21aff6..612b032895b42afcfdc09e8d24023fbd1fcdf5c1 100644 --- a/crates/livekit_client/src/test.rs +++ b/crates/livekit_client/src/test.rs @@ -728,6 +728,7 @@ impl Room { pub async fn publish_local_microphone_track( &self, + _track_name: &str, cx: &mut AsyncApp, ) -> Result<(LocalTrackPublication, AudioStream)> { self.local_participant().publish_microphone_track(cx).await diff --git a/crates/zed/src/zed.rs b/crates/zed/src/zed.rs index cb5cfea81a06fd27cb19d7f2e2cd845e9236ae2c..ea2106593eb1e0788b3b8a46f7ae8c32ac75c1e4 100644 --- a/crates/zed/src/zed.rs +++ b/crates/zed/src/zed.rs @@ -13,6 +13,7 @@ use agent_ui::{AgentDiffToolbar, AgentPanelDelegate}; use anyhow::Context as _; pub use app_menus::*; use assets::Assets; +use audio::{AudioSettings, REPLAY_DURATION}; use breadcrumbs::Breadcrumbs; use client::zed_urls; use collections::VecDeque; @@ -128,7 +129,10 @@ actions!( dev, [ /// Record 10s of audio from your current microphone - CaptureAudio + CaptureAudio, + /// Stores last 30s of audio from everyone on the current call + /// in a tar file in the current working directory. + CaptureRecentAudio, ] ); @@ -925,6 +929,9 @@ fn register_actions( }) .register_action(|workspace, _: &CaptureAudio, window, cx| { capture_audio(workspace, window, cx); + }) + .register_action(|workspace, _: &CaptureRecentAudio, window, cx| { + capture_recent_audio(workspace, window, cx); }); if workspace.project().read(cx).is_via_remote_server() { @@ -1937,6 +1944,85 @@ fn capture_audio(workspace: &mut Workspace, _: &mut Window, cx: &mut Context) { + struct CaptureRecentAudioNotification { + focus_handle: gpui::FocusHandle, + save_result: Option>, + _save_task: Task>, + } + + impl gpui::EventEmitter for CaptureRecentAudioNotification {} + impl gpui::EventEmitter for CaptureRecentAudioNotification {} + impl gpui::Focusable for CaptureRecentAudioNotification { + fn focus_handle(&self, _cx: &App) -> gpui::FocusHandle { + self.focus_handle.clone() + } + } + impl workspace::notifications::Notification for CaptureRecentAudioNotification {} + + impl Render for CaptureRecentAudioNotification { + fn render(&mut self, _window: &mut Window, cx: &mut Context) -> impl IntoElement { + let message = match &self.save_result { + None => format!( + "Saving up to {} seconds of recent audio", + REPLAY_DURATION.as_secs(), + ), + Some(Ok((path, duration))) => format!( + "Saved {} seconds of all audio to {}", + duration.as_secs(), + path.display(), + ), + Some(Err(e)) => format!("Error saving audio replays: {e:?}"), + }; + + NotificationFrame::new() + .with_title(Some("Saved Audio")) + .show_suppress_button(false) + .on_close(cx.listener(|_, _, _, cx| { + cx.emit(DismissEvent); + })) + .with_content(message) + } + } + + impl CaptureRecentAudioNotification { + fn new(cx: &mut Context) -> Self { + if AudioSettings::get_global(cx).rodio_audio { + let executor = cx.background_executor().clone(); + let save_task = cx.default_global::().save_replays(executor); + let _save_task = cx.spawn(async move |this, cx| { + let res = save_task.await; + this.update(cx, |this, cx| { + this.save_result = Some(res); + cx.notify(); + }) + }); + + Self { + focus_handle: cx.focus_handle(), + _save_task, + save_result: None, + } + } else { + Self { + focus_handle: cx.focus_handle(), + _save_task: Task::ready(Ok(())), + save_result: Some(Err(anyhow::anyhow!( + "Capturing recent audio is only supported on the experimental rodio audio pipeline" + ))), + } + } + } + } + + workspace.show_notification( + NotificationId::unique::(), + cx, + |cx| cx.new(CaptureRecentAudioNotification::new), + ); +} + #[cfg(test)] mod tests { use super::*;