Adds audio replay system, replay action and moves microphone code to audio crate

David Kleingeld created

The replay system stores the last 30 seconds of audio for every call source and the users microphone. Using the `CaptureRecentAudio` action all those tracks gets stored in a tar as separate .wav files. We show notification when starting the save and when its done. With the replay system the microphone code got more tightly bound to the audio crate than the livekit_client crate, so it was moved to audio.

Change summary

Cargo.lock                                                  |  58 
Cargo.toml                                                  |   3 
crates/audio/Cargo.toml                                     |  11 
crates/audio/src/audio.rs                                   | 163 +++
crates/audio/src/audio_settings.rs                          |   7 
crates/audio/src/replays.rs                                 |  80 +
crates/audio/src/rodio_ext.rs                               | 296 ++++++
crates/call/src/call_impl/room.rs                           |  10 
crates/livekit_client/Cargo.toml                            |   1 
crates/livekit_client/examples/test_app.rs                  |   5 
crates/livekit_client/src/livekit_client.rs                 |   5 
crates/livekit_client/src/livekit_client/playback.rs        | 105 --
crates/livekit_client/src/livekit_client/playback/source.rs |  18 
crates/livekit_client/src/record.rs                         |   2 
crates/livekit_client/src/test.rs                           |   1 
crates/zed/src/zed.rs                                       |  88 ++
16 files changed, 669 insertions(+), 184 deletions(-)

Detailed changes

Cargo.lock 🔗

@@ -1385,14 +1385,19 @@ name = "audio"
 version = "0.1.0"
 dependencies = [
  "anyhow",
+ "async-tar",
  "collections",
+ "crossbeam",
+ "futures 0.3.31",
  "gpui",
  "libwebrtc",
+ "log",
  "parking_lot",
  "rodio",
  "schemars",
  "serde",
  "settings",
+ "smol",
  "util",
  "workspace-hack",
 ]
@@ -2676,7 +2681,7 @@ dependencies = [
  "cap-primitives",
  "cap-std",
  "io-lifetimes",
- "windows-sys 0.52.0",
+ "windows-sys 0.59.0",
 ]
 
 [[package]]
@@ -2705,7 +2710,7 @@ dependencies = [
  "maybe-owned",
  "rustix 1.0.7",
  "rustix-linux-procfs",
- "windows-sys 0.52.0",
+ "windows-sys 0.59.0",
  "winx",
 ]
 
@@ -4146,6 +4151,19 @@ dependencies = [
  "itertools 0.10.5",
 ]
 
+[[package]]
+name = "crossbeam"
+version = "0.8.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "1137cd7e7fc0fb5d3c5a8678be38ec56e819125d8d7907411fe24ccb943faca8"
+dependencies = [
+ "crossbeam-channel",
+ "crossbeam-deque",
+ "crossbeam-epoch",
+ "crossbeam-queue",
+ "crossbeam-utils",
+]
+
 [[package]]
 name = "crossbeam-channel"
 version = "0.5.15"
@@ -5296,7 +5314,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "976dd42dc7e85965fe702eb8164f21f450704bdde31faefd6471dba214cb594e"
 dependencies = [
  "libc",
- "windows-sys 0.52.0",
+ "windows-sys 0.59.0",
 ]
 
 [[package]]
@@ -5683,7 +5701,7 @@ checksum = "0ce92ff622d6dadf7349484f42c93271a0d49b7cc4d466a936405bacbe10aa78"
 dependencies = [
  "cfg-if",
  "rustix 1.0.7",
- "windows-sys 0.52.0",
+ "windows-sys 0.59.0",
 ]
 
 [[package]]
@@ -6061,7 +6079,7 @@ checksum = "94e7099f6313ecacbe1256e8ff9d617b75d1bcb16a6fddef94866d225a01a14a"
 dependencies = [
  "io-lifetimes",
  "rustix 1.0.7",
- "windows-sys 0.52.0",
+ "windows-sys 0.59.0",
 ]
 
 [[package]]
@@ -8504,7 +8522,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "2285ddfe3054097ef4b2fe909ef8c3bcd1ea52a8f0d274416caebeef39f04a65"
 dependencies = [
  "io-lifetimes",
- "windows-sys 0.52.0",
+ "windows-sys 0.59.0",
 ]
 
 [[package]]
@@ -8577,7 +8595,7 @@ checksum = "e04d7f318608d35d4b61ddd75cbdaee86b023ebe2bd5a66ee0915f0bf93095a9"
 dependencies = [
  "hermit-abi 0.5.0",
  "libc",
- "windows-sys 0.52.0",
+ "windows-sys 0.59.0",
 ]
 
 [[package]]
@@ -8659,7 +8677,7 @@ dependencies = [
  "portable-atomic",
  "portable-atomic-util",
  "serde",
- "windows-sys 0.52.0",
+ "windows-sys 0.59.0",
 ]
 
 [[package]]
@@ -9400,7 +9418,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "fc2f4eb4bc735547cfed7c0a4922cbd04a4655978c09b54f1f7b228750664c34"
 dependencies = [
  "cfg-if",
- "windows-targets 0.52.6",
+ "windows-targets 0.48.5",
 ]
 
 [[package]]
