From e874f979cea257b5d6bb6f9cd16e96602cdb5c6b Mon Sep 17 00:00:00 2001 From: David Kleingeld Date: Mon, 1 Sep 2025 18:15:05 +0200 Subject: [PATCH] Adds audio replay system, replay action and moves microphone code to audio crate The replay system stores the last 30 seconds of audio for every call source and the users microphone. Using the `CaptureRecentAudio` action all those tracks gets stored in a tar as separate .wav files. We show notification when starting the save and when its done. With the replay system the microphone code got more tightly bound to the audio crate than the livekit_client crate, so it was moved to audio. --- Cargo.lock | 58 ++-- Cargo.toml | 3 +- crates/audio/Cargo.toml | 11 +- crates/audio/src/audio.rs | 163 ++++++++-- crates/audio/src/audio_settings.rs | 7 + crates/audio/src/replays.rs | 80 +++++ crates/audio/src/rodio_ext.rs | 296 ++++++++++++++++-- crates/call/src/call_impl/room.rs | 10 +- crates/livekit_client/Cargo.toml | 1 - crates/livekit_client/examples/test_app.rs | 5 +- crates/livekit_client/src/livekit_client.rs | 5 +- .../src/livekit_client/playback.rs | 105 +------ .../src/livekit_client/playback/source.rs | 18 +- crates/livekit_client/src/record.rs | 2 +- crates/livekit_client/src/test.rs | 1 + crates/zed/src/zed.rs | 88 +++++- 16 files changed, 669 insertions(+), 184 deletions(-) create mode 100644 crates/audio/src/replays.rs diff --git a/Cargo.lock b/Cargo.lock index 63561664da3f951657484e880416e4ace80ce0b8..0b283601d1c182d83f5f7164c7bd545e6f80eb9c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1385,14 +1385,19 @@ name = "audio" version = "0.1.0" dependencies = [ "anyhow", + "async-tar", "collections", + "crossbeam", + "futures 0.3.31", "gpui", "libwebrtc", + "log", "parking_lot", "rodio", "schemars", "serde", "settings", + "smol", "util", "workspace-hack", ] @@ -2676,7 +2681,7 @@ dependencies = [ "cap-primitives", "cap-std", "io-lifetimes", - "windows-sys 0.52.0", + "windows-sys 0.59.0", ] [[package]] @@ -2705,7 +2710,7 @@ dependencies = [ "maybe-owned", "rustix 1.0.7", "rustix-linux-procfs", - "windows-sys 0.52.0", + "windows-sys 0.59.0", "winx", ] @@ -4146,6 +4151,19 @@ dependencies = [ "itertools 0.10.5", ] +[[package]] +name = "crossbeam" +version = "0.8.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1137cd7e7fc0fb5d3c5a8678be38ec56e819125d8d7907411fe24ccb943faca8" +dependencies = [ + "crossbeam-channel", + "crossbeam-deque", + "crossbeam-epoch", + "crossbeam-queue", + "crossbeam-utils", +] + [[package]] name = "crossbeam-channel" version = "0.5.15" @@ -5296,7 +5314,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "976dd42dc7e85965fe702eb8164f21f450704bdde31faefd6471dba214cb594e" dependencies = [ "libc", - "windows-sys 0.52.0", + "windows-sys 0.59.0", ] [[package]] @@ -5683,7 +5701,7 @@ checksum = "0ce92ff622d6dadf7349484f42c93271a0d49b7cc4d466a936405bacbe10aa78" dependencies = [ "cfg-if", "rustix 1.0.7", - "windows-sys 0.52.0", + "windows-sys 0.59.0", ] [[package]] @@ -6061,7 +6079,7 @@ checksum = "94e7099f6313ecacbe1256e8ff9d617b75d1bcb16a6fddef94866d225a01a14a" dependencies = [ "io-lifetimes", "rustix 1.0.7", - "windows-sys 0.52.0", + "windows-sys 0.59.0", ] [[package]] @@ -8504,7 +8522,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2285ddfe3054097ef4b2fe909ef8c3bcd1ea52a8f0d274416caebeef39f04a65" dependencies = [ "io-lifetimes", - "windows-sys 0.52.0", + "windows-sys 0.59.0", ] [[package]] @@ -8577,7 +8595,7 @@ checksum = "e04d7f318608d35d4b61ddd75cbdaee86b023ebe2bd5a66ee0915f0bf93095a9" dependencies = [ "hermit-abi 0.5.0", "libc", - "windows-sys 0.52.0", + "windows-sys 0.59.0", ] [[package]] @@ -8659,7 +8677,7 @@ dependencies = [ "portable-atomic", "portable-atomic-util", "serde", - "windows-sys 0.52.0", + "windows-sys 0.59.0", ] [[package]] @@ -9400,7 +9418,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fc2f4eb4bc735547cfed7c0a4922cbd04a4655978c09b54f1f7b228750664c34" dependencies = [ "cfg-if", - "windows-targets 0.52.6", + "windows-targets 0.48.5", ] [[package]] @@ -9637,7 +9655,6 @@ dependencies = [ "core-video", "coreaudio-rs 0.12.1", "cpal", - "dasp_sample", "futures 0.3.31", "gpui", "gpui_tokio", @@ -13059,7 +13076,7 @@ dependencies = [ "once_cell", "socket2", "tracing", - "windows-sys 0.52.0", + "windows-sys 0.59.0", ] [[package]] @@ -13856,7 +13873,7 @@ dependencies = [ [[package]] name = "rodio" version = "0.21.1" -source = "git+https://github.com/RustAudio/rodio?branch=microphone#0e6e6436b3a97f4af72baafe11a02ade2b457b62" +source = "git+https://github.com/RustAudio/rodio?branch=better_wav_output#82514bd1f2c6cfd9a1a885019b26a8ffea75bc5c" dependencies = [ "cpal", "dasp_sample", @@ -14086,7 +14103,7 @@ dependencies = [ "itoa", "libc", "linux-raw-sys 0.4.15", - "windows-sys 0.52.0", + "windows-sys 0.59.0", ] [[package]] @@ -14099,7 +14116,7 @@ dependencies = [ "errno 0.3.11", "libc", "linux-raw-sys 0.9.4", - "windows-sys 0.52.0", + "windows-sys 0.59.0", ] [[package]] @@ -14221,7 +14238,7 @@ dependencies = [ "security-framework 3.2.0", "security-framework-sys", "webpki-root-certs", - "windows-sys 0.52.0", + "windows-sys 0.59.0", ] [[package]] @@ -15535,7 +15552,7 @@ dependencies = [ "cfg-if", "libc", "psm", - "windows-sys 0.52.0", + "windows-sys 0.59.0", ] [[package]] @@ -16193,7 +16210,7 @@ dependencies = [ "fd-lock", "io-lifetimes", "rustix 0.38.44", - "windows-sys 0.52.0", + "windows-sys 0.59.0", "winx", ] @@ -16375,7 +16392,7 @@ dependencies = [ "getrandom 0.3.2", "once_cell", "rustix 1.0.7", - "windows-sys 0.52.0", + "windows-sys 0.59.0", ] [[package]] @@ -18943,7 +18960,7 @@ version = "0.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cf221c93e13a30d793f7645a0e7762c55d169dbb0a49671918a2319d289b10bb" dependencies = [ - "windows-sys 0.52.0", + "windows-sys 0.48.0", ] [[package]] @@ -19602,7 +19619,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3f3fd376f71958b862e7afb20cfe5a22830e1963462f3a17f49d82a6c1d1f42d" dependencies = [ "bitflags 2.9.0", - "windows-sys 0.52.0", + "windows-sys 0.59.0", ] [[package]] @@ -19936,6 +19953,7 @@ dependencies = [ "core-foundation-sys", "cranelift-codegen", "crc32fast", + "crossbeam-channel", "crossbeam-epoch", "crossbeam-utils", "crypto-common", diff --git a/Cargo.toml b/Cargo.toml index 0f30712efc21cec9d9cfc46ec75e6655e43665e9..0cf695a3294ea01d7529d22a7638059f5beaeded 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -276,6 +276,7 @@ context_server = { path = "crates/context_server" } copilot = { path = "crates/copilot" } crashes = { path = "crates/crashes" } credentials_provider = { path = "crates/credentials_provider" } +crossbeam = "0.8.4" dap = { path = "crates/dap" } dap_adapters = { path = "crates/dap_adapters" } db = { path = "crates/db" } @@ -367,7 +368,7 @@ remote_server = { path = "crates/remote_server" } repl = { path = "crates/repl" } reqwest_client = { path = "crates/reqwest_client" } rich_text = { path = "crates/rich_text" } -rodio = { git = "https://github.com/RustAudio/rodio", branch = "microphone"} +rodio = { git = "https://github.com/RustAudio/rodio", branch = "better_wav_output"} rope = { path = "crates/rope" } rpc = { path = "crates/rpc" } rules_library = { path = "crates/rules_library" } diff --git a/crates/audio/Cargo.toml b/crates/audio/Cargo.toml index 8826b5f49c263170c41a5354cd076c905e014943..ce5f7da0f93a4c2dd40a70f33af3ba9cefcdb970 100644 --- a/crates/audio/Cargo.toml +++ b/crates/audio/Cargo.toml @@ -14,13 +14,18 @@ doctest = false [dependencies] anyhow.workspace = true +async-tar.workspace = true collections.workspace = true +crossbeam.workspace = true gpui.workspace = true -settings.workspace = true +log.workspace = true +futures.workspace = true +parking_lot.workspace = true +rodio = { workspace = true, features = [ "wav", "playback" ] } schemars.workspace = true serde.workspace = true -parking_lot.workspace = true -rodio = { workspace = true, features = [ "wav", "playback", "tracing" ] } +settings.workspace = true +smol.workspace = true util.workspace = true workspace-hack.workspace = true diff --git a/crates/audio/src/audio.rs b/crates/audio/src/audio.rs index ee046e6fde3a791ca7bdfde138e6cacda75d0576..ae5489b6cb11169ca717c2a18591856bc04745b9 100644 --- a/crates/audio/src/audio.rs +++ b/crates/audio/src/audio.rs @@ -1,37 +1,57 @@ -use anyhow::{Context as _, Result, anyhow}; +use anyhow::{Context as _, Result}; use collections::HashMap; -use gpui::{App, BorrowAppContext, Global}; -use libwebrtc::native::apm; +use futures::channel::mpsc::UnboundedSender; +use gpui::{App, AsyncApp, BackgroundExecutor, BorrowAppContext, Global}; +use libwebrtc::{native::apm, prelude::AudioFrame}; +use log::info; use parking_lot::Mutex; use rodio::{ - Decoder, OutputStream, OutputStreamBuilder, Source, cpal::Sample, mixer::Mixer, - source::Buffered, + Decoder, OutputStream, OutputStreamBuilder, Source, + cpal::Sample, + mixer::Mixer, + nz, + source::{Buffered, LimitSettings, UniformSourceIterator}, }; use settings::Settings; -use std::{io::Cursor, num::NonZero, sync::Arc}; -use util::ResultExt; +use std::{ + borrow::Cow, + io::Cursor, + num::NonZero, + path::PathBuf, + sync::{ + Arc, + mpsc::{TryRecvError, channel}, + }, + thread, + time::Duration, +}; +use util::{ResultExt, debug_panic}; mod audio_settings; +mod replays; mod rodio_ext; pub use audio_settings::AudioSettings; pub use rodio_ext::RodioExt; -// NOTE: We use WebRTC's mixer which only supports +// NOTE: We used to use WebRTC's mixer which only supported // 16kHz, 32kHz and 48kHz. As 48 is the most common "next step up" // for audio output devices like speakers/bluetooth, we just hard-code // this; and downsample when we need to. // // Since most noise cancelling requires 16kHz we will move to -// that in the future. Same for channel count. That should be input -// channels and fixed to 1. -pub const SAMPLE_RATE: NonZero = NonZero::new(48000).expect("not zero"); -pub const CHANNEL_COUNT: NonZero = NonZero::new(2).expect("not zero"); +// that in the future. +pub const SAMPLE_RATE: NonZero = nz!(48000); +pub const CHANNEL_COUNT: NonZero = nz!(2); +const BUFFER_SIZE: usize = // echo canceller and livekit want 10ms of audio + (SAMPLE_RATE.get() as usize / 100) * CHANNEL_COUNT.get() as usize; + +pub const REPLAY_DURATION: Duration = Duration::from_secs(30); pub fn init(cx: &mut App) { AudioSettings::register(cx); } -#[derive(Copy, Clone, Eq, Hash, PartialEq)] +#[derive(Debug, Copy, Clone, Eq, Hash, PartialEq)] pub enum Sound { Joined, Leave, @@ -61,6 +81,7 @@ pub struct Audio { output_mixer: Option, pub echo_canceller: Arc>, source_cache: HashMap>>>>, + replays: replays::Replays, } impl Default for Audio { @@ -72,6 +93,7 @@ impl Default for Audio { true, false, false, false, ))), source_cache: Default::default(), + replays: Default::default(), } } } @@ -79,16 +101,19 @@ impl Default for Audio { impl Global for Audio {} impl Audio { - fn ensure_output_exists(&mut self) -> Option<&Mixer> { + fn ensure_output_exists(&mut self) -> Result<&Mixer> { if self.output_handle.is_none() { - self.output_handle = OutputStreamBuilder::open_default_stream().log_err(); + self.output_handle = Some( + OutputStreamBuilder::open_default_stream() + .context("Could not open default output stream")?, + ); if let Some(output_handle) = &self.output_handle { let (mixer, source) = rodio::mixer::mixer(CHANNEL_COUNT, SAMPLE_RATE); + // or the mixer will end immediately as its empty. + mixer.add(rodio::source::Zero::new(CHANNEL_COUNT, SAMPLE_RATE)); self.output_mixer = Some(mixer); let echo_canceller = Arc::clone(&self.echo_canceller); - const BUFFER_SIZE: usize = // echo canceller wants 10ms of audio - (SAMPLE_RATE.get() as usize / 100) * CHANNEL_COUNT.get() as usize; let source = source.inspect_buffer::(move |buffer| { let mut buf: [i16; _] = buffer.map(|s| s.to_sample()); echo_canceller @@ -104,18 +129,109 @@ impl Audio { } } - self.output_mixer.as_ref() + Ok(self + .output_mixer + .as_ref() + .expect("we only get here if opening the outputstream succeeded")) + } + + pub fn save_replays( + &self, + executor: BackgroundExecutor, + ) -> gpui::Task> { + self.replays.replays_to_tar(executor) } - pub fn play_source( - source: impl rodio::Source + Send + 'static, + pub fn open_microphone( + cx: AsyncApp, + frame_tx: UnboundedSender>, + ) -> anyhow::Result<()> { + let (apm, mut replays) = cx.try_read_default_global::(|audio, _| { + (Arc::clone(&audio.echo_canceller), audio.replays.clone()) + })?; + + let (stream_error_tx, stream_error_rx) = channel(); + thread::spawn(move || { + let stream = rodio::microphone::MicrophoneBuilder::new() + .default_device()? + .default_config()? + .prefer_sample_rates([SAMPLE_RATE, SAMPLE_RATE.saturating_mul(nz!(2))]) + .prefer_channel_counts([nz!(1), nz!(2)]) + .prefer_buffer_sizes(512..) + .open_stream()?; + info!("Opened microphone: {:?}", stream.config()); + + let stream = UniformSourceIterator::new(stream, CHANNEL_COUNT, SAMPLE_RATE) + .limit(LimitSettings::live_performance()) + .process_buffer::(|buffer| { + let mut int_buffer: [i16; _] = buffer.map(|s| s.to_sample()); + if let Err(e) = apm + .lock() + .process_stream( + &mut int_buffer, + SAMPLE_RATE.get() as i32, + CHANNEL_COUNT.get() as i32, + ) + .context("livekit audio processor error") + { + let _ = stream_error_tx.send(e); + } else { + for (sample, processed) in buffer.iter_mut().zip(&int_buffer) { + *sample = (*processed).to_sample(); + } + } + }) + .automatic_gain_control(1.0, 4.0, 0.0, 5.0) + .periodic_access(Duration::from_millis(100), move |agc_source| { + agc_source.set_enabled(true); // todo dvdsk how to get settings in here? + }); + + // todo dvdsk keep the above here, move the rest back to livekit? + let (replay, mut stream) = stream.replayable(REPLAY_DURATION); + replays.add_voip_stream("local microphone".to_string(), replay); + + loop { + let sampled: Vec<_> = stream + .by_ref() + .take(BUFFER_SIZE) + .map(|s| s.to_sample()) + .collect(); + + match stream_error_rx.try_recv() { + Ok(apm_error) => return Err::<(), _>(apm_error), + Err(TryRecvError::Disconnected) => { + debug_panic!("Stream should end on its own without sending an error") + } + Err(TryRecvError::Empty) => (), + } + + frame_tx + .unbounded_send(AudioFrame { + sample_rate: SAMPLE_RATE.get(), + num_channels: CHANNEL_COUNT.get() as u32, + samples_per_channel: sampled.len() as u32 / CHANNEL_COUNT.get() as u32, + data: Cow::Owned(sampled), + }) + .context("Failed to send audio frame")? + } + }); + + Ok(()) + } + + pub fn play_voip_stream( + stream_source: impl rodio::Source + Send + 'static, + stream_name: String, cx: &mut App, ) -> anyhow::Result<()> { + let (replay_source, source) = stream_source.replayable(REPLAY_DURATION); + cx.update_default_global(|this: &mut Self, _cx| { let output_mixer = this .ensure_output_exists() - .ok_or_else(|| anyhow!("Could not open audio output"))?; + .context("Could not get output mixer")?; output_mixer.add(source); + this.replays.add_voip_stream(stream_name, replay_source); Ok(()) }) } @@ -123,7 +239,10 @@ impl Audio { pub fn play_sound(sound: Sound, cx: &mut App) { cx.update_default_global(|this: &mut Self, cx| { let source = this.sound_source(sound, cx).log_err()?; - let output_mixer = this.ensure_output_exists()?; + let output_mixer = this + .ensure_output_exists() + .context("Could not get output mixer") + .log_err()?; output_mixer.add(source); Some(()) diff --git a/crates/audio/src/audio_settings.rs b/crates/audio/src/audio_settings.rs index 168519030bcbd4a422965580ddbe01121934278d..08ac40c96092b829c270f09fcea892147c4e59ef 100644 --- a/crates/audio/src/audio_settings.rs +++ b/crates/audio/src/audio_settings.rs @@ -9,6 +9,9 @@ pub struct AudioSettings { /// Opt into the new audio system. #[serde(rename = "experimental.rodio_audio", default)] pub rodio_audio: bool, // default is false + /// Opt into the new audio systems automatic gain control + #[serde(rename = "experimental.automatic_volume", default)] + pub automatic_volume: bool, } /// Configuration of audio in Zed. @@ -19,6 +22,10 @@ pub struct AudioSettingsContent { /// Whether to use the experimental audio system #[serde(rename = "experimental.rodio_audio", default)] pub rodio_audio: bool, + /// Whether the experimental audio systems should automatically + /// manage the volume of calls + #[serde(rename = "experimental.automatic_volume", default)] + pub automatic_volume: bool, } impl Settings for AudioSettings { diff --git a/crates/audio/src/replays.rs b/crates/audio/src/replays.rs new file mode 100644 index 0000000000000000000000000000000000000000..ae32696bfa9d9cafb96ed810278d442915a672da --- /dev/null +++ b/crates/audio/src/replays.rs @@ -0,0 +1,80 @@ +use anyhow::{Context, anyhow}; +use async_tar::{Builder, Header}; +use gpui::{BackgroundExecutor, Task}; + +use collections::HashMap; +use parking_lot::Mutex; +use rodio::Source; +use smol::fs::File; +use std::{io, path::PathBuf, sync::Arc, time::Duration}; + +use crate::{REPLAY_DURATION, rodio_ext::Replay}; + +#[derive(Default, Clone)] +pub(crate) struct Replays(Arc>>); + +impl Replays { + pub(crate) fn add_voip_stream(&mut self, stream_name: String, source: Replay) { + let mut map = self.0.lock(); + map.retain(|_, replay| replay.source_is_active()); + // on the old pipeline all the streams are named microphone + // make sure names dont collide in that case by adding a number. + let stream_name = stream_name + &map.len().to_string(); + map.insert(stream_name, source); + } + + pub(crate) fn replays_to_tar( + &self, + executor: BackgroundExecutor, + ) -> Task> { + let map = Arc::clone(&self.0); + executor.spawn(async move { + let recordings: Vec<_> = map + .lock() + .iter_mut() + .map(|(name, replay)| { + let queued = REPLAY_DURATION.min(replay.duration_ready()); + (name.clone(), replay.take_duration(queued).record()) + }) + .collect(); + let longest = recordings + .iter() + .map(|(_, r)| { + r.total_duration() + .expect("SamplesBuffer always returns a total duration") + }) + .max() + .ok_or(anyhow!("There is no audio to capture"))?; + + let path = std::env::current_dir() + .context("Could not get current dir")? + .join("replays.tar"); + let tar = File::create(&path) + .await + .context("Could not create file for tar")?; + + let mut tar = Builder::new(tar); + + for (name, recording) in recordings { + let mut writer = io::Cursor::new(Vec::new()); + rodio::wav_to_writer(recording, &mut writer).context("failed to encode wav")?; + let wav_data = writer.into_inner(); + let path = name.replace(' ', "_") + ".wav"; + let mut header = Header::new_gnu(); + // rw permissions for everyone + header.set_mode(0o666); + header.set_size(wav_data.len() as u64); + tar.append_data(&mut header, path, wav_data.as_slice()) + .await + .context("failed to apped wav to tar")?; + } + tar.into_inner() + .await + .context("Could not finish writing tar")? + .sync_all() + .await + .context("Could not flush tar file to disk")?; + Ok((path, longest)) + }) + } +} diff --git a/crates/audio/src/rodio_ext.rs b/crates/audio/src/rodio_ext.rs index 23e85a4163ad64a8f03e7bbea8a5cdd6f1918d8d..c23e48a5592cd97cfc19bc72f84c7f34954acafc 100644 --- a/crates/audio/src/rodio_ext.rs +++ b/crates/audio/src/rodio_ext.rs @@ -1,18 +1,28 @@ -use rodio::Source; +use std::{ + sync::{ + Arc, + atomic::{AtomicUsize, Ordering}, + }, + time::Duration, +}; + +use crossbeam::queue::ArrayQueue; +use rodio::{ChannelCount, Sample, SampleRate, Source}; pub trait RodioExt: Source + Sized { fn process_buffer(self, callback: F) -> ProcessBuffer where - F: FnMut(&mut [rodio::Sample; N]); + F: FnMut(&mut [Sample; N]); fn inspect_buffer(self, callback: F) -> InspectBuffer where - F: FnMut(&[rodio::Sample; N]); + F: FnMut(&[Sample; N]); + fn replayable(self, duration: Duration) -> (Replay, Replayable); } impl RodioExt for S { fn process_buffer(self, callback: F) -> ProcessBuffer where - F: FnMut(&mut [rodio::Sample; N]), + F: FnMut(&mut [Sample; N]), { ProcessBuffer { inner: self, @@ -23,7 +33,7 @@ impl RodioExt for S { } fn inspect_buffer(self, callback: F) -> InspectBuffer where - F: FnMut(&[rodio::Sample; N]), + F: FnMut(&[Sample; N]), { InspectBuffer { inner: self, @@ -32,17 +42,82 @@ impl RodioExt for S { free: 0, } } + fn replayable(self, duration: Duration) -> (Replay, Replayable) { + let samples_per_second = self.sample_rate().get() * self.channels().get() as u32; + let samples_to_queue = duration.as_secs_f64() * samples_per_second as f64; + let samples_to_queue = + (samples_to_queue as usize).next_multiple_of(self.channels().get().into()); + + let chunk_size = + samples_to_queue.min(1000usize.next_multiple_of(self.channels().get().into())); + let chunks_to_queue = samples_to_queue.div_ceil(chunk_size); + + let queue = Arc::new(ReplayQueue::new(chunks_to_queue, chunk_size)); + ( + Replay { + rx: Arc::clone(&queue), + buffer: Vec::new().into_iter(), + sleep_duration: duration / 2, + sample_rate: self.sample_rate(), + channel_count: self.channels(), + }, + Replayable { + tx: queue, + inner: self, + buffer: Vec::with_capacity(chunk_size), + chunk_size, + }, + ) + } +} + +#[derive(Debug)] +struct ReplayQueue { + inner: ArrayQueue>, + normal_chunk_len: usize, + /// The last chunk in the queue may be smaller then + /// the normal chunk size. This is always equal to the + /// size of the last element in the queue. + /// (so normally chunk_size) + last_chunk_len: AtomicUsize, +} + +impl ReplayQueue { + fn new(queue_len: usize, chunk_size: usize) -> Self { + Self { + inner: ArrayQueue::new(queue_len), + normal_chunk_len: chunk_size, + last_chunk_len: AtomicUsize::new(chunk_size), + } + } + fn len(&self) -> usize { + self.inner.len().saturating_sub(1) * self.normal_chunk_len + + self.last_chunk_len.load(Ordering::Acquire) + } + + fn pop(&self) -> Option> { + self.inner.pop() + } + + fn push_last(&self, samples: Vec) { + self.last_chunk_len.store(samples.len(), Ordering::Release); + let _pushed_out_of_ringbuf = self.inner.force_push(samples); + } + + fn push_normal(&self, samples: Vec) { + let _pushed_out_of_ringbuf = self.inner.force_push(samples); + } } pub struct ProcessBuffer where S: Source + Sized, - F: FnMut(&mut [rodio::Sample; N]), + F: FnMut(&mut [Sample; N]), { inner: S, callback: F, /// Buffer used for both input and output. - buffer: [rodio::Sample; N], + buffer: [Sample; N], /// Next already processed sample is at this index /// in buffer. /// @@ -54,9 +129,9 @@ where impl Iterator for ProcessBuffer where S: Source + Sized, - F: FnMut(&mut [rodio::Sample; N]), + F: FnMut(&mut [Sample; N]), { - type Item = rodio::Sample; + type Item = Sample; fn next(&mut self) -> Option { self.next += 1; @@ -78,7 +153,7 @@ where impl Source for ProcessBuffer where S: Source + Sized, - F: FnMut(&mut [rodio::Sample; N]), + F: FnMut(&mut [Sample; N]), { fn current_span_len(&self) -> Option { // TODO dvdsk this should be a spanless Source @@ -101,12 +176,12 @@ where pub struct InspectBuffer where S: Source + Sized, - F: FnMut(&[rodio::Sample; N]), + F: FnMut(&[Sample; N]), { inner: S, callback: F, /// Stores already emitted samples, once its full we call the callback. - buffer: [rodio::Sample; N], + buffer: [Sample; N], /// Next free element in buffer. If this is equal to the buffer length /// we have no more free lements. free: usize, @@ -115,9 +190,9 @@ where impl Iterator for InspectBuffer where S: Source + Sized, - F: FnMut(&[rodio::Sample; N]), + F: FnMut(&[Sample; N]), { - type Item = rodio::Sample; + type Item = Sample; fn next(&mut self) -> Option { let Some(sample) = self.inner.next() else { @@ -139,7 +214,7 @@ where impl Source for InspectBuffer where S: Source + Sized, - F: FnMut(&[rodio::Sample; N]), + F: FnMut(&[Sample; N]), { fn current_span_len(&self) -> Option { // TODO dvdsk this should be a spanless Source @@ -159,21 +234,135 @@ where } } +#[derive(Debug)] +pub struct Replayable { + inner: S, + buffer: Vec, + chunk_size: usize, + tx: Arc, +} + +impl Iterator for Replayable { + type Item = Sample; + + fn next(&mut self) -> Option { + if let Some(sample) = self.inner.next() { + self.buffer.push(sample); + if self.buffer.len() == self.chunk_size { + self.tx.push_normal(std::mem::take(&mut self.buffer)); + } + Some(sample) + } else { + let last_chunk = std::mem::take(&mut self.buffer); + self.tx.push_last(last_chunk); + None + } + } +} + +impl Source for Replayable { + fn current_span_len(&self) -> Option { + // Todo dvdsk should be spanless too + self.inner.current_span_len() + } + + fn channels(&self) -> ChannelCount { + self.inner.channels() + } + + fn sample_rate(&self) -> SampleRate { + self.inner.sample_rate() + } + + fn total_duration(&self) -> Option { + self.inner.total_duration() + } +} + +#[derive(Debug)] +pub struct Replay { + rx: Arc, + buffer: std::vec::IntoIter, + sleep_duration: Duration, + sample_rate: SampleRate, + channel_count: ChannelCount, +} + +impl Replay { + pub fn source_is_active(&self) -> bool { + Arc::strong_count(&self.rx) == 2 + } + + /// Returns duration of what is in the buffer and + /// can be returned without blocking. + pub fn duration_ready(&self) -> Duration { + let samples_per_second = self.channels().get() as u32 * self.sample_rate().get(); + let samples_queued = self.rx.len() + self.buffer.len(); + + let seconds_queued = samples_queued as f64 / samples_per_second as f64; + Duration::from_secs_f64(seconds_queued) + } +} + +impl Iterator for Replay { + type Item = Sample; + + fn next(&mut self) -> Option { + if let Some(sample) = self.buffer.next() { + return Some(sample); + } + + loop { + if let Some(new_buffer) = self.rx.pop() { + self.buffer = new_buffer.into_iter(); + return self.buffer.next(); + } + + if !self.source_is_active() { + return None; + } + + std::thread::sleep(self.sleep_duration); + } + } +} + +impl Source for Replay { + fn current_span_len(&self) -> Option { + None // source is not compatible with spans + } + + fn channels(&self) -> ChannelCount { + self.channel_count + } + + fn sample_rate(&self) -> SampleRate { + self.sample_rate + } + + fn total_duration(&self) -> Option { + None + } +} + #[cfg(test)] mod tests { - use rodio::static_buffer::StaticSamplesBuffer; + use rodio::{nz, static_buffer::StaticSamplesBuffer}; use super::*; - #[cfg(test)] + const SAMPLES: [Sample; 5] = [0.0, 1.0, 2.0, 3.0, 4.0]; + + fn test_source() -> StaticSamplesBuffer { + StaticSamplesBuffer::new(nz!(1), nz!(1), &SAMPLES) + } + mod process_buffer { use super::*; #[test] fn callback_gets_all_samples() { - const SAMPLES: [f32; 5] = [0.0, 1.0, 2.0, 3.0, 4.0]; - let input = - StaticSamplesBuffer::new(1.try_into().unwrap(), 1.try_into().unwrap(), &SAMPLES); + let input = test_source(); let _ = input .process_buffer::<{ SAMPLES.len() }, _>(|buffer| assert_eq!(*buffer, SAMPLES)) @@ -181,9 +370,7 @@ mod tests { } #[test] fn callback_modifies_yielded() { - const SAMPLES: [f32; 5] = [0.0, 1.0, 2.0, 3.0, 4.0]; - let input = - StaticSamplesBuffer::new(1.try_into().unwrap(), 1.try_into().unwrap(), &SAMPLES); + let input = test_source(); let yielded: Vec<_> = input .process_buffer::<{ SAMPLES.len() }, _>(|buffer| { @@ -199,9 +386,7 @@ mod tests { } #[test] fn source_truncates_to_whole_buffers() { - const SAMPLES: [f32; 5] = [0.0, 1.0, 2.0, 3.0, 4.0]; - let input = - StaticSamplesBuffer::new(1.try_into().unwrap(), 1.try_into().unwrap(), &SAMPLES); + let input = test_source(); let yielded = input .process_buffer::<3, _>(|buffer| assert_eq!(buffer, &SAMPLES[..3])) @@ -210,15 +395,12 @@ mod tests { } } - #[cfg(test)] mod inspect_buffer { use super::*; #[test] fn callback_gets_all_samples() { - const SAMPLES: [f32; 5] = [0.0, 1.0, 2.0, 3.0, 4.0]; - let input = - StaticSamplesBuffer::new(1.try_into().unwrap(), 1.try_into().unwrap(), &SAMPLES); + let input = test_source(); let _ = input .inspect_buffer::<{ SAMPLES.len() }, _>(|buffer| assert_eq!(*buffer, SAMPLES)) @@ -226,9 +408,7 @@ mod tests { } #[test] fn source_does_not_truncate() { - const SAMPLES: [f32; 5] = [0.0, 1.0, 2.0, 3.0, 4.0]; - let input = - StaticSamplesBuffer::new(1.try_into().unwrap(), 1.try_into().unwrap(), &SAMPLES); + let input = test_source(); let yielded = input .inspect_buffer::<3, _>(|buffer| assert_eq!(buffer, &SAMPLES[..3])) @@ -236,4 +416,54 @@ mod tests { assert_eq!(yielded, SAMPLES.len()) } } + + mod instant_replay { + use super::*; + + #[test] + fn continues_after_history() { + let input = test_source(); + + let (mut replay, mut source) = input.replayable(Duration::from_secs(3)); + + source.by_ref().take(3).count(); + let yielded: Vec = replay.by_ref().take(3).collect(); + assert_eq!(&yielded, &SAMPLES[0..3],); + + source.count(); + let yielded: Vec = replay.collect(); + assert_eq!(&yielded, &SAMPLES[3..5],); + } + + #[test] + fn keeps_only_latest() { + let input = test_source(); + + let (mut replay, mut source) = input.replayable(Duration::from_secs(2)); + + source.by_ref().take(5).count(); // get all items but do not end the source + let yielded: Vec = replay.by_ref().take(2).collect(); + // Note we do not get the last element, it has not been send yet + // due to buffering. + assert_eq!(&yielded, &SAMPLES[2..4]); + + source.count(); // exhaust source + let yielded: Vec = replay.collect(); + assert_eq!(&yielded, &[SAMPLES[4]]); + } + + #[test] + fn keeps_correct_amount_of_seconds() { + let input = StaticSamplesBuffer::new(nz!(16_000), nz!(1), &[0.0; 40_000]); + + let (replay, mut source) = input.replayable(Duration::from_secs(2)); + + source.by_ref().count(); + let n_yielded = replay.count(); + assert_eq!( + n_yielded as u32, + source.sample_rate().get() * source.channels().get() as u32 * 2 + ); + } + } } diff --git a/crates/call/src/call_impl/room.rs b/crates/call/src/call_impl/room.rs index c31a458c64124c266c56a7004746d7b6a0a4adc6..cb4802ce2699051916e6448f62ad7bc664755a6d 100644 --- a/crates/call/src/call_impl/room.rs +++ b/crates/call/src/call_impl/room.rs @@ -1322,8 +1322,16 @@ impl Room { return Task::ready(Err(anyhow!("live-kit was not initialized"))); }; + let user_name = self + .user_store + .read(cx) + .current_user() + .map(|user| user.name.clone()) + .flatten() + .unwrap_or_else(|| "unknown".to_string()); + cx.spawn(async move |this, cx| { - let publication = room.publish_local_microphone_track(cx).await; + let publication = room.publish_local_microphone_track(&user_name, cx).await; this.update(cx, |this, cx| { let live_kit = this .live_kit diff --git a/crates/livekit_client/Cargo.toml b/crates/livekit_client/Cargo.toml index 467c2f7437e7c4b92776020dc42deca47e0f61d8..921918fdac076caa10828045f69d6a2e17092446 100644 --- a/crates/livekit_client/Cargo.toml +++ b/crates/livekit_client/Cargo.toml @@ -42,7 +42,6 @@ util.workspace = true workspace-hack.workspace = true rodio = { workspace = true, features = ["wav_output", "recording"] } -dasp_sample = "0.11" [target.'cfg(not(any(all(target_os = "windows", target_env = "gnu"), target_os = "freebsd")))'.dependencies] libwebrtc = { rev = "5f04705ac3f356350ae31534ffbc476abc9ea83d", git = "https://github.com/zed-industries/livekit-rust-sdks" } diff --git a/crates/livekit_client/examples/test_app.rs b/crates/livekit_client/examples/test_app.rs index 75806429905e6bcbfcc17f25a29007f18e78757b..58aa80ead462cc57e91b90ccde2139e235500e55 100644 --- a/crates/livekit_client/examples/test_app.rs +++ b/crates/livekit_client/examples/test_app.rs @@ -255,7 +255,10 @@ impl LivekitWindow { } else { let room = self.room.clone(); cx.spawn_in(window, async move |this, cx| { - let (publication, stream) = room.publish_local_microphone_track(cx).await.unwrap(); + let (publication, stream) = room + .publish_local_microphone_track("test_user", cx) + .await + .unwrap(); this.update(cx, |this, cx| { this.microphone_track = Some(publication); this.microphone_stream = Some(stream); diff --git a/crates/livekit_client/src/livekit_client.rs b/crates/livekit_client/src/livekit_client.rs index 50035d6eb5a9f11d22d44b223d52c1112f3b28d0..b8ecc6f771ef1840d3a34c61178e1fa463715888 100644 --- a/crates/livekit_client/src/livekit_client.rs +++ b/crates/livekit_client/src/livekit_client.rs @@ -97,9 +97,12 @@ impl Room { pub async fn publish_local_microphone_track( &self, + user_name: &str, cx: &mut AsyncApp, ) -> Result<(LocalTrackPublication, playback::AudioStream)> { - let (track, stream) = self.playback.capture_local_microphone_track(&cx)?; + let (track, stream) = self + .playback + .capture_local_microphone_track(user_name, &cx)?; let publication = self .local_participant() .publish_track( diff --git a/crates/livekit_client/src/livekit_client/playback.rs b/crates/livekit_client/src/livekit_client/playback.rs index f28f363d48b1719212c703513fe7c6c332ee9f91..8b1105e16bd5daf2630ea964b98a0e13db9c128a 100644 --- a/crates/livekit_client/src/livekit_client/playback.rs +++ b/crates/livekit_client/src/livekit_client/playback.rs @@ -1,9 +1,7 @@ use anyhow::{Context as _, Result}; use audio::{AudioSettings, CHANNEL_COUNT, SAMPLE_RATE}; -use cpal::Sample; use cpal::traits::{DeviceTrait, StreamTrait as _}; -use dasp_sample::ToSample; use futures::channel::mpsc::UnboundedSender; use futures::{Stream, StreamExt as _}; use gpui::{ @@ -24,16 +22,13 @@ use livekit::webrtc::{ use log::info; use parking_lot::Mutex; use rodio::Source; -use rodio::source::{LimitSettings, UniformSourceIterator}; use settings::Settings; use std::cell::RefCell; -use std::num::NonZero; use std::sync::Weak; use std::sync::atomic::{AtomicBool, AtomicI32, Ordering}; -use std::sync::mpsc::{TryRecvError, channel}; use std::time::Duration; use std::{borrow::Cow, collections::VecDeque, sync::Arc, thread}; -use util::{ResultExt as _, debug_panic, maybe}; +use util::{ResultExt as _, maybe}; mod source; @@ -51,14 +46,16 @@ pub(crate) fn play_remote_audio_track( ) -> Result { let stop_handle = Arc::new(AtomicBool::new(false)); let stop_handle_clone = stop_handle.clone(); - let stream = source::LiveKitStream::new(cx.background_executor(), track) + let stream = source::LiveKitStream::new(cx.background_executor(), track); + + let stream = stream .stoppable() .periodic_access(Duration::from_millis(50), move |s| { if stop_handle.load(Ordering::Relaxed) { s.stop(); } }); - audio::Audio::play_source(stream, cx).context("Could not play audio")?; + audio::Audio::play_voip_stream(stream, track.name(), cx).context("Could not play audio")?; let on_drop = util::defer(move || { stop_handle_clone.store(true, Ordering::Relaxed); @@ -144,6 +141,7 @@ impl AudioStack { pub(crate) fn capture_local_microphone_track( &self, + user_name: &str, cx: &AsyncApp, ) -> Result<(crate::LocalAudioTrack, AudioStream)> { let source = NativeAudioSource::new( @@ -155,7 +153,7 @@ impl AudioStack { ); let track = track::LocalAudioTrack::create_audio_track( - "microphone", + user_name, RtcAudioSource::Native(source.clone()), ); @@ -172,13 +170,10 @@ impl AudioStack { let rodio_pipeline = AudioSettings::try_read_global(cx, |setting| setting.rodio_audio).unwrap_or_default(); let capture_task = if rodio_pipeline { - let apm = cx - .try_read_global::(|audio, _| Arc::clone(&audio.echo_canceller)) - .unwrap(); // TODO fixme - self.executor.spawn(async move { - info!("Using experimental.rodio_audio audio pipeline"); - Self::capture_input_rodio(apm, frame_tx).await - }) + // TODO global might not yet have been initialized + info!("Using experimental.rodio_audio audio pipeline"); + audio::Audio::open_microphone(cx.clone(), frame_tx)?; + Task::ready(Ok(())) } else { self.executor.spawn(async move { Self::capture_input(apm, frame_tx, SAMPLE_RATE.get(), CHANNEL_COUNT.get().into()) @@ -270,84 +265,6 @@ impl AudioStack { } } - async fn capture_input_rodio( - apm: Arc>, - frame_tx: UnboundedSender>, - ) -> Result<()> { - use audio::RodioExt; - const LIVEKIT_BUFFER_SIZE: usize = - (audio::SAMPLE_RATE.get() as usize / 100) * audio::CHANNEL_COUNT.get() as usize; - - let (stream_error_tx, stream_error_rx) = channel(); - - thread::spawn(move || { - let stream = rodio::microphone::MicrophoneBuilder::new() - .default_device()? - .default_config()? - .prefer_sample_rates([ - SAMPLE_RATE, - SAMPLE_RATE.saturating_mul(NonZero::new(2).expect("not zero")), - ]) - .prefer_channel_counts([ - NonZero::new(1).expect("not zero"), - NonZero::new(2).expect("not zero"), - ]) - .prefer_buffer_sizes(512..) - .open_stream()?; - info!("Opened microphone: {:?}", stream.config()); - let mut stream = - UniformSourceIterator::new(stream, audio::CHANNEL_COUNT, audio::SAMPLE_RATE) - .limit(LimitSettings::live_performance()) - .process_buffer::(|buffer| { - let mut int_buffer: [i16; _] = buffer.map(|s| s.to_sample()); - if let Err(e) = apm - .lock() - .process_stream( - &mut int_buffer, - audio::SAMPLE_RATE.get() as i32, - audio::CHANNEL_COUNT.get() as i32, - ) - .context("livekit audio processor error") - { - let _ = stream_error_tx.send(e); - } else { - for (sample, processed) in buffer.iter_mut().zip(&int_buffer) { - *sample = (*processed).to_sample_(); - } - } - }) - .automatic_gain_control(1.0, 4.0, 0.0, 5.0); - - loop { - let sampled: Vec<_> = stream - .by_ref() - .take(LIVEKIT_BUFFER_SIZE) - .map(|s| s.to_sample()) - .collect(); - - match stream_error_rx.try_recv() { - Ok(apm_error) => return Err::<(), _>(apm_error), - Err(TryRecvError::Disconnected) => { - debug_panic!("Stream should end on its own without sending an error") - } - Err(TryRecvError::Empty) => (), - } - - frame_tx - .unbounded_send(AudioFrame { - sample_rate: SAMPLE_RATE.get(), - num_channels: audio::CHANNEL_COUNT.get() as u32, - samples_per_channel: sampled.len() as u32 - / audio::CHANNEL_COUNT.get() as u32, - data: Cow::Owned(sampled), - }) - .context("Failed to send audio frame")? - } - }); - - Ok(()) - } - async fn capture_input( apm: Arc>, frame_tx: UnboundedSender>, diff --git a/crates/livekit_client/src/livekit_client/playback/source.rs b/crates/livekit_client/src/livekit_client/playback/source.rs index 51ceb081d78ecd47f898e025ee8e0cb8235e2f13..67bfe793902da94a114ca617ce5bfa33c68d02e7 100644 --- a/crates/livekit_client/src/livekit_client/playback/source.rs +++ b/crates/livekit_client/src/livekit_client/playback/source.rs @@ -3,16 +3,18 @@ use std::num::NonZero; use futures::StreamExt; use libwebrtc::{audio_stream::native::NativeAudioStream, prelude::AudioFrame}; use livekit::track::RemoteAudioTrack; -use rodio::{Source, buffer::SamplesBuffer, conversions::SampleTypeConverter}; +use rodio::{Source, buffer::SamplesBuffer, conversions::SampleTypeConverter, nz}; -use crate::livekit_client::playback::{CHANNEL_COUNT, SAMPLE_RATE}; +use audio::{CHANNEL_COUNT, SAMPLE_RATE}; fn frame_to_samplesbuffer(frame: AudioFrame) -> SamplesBuffer { let samples = frame.data.iter().copied(); let samples = SampleTypeConverter::<_, _>::new(samples); let samples: Vec = samples.collect(); SamplesBuffer::new( - NonZero::new(frame.num_channels as u16).expect("audio frame channels is nonzero"), + // here be dragons + // NonZero::new(frame.num_channels as u16).expect("audio frame channels is nonzero"), + nz!(2), NonZero::new(frame.sample_rate).expect("audio frame sample rate is nonzero"), samples, ) @@ -63,11 +65,17 @@ impl Source for LiveKitStream { } fn channels(&self) -> rodio::ChannelCount { - self.inner.channels() + // This must be hardcoded because the playback source assumes constant + // sample rate and channel count. The queue upon which this is build + // will however report different counts and rates. Even though we put in + // only items with our (constant) CHANNEL_COUNT & SAMPLE_RATE this will + // play silence on one channel and at 44100 which is not what our + // constants are. + CHANNEL_COUNT } fn sample_rate(&self) -> rodio::SampleRate { - self.inner.sample_rate() + SAMPLE_RATE // see comment on channels } fn total_duration(&self) -> Option { diff --git a/crates/livekit_client/src/record.rs b/crates/livekit_client/src/record.rs index ad9d9435d9d029b7725b8858eb569d0794512dea..24e260e71665704c1010d07e082a03fbe6306a30 100644 --- a/crates/livekit_client/src/record.rs +++ b/crates/livekit_client/src/record.rs @@ -89,7 +89,7 @@ fn write_out( NonZero::new(config.sample_rate().0).expect("config sample_rate is never zero"), samples, ); - match rodio::output_to_wav(&mut samples, path) { + match rodio::wav_to_file(&mut samples, path) { Ok(_) => Ok(()), Err(e) => Err(anyhow::anyhow!("Failed to write wav file: {}", e)), } diff --git a/crates/livekit_client/src/test.rs b/crates/livekit_client/src/test.rs index 873e0222d013c20c5f4aff2a263b925c7d21aff6..612b032895b42afcfdc09e8d24023fbd1fcdf5c1 100644 --- a/crates/livekit_client/src/test.rs +++ b/crates/livekit_client/src/test.rs @@ -728,6 +728,7 @@ impl Room { pub async fn publish_local_microphone_track( &self, + _track_name: &str, cx: &mut AsyncApp, ) -> Result<(LocalTrackPublication, AudioStream)> { self.local_participant().publish_microphone_track(cx).await diff --git a/crates/zed/src/zed.rs b/crates/zed/src/zed.rs index cb5cfea81a06fd27cb19d7f2e2cd845e9236ae2c..ea2106593eb1e0788b3b8a46f7ae8c32ac75c1e4 100644 --- a/crates/zed/src/zed.rs +++ b/crates/zed/src/zed.rs @@ -13,6 +13,7 @@ use agent_ui::{AgentDiffToolbar, AgentPanelDelegate}; use anyhow::Context as _; pub use app_menus::*; use assets::Assets; +use audio::{AudioSettings, REPLAY_DURATION}; use breadcrumbs::Breadcrumbs; use client::zed_urls; use collections::VecDeque; @@ -128,7 +129,10 @@ actions!( dev, [ /// Record 10s of audio from your current microphone - CaptureAudio + CaptureAudio, + /// Stores last 30s of audio from everyone on the current call + /// in a tar file in the current working directory. + CaptureRecentAudio, ] ); @@ -925,6 +929,9 @@ fn register_actions( }) .register_action(|workspace, _: &CaptureAudio, window, cx| { capture_audio(workspace, window, cx); + }) + .register_action(|workspace, _: &CaptureRecentAudio, window, cx| { + capture_recent_audio(workspace, window, cx); }); if workspace.project().read(cx).is_via_remote_server() { @@ -1937,6 +1944,85 @@ fn capture_audio(workspace: &mut Workspace, _: &mut Window, cx: &mut Context) { + struct CaptureRecentAudioNotification { + focus_handle: gpui::FocusHandle, + save_result: Option>, + _save_task: Task>, + } + + impl gpui::EventEmitter for CaptureRecentAudioNotification {} + impl gpui::EventEmitter for CaptureRecentAudioNotification {} + impl gpui::Focusable for CaptureRecentAudioNotification { + fn focus_handle(&self, _cx: &App) -> gpui::FocusHandle { + self.focus_handle.clone() + } + } + impl workspace::notifications::Notification for CaptureRecentAudioNotification {} + + impl Render for CaptureRecentAudioNotification { + fn render(&mut self, _window: &mut Window, cx: &mut Context) -> impl IntoElement { + let message = match &self.save_result { + None => format!( + "Saving up to {} seconds of recent audio", + REPLAY_DURATION.as_secs(), + ), + Some(Ok((path, duration))) => format!( + "Saved {} seconds of all audio to {}", + duration.as_secs(), + path.display(), + ), + Some(Err(e)) => format!("Error saving audio replays: {e:?}"), + }; + + NotificationFrame::new() + .with_title(Some("Saved Audio")) + .show_suppress_button(false) + .on_close(cx.listener(|_, _, _, cx| { + cx.emit(DismissEvent); + })) + .with_content(message) + } + } + + impl CaptureRecentAudioNotification { + fn new(cx: &mut Context) -> Self { + if AudioSettings::get_global(cx).rodio_audio { + let executor = cx.background_executor().clone(); + let save_task = cx.default_global::().save_replays(executor); + let _save_task = cx.spawn(async move |this, cx| { + let res = save_task.await; + this.update(cx, |this, cx| { + this.save_result = Some(res); + cx.notify(); + }) + }); + + Self { + focus_handle: cx.focus_handle(), + _save_task, + save_result: None, + } + } else { + Self { + focus_handle: cx.focus_handle(), + _save_task: Task::ready(Ok(())), + save_result: Some(Err(anyhow::anyhow!( + "Capturing recent audio is only supported on the experimental rodio audio pipeline" + ))), + } + } + } + } + + workspace.show_notification( + NotificationId::unique::(), + cx, + |cx| cx.new(CaptureRecentAudioNotification::new), + ); +} + #[cfg(test)] mod tests { use super::*;