@@ -9637,7 +9655,6 @@ dependencies = [
  "core-video",
  "coreaudio-rs 0.12.1",
  "cpal",
- "dasp_sample",
  "futures 0.3.31",
  "gpui",
  "gpui_tokio",
@@ -13059,7 +13076,7 @@ dependencies = [
  "once_cell",
  "socket2",
  "tracing",
- "windows-sys 0.52.0",
+ "windows-sys 0.59.0",
 ]
 
 [[package]]
@@ -13856,7 +13873,7 @@ dependencies = [
 [[package]]
 name = "rodio"
 version = "0.21.1"
-source = "git+https://github.com/RustAudio/rodio?branch=microphone#0e6e6436b3a97f4af72baafe11a02ade2b457b62"
+source = "git+https://github.com/RustAudio/rodio?branch=better_wav_output#82514bd1f2c6cfd9a1a885019b26a8ffea75bc5c"
 dependencies = [
  "cpal",
  "dasp_sample",
@@ -14086,7 +14103,7 @@ dependencies = [
  "itoa",
  "libc",
  "linux-raw-sys 0.4.15",
- "windows-sys 0.52.0",
+ "windows-sys 0.59.0",
 ]
 
 [[package]]
@@ -14099,7 +14116,7 @@ dependencies = [
  "errno 0.3.11",
  "libc",
  "linux-raw-sys 0.9.4",
- "windows-sys 0.52.0",
+ "windows-sys 0.59.0",
 ]
 
 [[package]]
@@ -14221,7 +14238,7 @@ dependencies = [
  "security-framework 3.2.0",
  "security-framework-sys",
  "webpki-root-certs",
- "windows-sys 0.52.0",
+ "windows-sys 0.59.0",
 ]
 
 [[package]]
@@ -15535,7 +15552,7 @@ dependencies = [
  "cfg-if",
  "libc",
  "psm",
- "windows-sys 0.52.0",
+ "windows-sys 0.59.0",
 ]
 
 [[package]]
@@ -16193,7 +16210,7 @@ dependencies = [
  "fd-lock",
  "io-lifetimes",
  "rustix 0.38.44",
- "windows-sys 0.52.0",
+ "windows-sys 0.59.0",
  "winx",
 ]
 
@@ -16375,7 +16392,7 @@ dependencies = [
  "getrandom 0.3.2",
  "once_cell",
  "rustix 1.0.7",
- "windows-sys 0.52.0",
+ "windows-sys 0.59.0",
 ]
 
 [[package]]
@@ -18943,7 +18960,7 @@ version = "0.1.9"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "cf221c93e13a30d793f7645a0e7762c55d169dbb0a49671918a2319d289b10bb"
 dependencies = [
- "windows-sys 0.52.0",
+ "windows-sys 0.48.0",
 ]
 
 [[package]]
@@ -19602,7 +19619,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "3f3fd376f71958b862e7afb20cfe5a22830e1963462f3a17f49d82a6c1d1f42d"
 dependencies = [
  "bitflags 2.9.0",
- "windows-sys 0.52.0",
+ "windows-sys 0.59.0",
 ]
 
 [[package]]
@@ -19936,6 +19953,7 @@ dependencies = [
  "core-foundation-sys",
  "cranelift-codegen",
  "crc32fast",
+ "crossbeam-channel",
  "crossbeam-epoch",
  "crossbeam-utils",
  "crypto-common",

Cargo.toml 🔗

@@ -276,6 +276,7 @@ context_server = { path = "crates/context_server" }
 copilot = { path = "crates/copilot" }
 crashes = { path = "crates/crashes" }
 credentials_provider = { path = "crates/credentials_provider" }
+crossbeam = "0.8.4"
 dap = { path = "crates/dap" }
 dap_adapters = { path = "crates/dap_adapters" }
 db = { path = "crates/db" }
@@ -367,7 +368,7 @@ remote_server = { path = "crates/remote_server" }
 repl = { path = "crates/repl" }
 reqwest_client = { path = "crates/reqwest_client" }
 rich_text = { path = "crates/rich_text" }
-rodio = { git = "https://github.com/RustAudio/rodio", branch = "microphone"}
+rodio = { git = "https://github.com/RustAudio/rodio", branch = "better_wav_output"}
 rope = { path = "crates/rope" }
 rpc = { path = "crates/rpc" }
 rules_library = { path = "crates/rules_library" }

crates/audio/Cargo.toml 🔗

@@ -14,13 +14,18 @@ doctest = false
 
 [dependencies]
 anyhow.workspace = true
+async-tar.workspace = true
 collections.workspace = true
+crossbeam.workspace = true
 gpui.workspace = true
-settings.workspace = true
+log.workspace = true
+futures.workspace = true
+parking_lot.workspace = true
+rodio = { workspace = true, features = [ "wav", "playback" ] }
 schemars.workspace = true
 serde.workspace = true
-parking_lot.workspace = true
-rodio = { workspace = true, features = [ "wav", "playback", "tracing" ] }
+settings.workspace = true
+smol.workspace = true
 util.workspace = true
 workspace-hack.workspace = true
 

crates/audio/src/audio.rs 🔗

@@ -1,37 +1,57 @@
-use anyhow::{Context as _, Result, anyhow};
+use anyhow::{Context as _, Result};
 use collections::HashMap;
-use gpui::{App, BorrowAppContext, Global};
-use libwebrtc::native::apm;
+use futures::channel::mpsc::UnboundedSender;
+use gpui::{App, AsyncApp, BackgroundExecutor, BorrowAppContext, Global};
+use libwebrtc::{native::apm, prelude::AudioFrame};
+use log::info;
 use parking_lot::Mutex;
 use rodio::{
-    Decoder, OutputStream, OutputStreamBuilder, Source, cpal::Sample, mixer::Mixer,
-    source::Buffered,
+    Decoder, OutputStream, OutputStreamBuilder, Source,
+    cpal::Sample,
+    mixer::Mixer,
+    nz,
+    source::{Buffered, LimitSettings, UniformSourceIterator},
 };
 use settings::Settings;
-use std::{io::Cursor, num::NonZero, sync::Arc};
-use util::ResultExt;
+use std::{
+    borrow::Cow,
+    io::Cursor,
+    num::NonZero,
+    path::PathBuf,
+    sync::{
+        Arc,
+        mpsc::{TryRecvError, channel},
+    },
+    thread,
+    time::Duration,
+};
+use util::{ResultExt, debug_panic};
 
 mod audio_settings;
+mod replays;
 mod rodio_ext;
 pub use audio_settings::AudioSettings;
 pub use rodio_ext::RodioExt;
 
-// NOTE: We use WebRTC's mixer which only supports
+// NOTE: We used to use WebRTC's mixer which only supported
 // 16kHz, 32kHz and 48kHz. As 48 is the most common "next step up"
 // for audio output devices like speakers/bluetooth, we just hard-code
 // this; and downsample when we need to.
 //
 // Since most noise cancelling requires 16kHz we will move to
-// that in the future. Same for channel count. That should be input
-// channels and fixed to 1.
-pub const SAMPLE_RATE: NonZero<u32> = NonZero::new(48000).expect("not zero");
-pub const CHANNEL_COUNT: NonZero<u16> = NonZero::new(2).expect("not zero");
+// that in the future.
+pub const SAMPLE_RATE: NonZero<u32> = nz!(48000);
+pub const CHANNEL_COUNT: NonZero<u16> = nz!(2);
+const BUFFER_SIZE: usize = // echo canceller and livekit want 10ms of audio
+    (SAMPLE_RATE.get() as usize / 100) * CHANNEL_COUNT.get() as usize;
+
+pub const REPLAY_DURATION: Duration = Duration::from_secs(30);
 
 pub fn init(cx: &mut App) {
     AudioSettings::register(cx);
 }
 
-#[derive(Copy, Clone, Eq, Hash, PartialEq)]
+#[derive(Debug, Copy, Clone, Eq, Hash, PartialEq)]
 pub enum Sound {
     Joined,
     Leave,
@@ -61,6 +81,7 @@ pub struct Audio {
     output_mixer: Option<Mixer>,
     pub echo_canceller: Arc<Mutex<apm::AudioProcessingModule>>,
     source_cache: HashMap<Sound, Buffered<Decoder<Cursor<Vec<u8>>>>>,
+    replays: replays::Replays,
 }
 
 impl Default for Audio {
@@ -72,6 +93,7 @@ impl Default for Audio {
                 true, false, false, false,
             ))),
             source_cache: Default::default(),
+            replays: Default::default(),
         }
     }
 }
@@ -79,16 +101,19 @@ impl Default for Audio {
 impl Global for Audio {}
 
 impl Audio {
-    fn ensure_output_exists(&mut self) -> Option<&Mixer> {
+    fn ensure_output_exists(&mut self) -> Result<&Mixer> {
         if self.output_handle.is_none() {
-            self.output_handle = OutputStreamBuilder::open_default_stream().log_err();
+            self.output_handle = Some(
+                OutputStreamBuilder::open_default_stream()
+                    .context("Could not open default output stream")?,
+            );
             if let Some(output_handle) = &self.output_handle {
                 let (mixer, source) = rodio::mixer::mixer(CHANNEL_COUNT, SAMPLE_RATE);
+                // or the mixer will end immediately as its empty.
+                mixer.add(rodio::source::Zero::new(CHANNEL_COUNT, SAMPLE_RATE));
                 self.output_mixer = Some(mixer);
 
                 let echo_canceller = Arc::clone(&self.echo_canceller);
-                const BUFFER_SIZE: usize = // echo canceller wants 10ms of audio
-                    (SAMPLE_RATE.get() as usize / 100) * CHANNEL_COUNT.get() as usize;
                 let source = source.inspect_buffer::<BUFFER_SIZE, _>(move |buffer| {
                     let mut buf: [i16; _] = buffer.map(|s| s.to_sample());
                     echo_canceller
@@ -104,18 +129,109 @@ impl Audio {
             }
         }
 
-        self.output_mixer.as_ref()
+        Ok(self
+            .output_mixer
+            .as_ref()
+            .expect("we only get here if opening the outputstream succeeded"))
+    }
+
+    pub fn save_replays(
+        &self,
+        executor: BackgroundExecutor,
+    ) -> gpui::Task<anyhow::Result<(PathBuf, Duration)>> {
+        self.replays.replays_to_tar(executor)
     }
 
-    pub fn play_source(
-        source: impl rodio::Source + Send + 'static,
+    pub fn open_microphone(
+        cx: AsyncApp,
+        frame_tx: UnboundedSender<AudioFrame<'static>>,
+    ) -> anyhow::Result<()> {
+        let (apm, mut replays) = cx.try_read_default_global::<Audio, _>(|audio, _| {
+            (Arc::clone(&audio.echo_canceller), audio.replays.clone())
+        })?;
+
+        let (stream_error_tx, stream_error_rx) = channel();
+        thread::spawn(move || {
+            let stream = rodio::microphone::MicrophoneBuilder::new()
+                .default_device()?
+                .default_config()?
+                .prefer_sample_rates([SAMPLE_RATE, SAMPLE_RATE.saturating_mul(nz!(2))])
+                .prefer_channel_counts([nz!(1), nz!(2)])
+                .prefer_buffer_sizes(512..)
+                .open_stream()?;
+            info!("Opened microphone: {:?}", stream.config());
+
+            let stream = UniformSourceIterator::new(stream, CHANNEL_COUNT, SAMPLE_RATE)
+                .limit(LimitSettings::live_performance())
+                .process_buffer::<BUFFER_SIZE, _>(|buffer| {
+                    let mut int_buffer: [i16; _] = buffer.map(|s| s.to_sample());
+                    if let Err(e) = apm
+                        .lock()
+                        .process_stream(
+                            &mut int_buffer,
+                            SAMPLE_RATE.get() as i32,
+                            CHANNEL_COUNT.get() as i32,
+                        )
+                        .context("livekit audio processor error")
+                    {
+                        let _ = stream_error_tx.send(e);
+                    } else {
+                        for (sample, processed) in buffer.iter_mut().zip(&int_buffer) {
+                            *sample = (*processed).to_sample();
+                        }
+                    }
+                })
+                .automatic_gain_control(1.0, 4.0, 0.0, 5.0)
+                .periodic_access(Duration::from_millis(100), move |agc_source| {
+                    agc_source.set_enabled(true); // todo dvdsk how to get settings in here?
+                });
+
+            // todo dvdsk keep the above here, move the rest back to livekit?
+            let (replay, mut stream) = stream.replayable(REPLAY_DURATION);
+            replays.add_voip_stream("local microphone".to_string(), replay);
+
+            loop {
+                let sampled: Vec<_> = stream
+                    .by_ref()
+                    .take(BUFFER_SIZE)
+                    .map(|s| s.to_sample())
+                    .collect();
+
+                match stream_error_rx.try_recv() {
+                    Ok(apm_error) => return Err::<(), _>(apm_error),
+                    Err(TryRecvError::Disconnected) => {
+                        debug_panic!("Stream should end on its own without sending an error")
+                    }
+                    Err(TryRecvError::Empty) => (),
+                }
+
+                frame_tx
+                    .unbounded_send(AudioFrame {
+                        sample_rate: SAMPLE_RATE.get(),
+                        num_channels: CHANNEL_COUNT.get() as u32,
+                        samples_per_channel: sampled.len() as u32 / CHANNEL_COUNT.get() as u32,
+                        data: Cow::Owned(sampled),
+                    })
+                    .context("Failed to send audio frame")?
+            }
+        });
+
+        Ok(())
+    }
+
+    pub fn play_voip_stream(
+        stream_source: impl rodio::Source + Send + 'static,
+        stream_name: String,
         cx: &mut App,
     ) -> anyhow::Result<()> {
+        let (replay_source, source) = stream_source.replayable(REPLAY_DURATION);
+
         cx.update_default_global(|this: &mut Self, _cx| {
             let output_mixer = this
                 .ensure_output_exists()
-                .ok_or_else(|| anyhow!("Could not open audio output"))?;
+                .context("Could not get output mixer")?;
             output_mixer.add(source);
+            this.replays.add_voip_stream(stream_name, replay_source);
             Ok(())
         })
     }
@@ -123,7 +239,10 @@ impl Audio {
     pub fn play_sound(sound: Sound, cx: &mut App) {
         cx.update_default_global(|this: &mut Self, cx| {
             let source = this.sound_source(sound, cx).log_err()?;
-            let output_mixer = this.ensure_output_exists()?;
+            let output_mixer = this
+                .ensure_output_exists()
+                .context("Could not get output mixer")
+                .log_err()?;
 
             output_mixer.add(source);
             Some(())

crates/audio/src/audio_settings.rs 🔗

@@ -9,6 +9,9 @@ pub struct AudioSettings {
     /// Opt into the new audio system.
     #[serde(rename = "experimental.rodio_audio", default)]
     pub rodio_audio: bool, // default is false
+    /// Opt into the new audio systems automatic gain control
+    #[serde(rename = "experimental.automatic_volume", default)]
+    pub automatic_volume: bool,
 }
 
 /// Configuration of audio in Zed.
@@ -19,6 +22,10 @@ pub struct AudioSettingsContent {
     /// Whether to use the experimental audio system
     #[serde(rename = "experimental.rodio_audio", default)]
     pub rodio_audio: bool,
+    /// Whether the experimental audio systems should automatically
+    /// manage the volume of calls
+    #[serde(rename = "experimental.automatic_volume", default)]
+    pub automatic_volume: bool,
 }
 
 impl Settings for AudioSettings {

crates/audio/src/replays.rs 🔗

@@ -0,0 +1,80 @@
+use anyhow::{Context, anyhow};
+use async_tar::{Builder, Header};
+use gpui::{BackgroundExecutor, Task};
+
+use collections::HashMap;
+use parking_lot::Mutex;
+use rodio::Source;
+use smol::fs::File;
+use std::{io, path::PathBuf, sync::Arc, time::Duration};
+
+use crate::{REPLAY_DURATION, rodio_ext::Replay};
+
+#[derive(Default, Clone)]
+pub(crate) struct Replays(Arc<Mutex<HashMap<String, Replay>>>);
+
+impl Replays {
+    pub(crate) fn add_voip_stream(&mut self, stream_name: String, source: Replay) {
+        let mut map = self.0.lock();
+        map.retain(|_, replay| replay.source_is_active());
+        // on the old pipeline all the streams are named microphone
+        // make sure names dont collide in that case by adding a number.
+        let stream_name = stream_name + &map.len().to_string();
+        map.insert(stream_name, source);
+    }
+
+    pub(crate) fn replays_to_tar(
+        &self,
+        executor: BackgroundExecutor,
+    ) -> Task<anyhow::Result<(PathBuf, Duration)>> {
+        let map = Arc::clone(&self.0);
+        executor.spawn(async move {
+            let recordings: Vec<_> = map
+                .lock()
+                .iter_mut()
+                .map(|(name, replay)| {
+                    let queued = REPLAY_DURATION.min(replay.duration_ready());
+                    (name.clone(), replay.take_duration(queued).record())
+                })
+                .collect();
+            let longest = recordings
+                .iter()
+                .map(|(_, r)| {
+                    r.total_duration()
+                        .expect("SamplesBuffer always returns a total duration")
+                })
+                .max()
+                .ok_or(anyhow!("There is no audio to capture"))?;
+
+            let path = std::env::current_dir()
+                .context("Could not get current dir")?
+                .join("replays.tar");
+            let tar = File::create(&path)
+                .await
+                .context("Could not create file for tar")?;
+
+            let mut tar = Builder::new(tar);
+
+            for (name, recording) in recordings {
+                let mut writer = io::Cursor::new(Vec::new());
+                rodio::wav_to_writer(recording, &mut writer).context("failed to encode wav")?;
+                let wav_data = writer.into_inner();
+                let path = name.replace(' ', "_") + ".wav";
+                let mut header = Header::new_gnu();
+                // rw permissions for everyone
+                header.set_mode(0o666);
+                header.set_size(wav_data.len() as u64);
+                tar.append_data(&mut header, path, wav_data.as_slice())
+                    .await
+                    .context("failed to apped wav to tar")?;
+            }
+            tar.into_inner()
+                .await
+                .context("Could not finish writing tar")?
+                .sync_all()
+                .await
+                .context("Could not flush tar file to disk")?;
+            Ok((path, longest))
+        })
+    }
+}

crates/audio/src/rodio_ext.rs 🔗

@@ -1,18 +1,28 @@
-use rodio::Source;
+use std::{
+    sync::{
+        Arc,
+        atomic::{AtomicUsize, Ordering},
+    },
+    time::Duration,
+};
+
+use crossbeam::queue::ArrayQueue;
+use rodio::{ChannelCount, Sample, SampleRate, Source};
 
 pub trait RodioExt: Source + Sized {
     fn process_buffer<const N: usize, F>(self, callback: F) -> ProcessBuffer<N, Self, F>
     where
-        F: FnMut(&mut [rodio::Sample; N]);
+        F: FnMut(&mut [Sample; N]);
     fn inspect_buffer<const N: usize, F>(self, callback: F) -> InspectBuffer<N, Self, F>
     where
-        F: FnMut(&[rodio::Sample; N]);
+        F: FnMut(&[Sample; N]);
+    fn replayable(self, duration: Duration) -> (Replay, Replayable<Self>);
 }
 
 impl<S: Source> RodioExt for S {
     fn process_buffer<const N: usize, F>(self, callback: F) -> ProcessBuffer<N, Self, F>
     where
-        F: FnMut(&mut [rodio::Sample; N]),
+        F: FnMut(&mut [Sample; N]),
     {
         ProcessBuffer {
             inner: self,
@@ -23,7 +33,7 @@ impl<S: Source> RodioExt for S {
     }
     fn inspect_buffer<const N: usize, F>(self, callback: F) -> InspectBuffer<N, Self, F>
     where
-        F: FnMut(&[rodio::Sample; N]),
+        F: FnMut(&[Sample; N]),
     {
         InspectBuffer {
             inner: self,
@@ -32,17 +42,82 @@ impl<S: Source> RodioExt for S {
             free: 0,
         }
     }
+    fn replayable(self, duration: Duration) -> (Replay, Replayable<Self>) {
+        let samples_per_second = self.sample_rate().get() * self.channels().get() as u32;
+        let samples_to_queue = duration.as_secs_f64() * samples_per_second as f64;
+        let samples_to_queue =
+            (samples_to_queue as usize).next_multiple_of(self.channels().get().into());
+
+        let chunk_size =
+            samples_to_queue.min(1000usize.next_multiple_of(self.channels().get().into()));
+        let chunks_to_queue = samples_to_queue.div_ceil(chunk_size);
+
+        let queue = Arc::new(ReplayQueue::new(chunks_to_queue, chunk_size));
+        (
+            Replay {
+                rx: Arc::clone(&queue),
+                buffer: Vec::new().into_iter(),
+                sleep_duration: duration / 2,
+                sample_rate: self.sample_rate(),
+                channel_count: self.channels(),
+            },
+            Replayable {
+                tx: queue,
+                inner: self,
+                buffer: Vec::with_capacity(chunk_size),
+                chunk_size,
+            },
+        )
+    }
+}
+
+#[derive(Debug)]
+struct ReplayQueue {
+    inner: ArrayQueue<Vec<Sample>>,
+    normal_chunk_len: usize,
+    /// The last chunk in the queue may be smaller then
+    /// the normal chunk size. This is always equal to the
+    /// size of the last element in the queue.
+    /// (so normally chunk_size)
+    last_chunk_len: AtomicUsize,
+}
+
+impl ReplayQueue {
+    fn new(queue_len: usize, chunk_size: usize) -> Self {
+        Self {
+            inner: ArrayQueue::new(queue_len),
+            normal_chunk_len: chunk_size,
+            last_chunk_len: AtomicUsize::new(chunk_size),
+        }
+    }
+    fn len(&self) -> usize {
+        self.inner.len().saturating_sub(1) * self.normal_chunk_len
+            + self.last_chunk_len.load(Ordering::Acquire)
+    }
+
+    fn pop(&self) -> Option<Vec<Sample>> {
+        self.inner.pop()
+    }
+
+    fn push_last(&self, samples: Vec<Sample>) {
+        self.last_chunk_len.store(samples.len(), Ordering::Release);
+        let _pushed_out_of_ringbuf = self.inner.force_push(samples);
+    }
+
+    fn push_normal(&self, samples: Vec<Sample>) {
+        let _pushed_out_of_ringbuf = self.inner.force_push(samples);
+    }
 }
 
 pub struct ProcessBuffer<const N: usize, S, F>
 where
     S: Source + Sized,
-    F: FnMut(&mut [rodio::Sample; N]),
+    F: FnMut(&mut [Sample; N]),
 {
     inner: S,
     callback: F,
     /// Buffer used for both input and output.
-    buffer: [rodio::Sample; N],
+    buffer: [Sample; N],
     /// Next already processed sample is at this index
     /// in buffer.
     ///
@@ -54,9 +129,9 @@ where
 impl<const N: usize, S, F> Iterator for ProcessBuffer<N, S, F>
 where
     S: Source + Sized,
-    F: FnMut(&mut [rodio::Sample; N]),
+    F: FnMut(&mut [Sample; N]),
 {
-    type Item = rodio::Sample;
+    type Item = Sample;
 
     fn next(&mut self) -> Option<Self::Item> {
         self.next += 1;
@@ -78,7 +153,7 @@ where
 impl<const N: usize, S, F> Source for ProcessBuffer<N, S, F>
 where
     S: Source + Sized,
-    F: FnMut(&mut [rodio::Sample; N]),
+    F: FnMut(&mut [Sample; N]),
 {
     fn current_span_len(&self) -> Option<usize> {
         // TODO dvdsk this should be a spanless Source
@@ -101,12 +176,12 @@ where
 pub struct InspectBuffer<const N: usize, S, F>
 where
     S: Source + Sized,
-    F: FnMut(&[rodio::Sample; N]),
+    F: FnMut(&[Sample; N]),
 {
     inner: S,
     callback: F,
     /// Stores already emitted samples, once its full we call the callback.
-    buffer: [rodio::Sample; N],
+    buffer: [Sample; N],
     /// Next free element in buffer. If this is equal to the buffer length
     /// we have no more free lements.
     free: usize,
@@ -115,9 +190,9 @@ where
 impl<const N: usize, S, F> Iterator for InspectBuffer<N, S, F>
 where
     S: Source + Sized,
-    F: FnMut(&[rodio::Sample; N]),
+    F: FnMut(&[Sample; N]),
 {
-    type Item = rodio::Sample;
+    type Item = Sample;
 
     fn next(&mut self) -> Option<Self::Item> {
         let Some(sample) = self.inner.next() else {
@@ -139,7 +214,7 @@ where
 impl<const N: usize, S, F> Source for InspectBuffer<N, S, F>
 where
     S: Source + Sized,
-    F: FnMut(&[rodio::Sample; N]),
+    F: FnMut(&[Sample; N]),
 {
     fn current_span_len(&self) -> Option<usize> {
         // TODO dvdsk this should be a spanless Source
@@ -159,21 +234,135 @@ where
     }
 }
 
+#[derive(Debug)]
+pub struct Replayable<S: Source> {
+    inner: S,
+    buffer: Vec<Sample>,
+    chunk_size: usize,
+    tx: Arc<ReplayQueue>,
+}
+
+impl<S: Source> Iterator for Replayable<S> {
+    type Item = Sample;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        if let Some(sample) = self.inner.next() {
+            self.buffer.push(sample);
+            if self.buffer.len() == self.chunk_size {
+                self.tx.push_normal(std::mem::take(&mut self.buffer));
+            }
+            Some(sample)
+        } else {
+            let last_chunk = std::mem::take(&mut self.buffer);
+            self.tx.push_last(last_chunk);
+            None
+        }
+    }
+}
+
+impl<S: Source> Source for Replayable<S> {
+    fn current_span_len(&self) -> Option<usize> {
+        // Todo dvdsk should be spanless too
+        self.inner.current_span_len()
+    }
+
+    fn channels(&self) -> ChannelCount {
+        self.inner.channels()
+    }
+
+    fn sample_rate(&self) -> SampleRate {
+        self.inner.sample_rate()
+    }
+
+    fn total_duration(&self) -> Option<Duration> {
+        self.inner.total_duration()
+    }
+}
+
+#[derive(Debug)]
+pub struct Replay {
+    rx: Arc<ReplayQueue>,
+    buffer: std::vec::IntoIter<Sample>,
+    sleep_duration: Duration,
+    sample_rate: SampleRate,
+    channel_count: ChannelCount,
+}
+
+impl Replay {
+    pub fn source_is_active(&self) -> bool {
+        Arc::strong_count(&self.rx) == 2
+    }
+
+    /// Returns duration of what is in the buffer and
+    /// can be returned without blocking.
+    pub fn duration_ready(&self) -> Duration {
+        let samples_per_second = self.channels().get() as u32 * self.sample_rate().get();
+        let samples_queued = self.rx.len() + self.buffer.len();
+
+        let seconds_queued = samples_queued as f64 / samples_per_second as f64;
+        Duration::from_secs_f64(seconds_queued)
+    }
+}
+
+impl Iterator for Replay {
+    type Item = Sample;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        if let Some(sample) = self.buffer.next() {
+            return Some(sample);
+        }
+
+        loop {
+            if let Some(new_buffer) = self.rx.pop() {
+                self.buffer = new_buffer.into_iter();
+                return self.buffer.next();
+            }
+
+            if !self.source_is_active() {
+                return None;
+            }
+
+            std::thread::sleep(self.sleep_duration);
+        }
+    }
+}
+
+impl Source for Replay {
+    fn current_span_len(&self) -> Option<usize> {
+        None // source is not compatible with spans
+    }
+
+    fn channels(&self) -> ChannelCount {
+        self.channel_count
+    }
+
+    fn sample_rate(&self) -> SampleRate {
+        self.sample_rate
+    }
+
+    fn total_duration(&self) -> Option<Duration> {
+        None
+    }
+}
+
 #[cfg(test)]
 mod tests {
-    use rodio::static_buffer::StaticSamplesBuffer;
+    use rodio::{nz, static_buffer::StaticSamplesBuffer};
 
     use super::*;
 
-    #[cfg(test)]
+    const SAMPLES: [Sample; 5] = [0.0, 1.0, 2.0, 3.0, 4.0];
+
+    fn test_source() -> StaticSamplesBuffer {
+        StaticSamplesBuffer::new(nz!(1), nz!(1), &SAMPLES)
+    }
+
     mod process_buffer {
         use super::*;
 
         #[test]
         fn callback_gets_all_samples() {
-            const SAMPLES: [f32; 5] = [0.0, 1.0, 2.0, 3.0, 4.0];
-            let input =
-                StaticSamplesBuffer::new(1.try_into().unwrap(), 1.try_into().unwrap(), &SAMPLES);
+            let input = test_source();
 
             let _ = input
                 .process_buffer::<{ SAMPLES.len() }, _>(|buffer| assert_eq!(*buffer, SAMPLES))
@@ -181,9 +370,7 @@ mod tests {
         }
         #[test]
         fn callback_modifies_yielded() {
-            const SAMPLES: [f32; 5] = [0.0, 1.0, 2.0, 3.0, 4.0];
-            let input =
-                StaticSamplesBuffer::new(1.try_into().unwrap(), 1.try_into().unwrap(), &SAMPLES);
+            let input = test_source();
 
             let yielded: Vec<_> = input
                 .process_buffer::<{ SAMPLES.len() }, _>(|buffer| {
@@ -199,9 +386,7 @@ mod tests {
         }
         #[test]
         fn source_truncates_to_whole_buffers() {
-            const SAMPLES: [f32; 5] = [0.0, 1.0, 2.0, 3.0, 4.0];
-            let input =
-                StaticSamplesBuffer::new(1.try_into().unwrap(), 1.try_into().unwrap(), &SAMPLES);
+            let input = test_source();
 
             let yielded = input
                 .process_buffer::<3, _>(|buffer| assert_eq!(buffer, &SAMPLES[..3]))
@@ -210,15 +395,12 @@ mod tests {
         }
     }
 
-    #[cfg(test)]
     mod inspect_buffer {
         use super::*;
 
         #[test]
         fn callback_gets_all_samples() {
-            const SAMPLES: [f32; 5] = [0.0, 1.0, 2.0, 3.0, 4.0];
-            let input =
-                StaticSamplesBuffer::new(1.try_into().unwrap(), 1.try_into().unwrap(), &SAMPLES);
+            let input = test_source();
 
             let _ = input
                 .inspect_buffer::<{ SAMPLES.len() }, _>(|buffer| assert_eq!(*buffer, SAMPLES))
@@ -226,9 +408,7 @@ mod tests {
         }
         #[test]
         fn source_does_not_truncate() {
-            const SAMPLES: [f32; 5] = [0.0, 1.0, 2.0, 3.0, 4.0];
-            let input =
-                StaticSamplesBuffer::new(1.try_into().unwrap(), 1.try_into().unwrap(), &SAMPLES);
+            let input = test_source();
 
             let yielded = input
                 .inspect_buffer::<3, _>(|buffer| assert_eq!(buffer, &SAMPLES[..3]))
@@ -236,4 +416,54 @@ mod tests {
             assert_eq!(yielded, SAMPLES.len())
         }
     }
+
+    mod instant_replay {
+        use super::*;
+
+        #[test]
+        fn continues_after_history() {
+            let input = test_source();
+
+            let (mut replay, mut source) = input.replayable(Duration::from_secs(3));
+
+            source.by_ref().take(3).count();
+            let yielded: Vec<Sample> = replay.by_ref().take(3).collect();
+            assert_eq!(&yielded, &SAMPLES[0..3],);
+
+            source.count();
+            let yielded: Vec<Sample> = replay.collect();
+            assert_eq!(&yielded, &SAMPLES[3..5],);
+        }
+
+        #[test]
+        fn keeps_only_latest() {
+            let input = test_source();
+
+            let (mut replay, mut source) = input.replayable(Duration::from_secs(2));
+
+            source.by_ref().take(5).count(); // get all items but do not end the source
+            let yielded: Vec<Sample> = replay.by_ref().take(2).collect();
+            // Note we do not get the last element, it has not been send yet
+            // due to buffering.
+            assert_eq!(&yielded, &SAMPLES[2..4]);
+
+            source.count(); // exhaust source
+            let yielded: Vec<Sample> = replay.collect();
+            assert_eq!(&yielded, &[SAMPLES[4]]);
+        }
+
+        #[test]
+        fn keeps_correct_amount_of_seconds() {
+            let input = StaticSamplesBuffer::new(nz!(16_000), nz!(1), &[0.0; 40_000]);
+
+            let (replay, mut source) = input.replayable(Duration::from_secs(2));
+
+            source.by_ref().count();
+            let n_yielded = replay.count();
+            assert_eq!(
+                n_yielded as u32,
+                source.sample_rate().get() * source.channels().get() as u32 * 2
+            );
+        }
+    }
 }

crates/call/src/call_impl/room.rs 🔗

@@ -1322,8 +1322,16 @@ impl Room {
             return Task::ready(Err(anyhow!("live-kit was not initialized")));
         };
 
+        let user_name = self
+            .user_store
+            .read(cx)
+            .current_user()
+            .map(|user| user.name.clone())
+            .flatten()
+            .unwrap_or_else(|| "unknown".to_string());
+
         cx.spawn(async move |this, cx| {
-            let publication = room.publish_local_microphone_track(cx).await;
+            let publication = room.publish_local_microphone_track(&user_name, cx).await;
             this.update(cx, |this, cx| {
                 let live_kit = this
                     .live_kit

crates/livekit_client/Cargo.toml 🔗

@@ -42,7 +42,6 @@ util.workspace = true
 workspace-hack.workspace = true
 
 rodio = { workspace = true, features = ["wav_output", "recording"] }
-dasp_sample = "0.11"
 
 [target.'cfg(not(any(all(target_os = "windows", target_env = "gnu"), target_os = "freebsd")))'.dependencies]
 libwebrtc = { rev = "5f04705ac3f356350ae31534ffbc476abc9ea83d", git = "https://github.com/zed-industries/livekit-rust-sdks" }

crates/livekit_client/examples/test_app.rs 🔗

@@ -255,7 +255,10 @@ impl LivekitWindow {
         } else {
             let room = self.room.clone();
             cx.spawn_in(window, async move |this, cx| {
-                let (publication, stream) = room.publish_local_microphone_track(cx).await.unwrap();
+                let (publication, stream) = room
+                    .publish_local_microphone_track("test_user", cx)
+                    .await
+                    .unwrap();
                 this.update(cx, |this, cx| {
                     this.microphone_track = Some(publication);
                     this.microphone_stream = Some(stream);

crates/livekit_client/src/livekit_client.rs 🔗

@@ -97,9 +97,12 @@ impl Room {
 
     pub async fn publish_local_microphone_track(
         &self,
+        user_name: &str,
         cx: &mut AsyncApp,
     ) -> Result<(LocalTrackPublication, playback::AudioStream)> {
-        let (track, stream) = self.playback.capture_local_microphone_track(&cx)?;
+        let (track, stream) = self
+            .playback
+            .capture_local_microphone_track(user_name, &cx)?;
         let publication = self
             .local_participant()
             .publish_track(

crates/livekit_client/src/livekit_client/playback.rs 🔗

@@ -1,9 +1,7 @@
 use anyhow::{Context as _, Result};
 
 use audio::{AudioSettings, CHANNEL_COUNT, SAMPLE_RATE};
-use cpal::Sample;
 use cpal::traits::{DeviceTrait, StreamTrait as _};
-use dasp_sample::ToSample;
 use futures::channel::mpsc::UnboundedSender;
 use futures::{Stream, StreamExt as _};
 use gpui::{
@@ -24,16 +22,13 @@ use livekit::webrtc::{
 use log::info;
 use parking_lot::Mutex;
 use rodio::Source;
-use rodio::source::{LimitSettings, UniformSourceIterator};
 use settings::Settings;
 use std::cell::RefCell;
-use std::num::NonZero;
 use std::sync::Weak;
 use std::sync::atomic::{AtomicBool, AtomicI32, Ordering};
-use std::sync::mpsc::{TryRecvError, channel};
 use std::time::Duration;
 use std::{borrow::Cow, collections::VecDeque, sync::Arc, thread};
-use util::{ResultExt as _, debug_panic, maybe};
+use util::{ResultExt as _, maybe};
 
 mod source;
 
@@ -51,14 +46,16 @@ pub(crate) fn play_remote_audio_track(
 ) -> Result<AudioStream> {
     let stop_handle = Arc::new(AtomicBool::new(false));
     let stop_handle_clone = stop_handle.clone();
-    let stream = source::LiveKitStream::new(cx.background_executor(), track)
+    let stream = source::LiveKitStream::new(cx.background_executor(), track);
+
+    let stream = stream
         .stoppable()
         .periodic_access(Duration::from_millis(50), move |s| {
             if stop_handle.load(Ordering::Relaxed) {
                 s.stop();
             }
         });
-    audio::Audio::play_source(stream, cx).context("Could not play audio")?;
+    audio::Audio::play_voip_stream(stream, track.name(), cx).context("Could not play audio")?;
 
     let on_drop = util::defer(move || {
         stop_handle_clone.store(true, Ordering::Relaxed);
@@ -144,6 +141,7 @@ impl AudioStack {
 
     pub(crate) fn capture_local_microphone_track(
         &self,
+        user_name: &str,
         cx: &AsyncApp,
     ) -> Result<(crate::LocalAudioTrack, AudioStream)> {
         let source = NativeAudioSource::new(
@@ -155,7 +153,7 @@ impl AudioStack {
         );
 
         let track = track::LocalAudioTrack::create_audio_track(
-            "microphone",
+            user_name,
             RtcAudioSource::Native(source.clone()),
         );
 
@@ -172,13 +170,10 @@ impl AudioStack {
         let rodio_pipeline =
             AudioSettings::try_read_global(cx, |setting| setting.rodio_audio).unwrap_or_default();
         let capture_task = if rodio_pipeline {
-            let apm = cx
-                .try_read_global::<audio::Audio, _>(|audio, _| Arc::clone(&audio.echo_canceller))
-                .unwrap(); // TODO fixme
-            self.executor.spawn(async move {
-                info!("Using experimental.rodio_audio audio pipeline");
-                Self::capture_input_rodio(apm, frame_tx).await
-            })
+            // TODO global might not yet have been initialized
+            info!("Using experimental.rodio_audio audio pipeline");
+            audio::Audio::open_microphone(cx.clone(), frame_tx)?;
+            Task::ready(Ok(()))
         } else {
             self.executor.spawn(async move {
                 Self::capture_input(apm, frame_tx, SAMPLE_RATE.get(), CHANNEL_COUNT.get().into())
@@ -270,84 +265,6 @@ impl AudioStack {
         }
     }
 
-    async fn capture_input_rodio(
-        apm: Arc<Mutex<apm::AudioProcessingModule>>,
-        frame_tx: UnboundedSender<AudioFrame<'static>>,
-    ) -> Result<()> {
-        use audio::RodioExt;
-        const LIVEKIT_BUFFER_SIZE: usize =
-            (audio::SAMPLE_RATE.get() as usize / 100) * audio::CHANNEL_COUNT.get() as usize;
-
-        let (stream_error_tx, stream_error_rx) = channel();
-
-        thread::spawn(move || {
-            let stream = rodio::microphone::MicrophoneBuilder::new()
-                .default_device()?
-                .default_config()?
-                .prefer_sample_rates([
-                    SAMPLE_RATE,
-                    SAMPLE_RATE.saturating_mul(NonZero::new(2).expect("not zero")),
-                ])
-                .prefer_channel_counts([
-                    NonZero::new(1).expect("not zero"),
-                    NonZero::new(2).expect("not zero"),
-                ])
-                .prefer_buffer_sizes(512..)
-                .open_stream()?;
-            info!("Opened microphone: {:?}", stream.config());
-            let mut stream =
-                UniformSourceIterator::new(stream, audio::CHANNEL_COUNT, audio::SAMPLE_RATE)
-                    .limit(LimitSettings::live_performance())
-                    .process_buffer::<LIVEKIT_BUFFER_SIZE, _>(|buffer| {
-                        let mut int_buffer: [i16; _] = buffer.map(|s| s.to_sample());
-                        if let Err(e) = apm
-                            .lock()
-                            .process_stream(
-                                &mut int_buffer,
-                                audio::SAMPLE_RATE.get() as i32,
-                                audio::CHANNEL_COUNT.get() as i32,
-                            )
-                            .context("livekit audio processor error")
-                        {
-                            let _ = stream_error_tx.send(e);
-                        } else {
-                            for (sample, processed) in buffer.iter_mut().zip(&int_buffer) {
-                                *sample = (*processed).to_sample_();
-                            }
-                        }
-                    })
-                    .automatic_gain_control(1.0, 4.0, 0.0, 5.0);
-
-            loop {
-                let sampled: Vec<_> = stream
-                    .by_ref()
-                    .take(LIVEKIT_BUFFER_SIZE)
-                    .map(|s| s.to_sample())
-                    .collect();
-
-                match stream_error_rx.try_recv() {
-                    Ok(apm_error) => return Err::<(), _>(apm_error),
-                    Err(TryRecvError::Disconnected) => {
-                        debug_panic!("Stream should end on its own without sending an error")
-                    }
-                    Err(TryRecvError::Empty) => (),
-                }
-
-                frame_tx
-                    .unbounded_send(AudioFrame {
-                        sample_rate: SAMPLE_RATE.get(),
-                        num_channels: audio::CHANNEL_COUNT.get() as u32,
-                        samples_per_channel: sampled.len() as u32
-                            / audio::CHANNEL_COUNT.get() as u32,
-                        data: Cow::Owned(sampled),
-                    })
-                    .context("Failed to send audio frame")?
-            }
-        });
-
-        Ok(())
-    }
-
     async fn capture_input(
         apm: Arc<Mutex<apm::AudioProcessingModule>>,
         frame_tx: UnboundedSender<AudioFrame<'static>>,

crates/livekit_client/src/livekit_client/playback/source.rs 🔗

@@ -3,16 +3,18 @@ use std::num::NonZero;
 use futures::StreamExt;
 use libwebrtc::{audio_stream::native::NativeAudioStream, prelude::AudioFrame};
 use livekit::track::RemoteAudioTrack;
-use rodio::{Source, buffer::SamplesBuffer, conversions::SampleTypeConverter};
+use rodio::{Source, buffer::SamplesBuffer, conversions::SampleTypeConverter, nz};
 
-use crate::livekit_client::playback::{CHANNEL_COUNT, SAMPLE_RATE};
+use audio::{CHANNEL_COUNT, SAMPLE_RATE};
 
 fn frame_to_samplesbuffer(frame: AudioFrame) -> SamplesBuffer {
     let samples = frame.data.iter().copied();
     let samples = SampleTypeConverter::<_, _>::new(samples);
     let samples: Vec<f32> = samples.collect();
     SamplesBuffer::new(
-        NonZero::new(frame.num_channels as u16).expect("audio frame channels is nonzero"),
+        // here be dragons
+        // NonZero::new(frame.num_channels as u16).expect("audio frame channels is nonzero"),
+        nz!(2),
         NonZero::new(frame.sample_rate).expect("audio frame sample rate is nonzero"),
         samples,
     )
@@ -63,11 +65,17 @@ impl Source for LiveKitStream {
     }
 
     fn channels(&self) -> rodio::ChannelCount {
-        self.inner.channels()
+        // This must be hardcoded because the playback source assumes constant
+        // sample rate and channel count. The queue upon which this is build
+        // will however report different counts and rates. Even though we put in
+        // only items with our (constant) CHANNEL_COUNT & SAMPLE_RATE this will
+        // play silence on one channel and at 44100 which is not what our
+        // constants are.
+        CHANNEL_COUNT
     }
 
     fn sample_rate(&self) -> rodio::SampleRate {
-        self.inner.sample_rate()
+        SAMPLE_RATE // see comment on channels
     }
 
     fn total_duration(&self) -> Option<std::time::Duration> {

crates/livekit_client/src/record.rs 🔗

@@ -89,7 +89,7 @@ fn write_out(
         NonZero::new(config.sample_rate().0).expect("config sample_rate is never zero"),
         samples,
     );
-    match rodio::output_to_wav(&mut samples, path) {
+    match rodio::wav_to_file(&mut samples, path) {
         Ok(_) => Ok(()),
         Err(e) => Err(anyhow::anyhow!("Failed to write wav file: {}", e)),
     }

crates/livekit_client/src/test.rs 🔗

@@ -728,6 +728,7 @@ impl Room {
 
     pub async fn publish_local_microphone_track(
         &self,
+        _track_name: &str,
         cx: &mut AsyncApp,
     ) -> Result<(LocalTrackPublication, AudioStream)> {
         self.local_participant().publish_microphone_track(cx).await

crates/zed/src/zed.rs 🔗

@@ -13,6 +13,7 @@ use agent_ui::{AgentDiffToolbar, AgentPanelDelegate};
 use anyhow::Context as _;
 pub use app_menus::*;
 use assets::Assets;
+use audio::{AudioSettings, REPLAY_DURATION};
 use breadcrumbs::Breadcrumbs;
 use client::zed_urls;
 use collections::VecDeque;
@@ -128,7 +129,10 @@ actions!(
     dev,
     [
         /// Record 10s of audio from your current microphone
-        CaptureAudio
+        CaptureAudio,
+        /// Stores last 30s of audio from everyone on the current call
+        /// in a tar file in the current working directory.
+        CaptureRecentAudio,
     ]
 );
 
@@ -925,6 +929,9 @@ fn register_actions(
         })
         .register_action(|workspace, _: &CaptureAudio, window, cx| {
             capture_audio(workspace, window, cx);
+        })
+        .register_action(|workspace, _: &CaptureRecentAudio, window, cx| {
+            capture_recent_audio(workspace, window, cx);
         });
 
     if workspace.project().read(cx).is_via_remote_server() {
@@ -1937,6 +1944,85 @@ fn capture_audio(workspace: &mut Workspace, _: &mut Window, cx: &mut Context<Wor
     });
 }
 
+// TODO dvdsk Move this and capture audio somewhere else?
+fn capture_recent_audio(workspace: &mut Workspace, _: &mut Window, cx: &mut Context<Workspace>) {
+    struct CaptureRecentAudioNotification {
+        focus_handle: gpui::FocusHandle,
+        save_result: Option<Result<(PathBuf, Duration), anyhow::Error>>,
+        _save_task: Task<anyhow::Result<()>>,
+    }
+
+    impl gpui::EventEmitter<DismissEvent> for CaptureRecentAudioNotification {}
+    impl gpui::EventEmitter<SuppressEvent> for CaptureRecentAudioNotification {}
+    impl gpui::Focusable for CaptureRecentAudioNotification {
+        fn focus_handle(&self, _cx: &App) -> gpui::FocusHandle {
+            self.focus_handle.clone()
+        }
+    }
+    impl workspace::notifications::Notification for CaptureRecentAudioNotification {}
+
+    impl Render for CaptureRecentAudioNotification {
+        fn render(&mut self, _window: &mut Window, cx: &mut Context<Self>) -> impl IntoElement {
+            let message = match &self.save_result {
+                None => format!(
+                    "Saving up to {} seconds of recent audio",
+                    REPLAY_DURATION.as_secs(),
+                ),
+                Some(Ok((path, duration))) => format!(
+                    "Saved {} seconds of all audio to {}",
+                    duration.as_secs(),
+                    path.display(),
+                ),
+                Some(Err(e)) => format!("Error saving audio replays: {e:?}"),
+            };
+
+            NotificationFrame::new()
+                .with_title(Some("Saved Audio"))
+                .show_suppress_button(false)
+                .on_close(cx.listener(|_, _, _, cx| {
+                    cx.emit(DismissEvent);
+                }))
+                .with_content(message)
+        }
+    }
+
+    impl CaptureRecentAudioNotification {
+        fn new(cx: &mut Context<Self>) -> Self {
+            if AudioSettings::get_global(cx).rodio_audio {
+                let executor = cx.background_executor().clone();
+                let save_task = cx.default_global::<audio::Audio>().save_replays(executor);
+                let _save_task = cx.spawn(async move |this, cx| {
+                    let res = save_task.await;
+                    this.update(cx, |this, cx| {
+                        this.save_result = Some(res);
+                        cx.notify();
+                    })
+                });
+
+                Self {
+                    focus_handle: cx.focus_handle(),
+                    _save_task,
+                    save_result: None,
+                }
+            } else {
+                Self {
+                    focus_handle: cx.focus_handle(),
+                    _save_task: Task::ready(Ok(())),
+                    save_result: Some(Err(anyhow::anyhow!(
+                        "Capturing recent audio is only supported on the experimental rodio audio pipeline"
+                    ))),
+                }
+            }
+        }
+    }
+
+    workspace.show_notification(
+        NotificationId::unique::<CaptureRecentAudioNotification>(),
+        cx,
+        |cx| cx.new(CaptureRecentAudioNotification::new),
+    );
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;