Rodio audio (#37786)

David Kleingeld created

Adds input to the experimental rodio_audio pipeline.

Enable with:
```json
"audio": {
  "experimental.rodio_audio": true
}
```

Additionally enables automatic volume 
control for incoming audio:
```json
"audio": {
  "experimental.control_output_volume": true
}
```

Release Notes:

- N/A

Change summary

.config/hakari.toml                                         |   2 
Cargo.lock                                                  | 123 +
Cargo.toml                                                  |   3 
crates/audio/Cargo.toml                                     |  12 
crates/audio/src/audio.rs                                   | 193 ++
crates/audio/src/audio_settings.rs                          |  70 
crates/audio/src/replays.rs                                 |  77 
crates/audio/src/rodio_ext.rs                               | 593 +++++++
crates/call/Cargo.toml                                      |   1 
crates/call/src/call_impl/room.rs                           |  13 
crates/gpui/src/app/async_context.rs                        |  17 
crates/livekit_client/Cargo.toml                            |   9 
crates/livekit_client/examples/test_app.rs                  |   5 
crates/livekit_client/src/livekit_client.rs                 |   8 
crates/livekit_client/src/livekit_client/playback.rs        |  98 
crates/livekit_client/src/livekit_client/playback/source.rs |  31 
crates/livekit_client/src/record.rs                         |   9 
crates/livekit_client/src/test.rs                           |   2 
crates/zed/Cargo.toml                                       |   1 
crates/zed/src/zed.rs                                       | 132 
tooling/workspace-hack/Cargo.toml                           |  14 
21 files changed, 1,274 insertions(+), 139 deletions(-)

Detailed changes

.config/hakari.toml 🔗

@@ -26,7 +26,7 @@ third-party = [
     # build of remote_server should not include scap / its x11 dependency
     { name = "scap", git = "https://github.com/zed-industries/scap", rev = "808aa5c45b41e8f44729d02e38fd00a2fe2722e7" },
     # build of remote_server should not need to include on libalsa through rodio
-    { name = "rodio" },
+    { name = "rodio", git = "https://github.com/RustAudio/rodio", branch = "better_wav_output"},
 ]
 
 [final-excludes]

Cargo.lock 🔗

@@ -1383,12 +1383,18 @@ name = "audio"
 version = "0.1.0"
 dependencies = [
  "anyhow",
+ "async-tar",
  "collections",
+ "crossbeam",
  "gpui",
+ "libwebrtc",
+ "log",
+ "parking_lot",
  "rodio",
  "schemars",
  "serde",
  "settings",
+ "smol",
  "util",
  "workspace-hack",
 ]
@@ -2609,6 +2615,7 @@ dependencies = [
  "audio",
  "client",
  "collections",
+ "feature_flags",
  "fs",
  "futures 0.3.31",
  "gpui",
@@ -4138,6 +4145,19 @@ dependencies = [
  "itertools 0.10.5",
 ]
 
+[[package]]
+name = "crossbeam"
+version = "0.8.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "1137cd7e7fc0fb5d3c5a8678be38ec56e819125d8d7907411fe24ccb943faca8"
+dependencies = [
+ "crossbeam-channel",
+ "crossbeam-deque",
+ "crossbeam-epoch",
+ "crossbeam-queue",
+ "crossbeam-utils",
+]
+
 [[package]]
 name = "crossbeam-channel"
 version = "0.5.15"
@@ -9659,6 +9679,7 @@ dependencies = [
  "scap",
  "serde",
  "serde_json",
+ "serde_urlencoded",
  "settings",
  "sha2",
  "simplelog",
@@ -13862,15 +13883,15 @@ dependencies = [
 [[package]]
 name = "rodio"
 version = "0.21.1"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "e40ecf59e742e03336be6a3d53755e789fd05a059fa22dfa0ed624722319e183"
+source = "git+https://github.com/RustAudio/rodio?branch=better_wav_output#82514bd1f2c6cfd9a1a885019b26a8ffea75bc5c"
 dependencies = [
  "cpal",
  "dasp_sample",
  "hound",
  "num-rational",
+ "rtrb",
  "symphonia",
- "tracing",
+ "thiserror 2.0.12",
 ]
 
 [[package]]
@@ -13944,6 +13965,12 @@ dependencies = [
  "zeroize",
 ]
 
+[[package]]
+name = "rtrb"
+version = "0.3.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "ad8388ea1a9e0ea807e442e8263a699e7edcb320ecbcd21b4fa8ff859acce3ba"
+
 [[package]]
 name = "rules_library"
 version = "0.1.0"
@@ -15908,12 +15935,53 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "815c942ae7ee74737bb00f965fa5b5a2ac2ce7b6c01c0cc169bbeaf7abd5f5a9"
 dependencies = [
  "lazy_static",
+ "symphonia-bundle-flac",
+ "symphonia-bundle-mp3",
+ "symphonia-codec-aac",
  "symphonia-codec-pcm",
+ "symphonia-codec-vorbis",
  "symphonia-core",
+ "symphonia-format-isomp4",
+ "symphonia-format-ogg",
  "symphonia-format-riff",
  "symphonia-metadata",
 ]
 
+[[package]]
+name = "symphonia-bundle-flac"
+version = "0.5.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "72e34f34298a7308d4397a6c7fbf5b84c5d491231ce3dd379707ba673ab3bd97"
+dependencies = [
+ "log",
+ "symphonia-core",
+ "symphonia-metadata",
+ "symphonia-utils-xiph",
+]
+
+[[package]]
+name = "symphonia-bundle-mp3"
+version = "0.5.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "c01c2aae70f0f1fb096b6f0ff112a930b1fb3626178fba3ae68b09dce71706d4"
+dependencies = [
+ "lazy_static",
+ "log",
+ "symphonia-core",
+ "symphonia-metadata",
+]
+
+[[package]]
+name = "symphonia-codec-aac"
+version = "0.5.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "cdbf25b545ad0d3ee3e891ea643ad115aff4ca92f6aec472086b957a58522f70"
+dependencies = [
+ "lazy_static",
+ "log",
+ "symphonia-core",
+]
+
 [[package]]
 name = "symphonia-codec-pcm"
 version = "0.5.4"
@@ -15924,6 +15992,17 @@ dependencies = [
  "symphonia-core",
 ]
 
+[[package]]
+name = "symphonia-codec-vorbis"
+version = "0.5.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "5a98765fb46a0a6732b007f7e2870c2129b6f78d87db7987e6533c8f164a9f30"
+dependencies = [
+ "log",
+ "symphonia-core",
+ "symphonia-utils-xiph",
+]
+
 [[package]]
 name = "symphonia-core"
 version = "0.5.4"
@@ -15937,6 +16016,31 @@ dependencies = [
  "log",
 ]
 
+[[package]]
+name = "symphonia-format-isomp4"
+version = "0.5.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "abfdf178d697e50ce1e5d9b982ba1b94c47218e03ec35022d9f0e071a16dc844"
+dependencies = [
+ "encoding_rs",
+ "log",
+ "symphonia-core",
+ "symphonia-metadata",
+ "symphonia-utils-xiph",
+]
+
+[[package]]
+name = "symphonia-format-ogg"
+version = "0.5.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "ada3505789516bcf00fc1157c67729eded428b455c27ca370e41f4d785bfa931"
+dependencies = [
+ "log",
+ "symphonia-core",
+ "symphonia-metadata",
+ "symphonia-utils-xiph",
+]
+
 [[package]]
 name = "symphonia-format-riff"
 version = "0.5.4"
@@ -15961,6 +16065,16 @@ dependencies = [
  "symphonia-core",
 ]
 
+[[package]]
+name = "symphonia-utils-xiph"
+version = "0.5.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "484472580fa49991afda5f6550ece662237b00c6f562c7d9638d1b086ed010fe"
+dependencies = [
+ "symphonia-core",
+ "symphonia-metadata",
+]
+
 [[package]]
 name = "syn"
 version = "1.0.109"
@@ -19850,6 +19964,7 @@ dependencies = [
  "core-foundation-sys",
  "cranelift-codegen",
  "crc32fast",
+ "crossbeam-channel",
  "crossbeam-epoch",
  "crossbeam-utils",
  "crypto-common",
@@ -19893,6 +20008,7 @@ dependencies = [
  "libsqlite3-sys",
  "linux-raw-sys 0.4.15",
  "linux-raw-sys 0.9.4",
+ "livekit-runtime",
  "log",
  "lyon",
  "lyon_path",
@@ -20447,7 +20563,6 @@ dependencies = [
  "languages",
  "libc",
  "line_ending_selector",
- "livekit_client",
  "log",
  "markdown",
  "markdown_preview",

Cargo.toml 🔗

@@ -277,6 +277,7 @@ context_server = { path = "crates/context_server" }
 copilot = { path = "crates/copilot" }
 crashes = { path = "crates/crashes" }
 credentials_provider = { path = "crates/credentials_provider" }
+crossbeam = "0.8.4"
 dap = { path = "crates/dap" }
 dap_adapters = { path = "crates/dap_adapters" }
 db = { path = "crates/db" }
@@ -369,7 +370,7 @@ remote_server = { path = "crates/remote_server" }
 repl = { path = "crates/repl" }
 reqwest_client = { path = "crates/reqwest_client" }
 rich_text = { path = "crates/rich_text" }
-rodio = { version = "0.21.1", default-features = false }
+rodio = { git = "https://github.com/RustAudio/rodio", branch = "better_wav_output"}
 rope = { path = "crates/rope" }
 rpc = { path = "crates/rpc" }
 rules_library = { path = "crates/rules_library" }

crates/audio/Cargo.toml 🔗

@@ -14,11 +14,19 @@ doctest = false
 
 [dependencies]
 anyhow.workspace = true
+async-tar.workspace = true
 collections.workspace = true
+crossbeam.workspace = true
 gpui.workspace = true
-settings.workspace = true
+log.workspace = true
+parking_lot.workspace = true
+rodio = { workspace = true, features = [ "wav", "playback", "wav_output" ] }
 schemars.workspace = true
 serde.workspace = true
-rodio = { workspace = true, features = [ "wav", "playback", "tracing" ] }
+settings.workspace = true
+smol.workspace = true
 util.workspace = true
 workspace-hack.workspace = true
+
+[target.'cfg(not(any(all(target_os = "windows", target_env = "gnu"), target_os = "freebsd")))'.dependencies]
+libwebrtc = { rev = "5f04705ac3f356350ae31534ffbc476abc9ea83d", git = "https://github.com/zed-industries/livekit-rust-sdks" }

crates/audio/src/audio.rs 🔗

@@ -1,19 +1,54 @@
-use anyhow::{Context as _, Result, anyhow};
+use anyhow::{Context as _, Result};
 use collections::HashMap;
-use gpui::{App, BorrowAppContext, Global};
-use rodio::{Decoder, OutputStream, OutputStreamBuilder, Source, source::Buffered};
+use gpui::{App, AsyncApp, BackgroundExecutor, BorrowAppContext, Global};
+use libwebrtc::native::apm;
+use log::info;
+use parking_lot::Mutex;
+use rodio::{
+    Decoder, OutputStream, OutputStreamBuilder, Source,
+    cpal::Sample,
+    mixer::Mixer,
+    nz,
+    source::{Buffered, LimitSettings, UniformSourceIterator},
+};
 use settings::Settings;
-use std::io::Cursor;
+use std::{
+    io::Cursor,
+    num::NonZero,
+    path::PathBuf,
+    sync::{Arc, atomic::Ordering},
+    time::Duration,
+};
 use util::ResultExt;
 
 mod audio_settings;
+mod replays;
+mod rodio_ext;
 pub use audio_settings::AudioSettings;
+pub use rodio_ext::RodioExt;
+
+use crate::audio_settings::LIVE_SETTINGS;
+
+// NOTE: We used to use WebRTC's mixer which only supported
+// 16kHz, 32kHz and 48kHz. As 48 is the most common "next step up"
+// for audio output devices like speakers/bluetooth, we just hard-code
+// this; and downsample when we need to.
+//
+// Since most noise cancelling requires 16kHz we will move to
+// that in the future.
+pub const SAMPLE_RATE: NonZero<u32> = nz!(48000);
+pub const CHANNEL_COUNT: NonZero<u16> = nz!(2);
+pub const BUFFER_SIZE: usize = // echo canceller and livekit want 10ms of audio
+    (SAMPLE_RATE.get() as usize / 100) * CHANNEL_COUNT.get() as usize;
+
+pub const REPLAY_DURATION: Duration = Duration::from_secs(30);
 
 pub fn init(cx: &mut App) {
     AudioSettings::register(cx);
+    LIVE_SETTINGS.initialize(cx);
 }
 
-#[derive(Copy, Clone, Eq, Hash, PartialEq)]
+#[derive(Debug, Copy, Clone, Eq, Hash, PartialEq)]
 pub enum Sound {
     Joined,
     Leave,
@@ -38,32 +73,138 @@ impl Sound {
     }
 }
 
-#[derive(Default)]
 pub struct Audio {
     output_handle: Option<OutputStream>,
+    output_mixer: Option<Mixer>,
+    pub echo_canceller: Arc<Mutex<apm::AudioProcessingModule>>,
     source_cache: HashMap<Sound, Buffered<Decoder<Cursor<Vec<u8>>>>>,
+    replays: replays::Replays,
+}
+
+impl Default for Audio {
+    fn default() -> Self {
+        Self {
+            output_handle: Default::default(),
+            output_mixer: Default::default(),
+            echo_canceller: Arc::new(Mutex::new(apm::AudioProcessingModule::new(
+                true, false, false, false,
+            ))),
+            source_cache: Default::default(),
+            replays: Default::default(),
+        }
+    }
 }
 
 impl Global for Audio {}
 
 impl Audio {
-    fn ensure_output_exists(&mut self) -> Option<&OutputStream> {
+    fn ensure_output_exists(&mut self) -> Result<&Mixer> {
         if self.output_handle.is_none() {
-            self.output_handle = OutputStreamBuilder::open_default_stream().log_err();
+            self.output_handle = Some(
+                OutputStreamBuilder::open_default_stream()
+                    .context("Could not open default output stream")?,
+            );
+            if let Some(output_handle) = &self.output_handle {
+                let (mixer, source) = rodio::mixer::mixer(CHANNEL_COUNT, SAMPLE_RATE);
+                // or the mixer will end immediately as its empty.
+                mixer.add(rodio::source::Zero::new(CHANNEL_COUNT, SAMPLE_RATE));
+                self.output_mixer = Some(mixer);
+
+                let echo_canceller = Arc::clone(&self.echo_canceller);
+                let source = source.inspect_buffer::<BUFFER_SIZE, _>(move |buffer| {
+                    let mut buf: [i16; _] = buffer.map(|s| s.to_sample());
+                    echo_canceller
+                        .lock()
+                        .process_reverse_stream(
+                            &mut buf,
+                            SAMPLE_RATE.get() as i32,
+                            CHANNEL_COUNT.get().into(),
+                        )
+                        .expect("Audio input and output threads should not panic");
+                });
+                output_handle.mixer().add(source);
+            }
         }
 
-        self.output_handle.as_ref()
+        Ok(self
+            .output_mixer
+            .as_ref()
+            .expect("we only get here if opening the outputstream succeeded"))
+    }
+
+    pub fn save_replays(
+        &self,
+        executor: BackgroundExecutor,
+    ) -> gpui::Task<anyhow::Result<(PathBuf, Duration)>> {
+        self.replays.replays_to_tar(executor)
+    }
+
+    pub fn open_microphone(voip_parts: VoipParts) -> anyhow::Result<impl Source> {
+        let stream = rodio::microphone::MicrophoneBuilder::new()
+            .default_device()?
+            .default_config()?
+            .prefer_sample_rates([SAMPLE_RATE, SAMPLE_RATE.saturating_mul(nz!(2))])
+            .prefer_channel_counts([nz!(1), nz!(2)])
+            .prefer_buffer_sizes(512..)
+            .open_stream()?;
+        info!("Opened microphone: {:?}", stream.config());
+
+        let (replay, stream) = UniformSourceIterator::new(stream, CHANNEL_COUNT, SAMPLE_RATE)
+            .limit(LimitSettings::live_performance())
+            .process_buffer::<BUFFER_SIZE, _>(move |buffer| {
+                let mut int_buffer: [i16; _] = buffer.map(|s| s.to_sample());
+                if voip_parts
+                    .echo_canceller
+                    .lock()
+                    .process_stream(
+                        &mut int_buffer,
+                        SAMPLE_RATE.get() as i32,
+                        CHANNEL_COUNT.get() as i32,
+                    )
+                    .context("livekit audio processor error")
+                    .log_err()
+                    .is_some()
+                {
+                    for (sample, processed) in buffer.iter_mut().zip(&int_buffer) {
+                        *sample = (*processed).to_sample();
+                    }
+                }
+            })
+            .automatic_gain_control(1.0, 4.0, 0.0, 5.0)
+            .periodic_access(Duration::from_millis(100), move |agc_source| {
+                agc_source.set_enabled(LIVE_SETTINGS.control_input_volume.load(Ordering::Relaxed));
+            })
+            .replayable(REPLAY_DURATION)
+            .expect("REPLAY_DURATION is longer then 100ms");
+
+        voip_parts
+            .replays
+            .add_voip_stream("local microphone".to_string(), replay);
+        Ok(stream)
     }
 
-    pub fn play_source(
+    pub fn play_voip_stream(
         source: impl rodio::Source + Send + 'static,
+        speaker_name: String,
+        is_staff: bool,
         cx: &mut App,
     ) -> anyhow::Result<()> {
+        let (replay_source, source) = source
+            .automatic_gain_control(1.0, 4.0, 0.0, 5.0)
+            .periodic_access(Duration::from_millis(100), move |agc_source| {
+                agc_source.set_enabled(LIVE_SETTINGS.control_input_volume.load(Ordering::Relaxed));
+            })
+            .replayable(REPLAY_DURATION)
+            .expect("REPLAY_DURATION is longer then 100ms");
+
         cx.update_default_global(|this: &mut Self, _cx| {
-            let output_handle = this
+            let output_mixer = this
                 .ensure_output_exists()
-                .ok_or_else(|| anyhow!("Could not open audio output"))?;
-            output_handle.mixer().add(source);
+                .context("Could not get output mixer")?;
+            output_mixer.add(source);
+            if is_staff {
+                this.replays.add_voip_stream(speaker_name, replay_source);
+            }
             Ok(())
         })
     }
@@ -71,8 +212,12 @@ impl Audio {
     pub fn play_sound(sound: Sound, cx: &mut App) {
         cx.update_default_global(|this: &mut Self, cx| {
             let source = this.sound_source(sound, cx).log_err()?;
-            let output_handle = this.ensure_output_exists()?;
-            output_handle.mixer().add(source);
+            let output_mixer = this
+                .ensure_output_exists()
+                .context("Could not get output mixer")
+                .log_err()?;
+
+            output_mixer.add(source);
             Some(())
         });
     }
@@ -103,3 +248,21 @@ impl Audio {
         Ok(source)
     }
 }
+
+pub struct VoipParts {
+    echo_canceller: Arc<Mutex<apm::AudioProcessingModule>>,
+    replays: replays::Replays,
+}
+
+impl VoipParts {
+    pub fn new(cx: &AsyncApp) -> anyhow::Result<Self> {
+        let (apm, replays) = cx.try_read_default_global::<Audio, _>(|audio, _| {
+            (Arc::clone(&audio.echo_canceller), audio.replays.clone())
+        })?;
+
+        Ok(Self {
+            echo_canceller: apm,
+            replays,
+        })
+    }
+}

crates/audio/src/audio_settings.rs 🔗

@@ -1,14 +1,29 @@
+use std::sync::atomic::{AtomicBool, Ordering};
+
 use anyhow::Result;
 use gpui::App;
 use schemars::JsonSchema;
 use serde::{Deserialize, Serialize};
-use settings::{Settings, SettingsKey, SettingsSources, SettingsUi};
+use settings::{Settings, SettingsKey, SettingsSources, SettingsStore, SettingsUi};
 
 #[derive(Clone, Default, Serialize, Deserialize, JsonSchema, Debug, SettingsUi)]
 pub struct AudioSettings {
     /// Opt into the new audio system.
     #[serde(rename = "experimental.rodio_audio", default)]
     pub rodio_audio: bool, // default is false
+    /// Requires 'rodio_audio: true'
+    ///
+    /// Use the new audio systems automatic gain control for your microphone.
+    /// This affects how loud you sound to others.
+    #[serde(rename = "experimental.control_input_volume", default)]
+    pub control_input_volume: bool,
+    /// Requires 'rodio_audio: true'
+    ///
+    /// Use the new audio systems automatic gain control on everyone in the
+    /// call. This makes call members who are too quite louder and those who are
+    /// too loud quieter. This only affects how things sound for you.
+    #[serde(rename = "experimental.control_output_volume", default)]
+    pub control_output_volume: bool,
 }
 
 /// Configuration of audio in Zed.
@@ -16,9 +31,22 @@ pub struct AudioSettings {
 #[serde(default)]
 #[settings_key(key = "audio")]
 pub struct AudioSettingsContent {
-    /// Whether to use the experimental audio system
+    /// Opt into the new audio system.
     #[serde(rename = "experimental.rodio_audio", default)]
-    pub rodio_audio: bool,
+    pub rodio_audio: bool, // default is false
+    /// Requires 'rodio_audio: true'
+    ///
+    /// Use the new audio systems automatic gain control for your microphone.
+    /// This affects how loud you sound to others.
+    #[serde(rename = "experimental.control_input_volume", default)]
+    pub control_input_volume: bool,
+    /// Requires 'rodio_audio: true'
+    ///
+    /// Use the new audio systems automatic gain control on everyone in the
+    /// call. This makes call members who are too quite louder and those who are
+    /// too loud quieter. This only affects how things sound for you.
+    #[serde(rename = "experimental.control_output_volume", default)]
+    pub control_output_volume: bool,
 }
 
 impl Settings for AudioSettings {
@@ -30,3 +58,39 @@ impl Settings for AudioSettings {
 
     fn import_from_vscode(_vscode: &settings::VsCodeSettings, _current: &mut Self::FileContent) {}
 }
+
+pub(crate) struct LiveSettings {
+    pub(crate) control_input_volume: AtomicBool,
+    pub(crate) control_output_volume: AtomicBool,
+}
+
+impl LiveSettings {
+    pub(crate) fn initialize(&self, cx: &mut App) {
+        cx.observe_global::<SettingsStore>(move |cx| {
+            LIVE_SETTINGS.control_input_volume.store(
+                AudioSettings::get_global(cx).control_input_volume,
+                Ordering::Relaxed,
+            );
+            LIVE_SETTINGS.control_output_volume.store(
+                AudioSettings::get_global(cx).control_output_volume,
+                Ordering::Relaxed,
+            );
+        })
+        .detach();
+
+        let init_settings = AudioSettings::get_global(cx);
+        LIVE_SETTINGS
+            .control_input_volume
+            .store(init_settings.control_input_volume, Ordering::Relaxed);
+        LIVE_SETTINGS
+            .control_output_volume
+            .store(init_settings.control_output_volume, Ordering::Relaxed);
+    }
+}
+
+/// Allows access to settings from the audio thread. Updated by
+/// observer of SettingsStore.
+pub(crate) static LIVE_SETTINGS: LiveSettings = LiveSettings {
+    control_input_volume: AtomicBool::new(true),
+    control_output_volume: AtomicBool::new(true),
+};

crates/audio/src/replays.rs 🔗

@@ -0,0 +1,77 @@
+use anyhow::{Context, anyhow};
+use async_tar::{Builder, Header};
+use gpui::{BackgroundExecutor, Task};
+
+use collections::HashMap;
+use parking_lot::Mutex;
+use rodio::Source;
+use smol::fs::File;
+use std::{io, path::PathBuf, sync::Arc, time::Duration};
+
+use crate::{REPLAY_DURATION, rodio_ext::Replay};
+
+#[derive(Default, Clone)]
+pub(crate) struct Replays(Arc<Mutex<HashMap<String, Replay>>>);
+
+impl Replays {
+    pub(crate) fn add_voip_stream(&self, stream_name: String, source: Replay) {
+        let mut map = self.0.lock();
+        map.retain(|_, replay| replay.source_is_active());
+        map.insert(stream_name, source);
+    }
+
+    pub(crate) fn replays_to_tar(
+        &self,
+        executor: BackgroundExecutor,
+    ) -> Task<anyhow::Result<(PathBuf, Duration)>> {
+        let map = Arc::clone(&self.0);
+        executor.spawn(async move {
+            let recordings: Vec<_> = map
+                .lock()
+                .iter_mut()
+                .map(|(name, replay)| {
+                    let queued = REPLAY_DURATION.min(replay.duration_ready());
+                    (name.clone(), replay.take_duration(queued).record())
+                })
+                .collect();
+            let longest = recordings
+                .iter()
+                .map(|(_, r)| {
+                    r.total_duration()
+                        .expect("SamplesBuffer always returns a total duration")
+                })
+                .max()
+                .ok_or(anyhow!("There is no audio to capture"))?;
+
+            let path = std::env::current_dir()
+                .context("Could not get current dir")?
+                .join("replays.tar");
+            let tar = File::create(&path)
+                .await
+                .context("Could not create file for tar")?;
+
+            let mut tar = Builder::new(tar);
+
+            for (name, recording) in recordings {
+                let mut writer = io::Cursor::new(Vec::new());
+                rodio::wav_to_writer(recording, &mut writer).context("failed to encode wav")?;
+                let wav_data = writer.into_inner();
+                let path = name.replace(' ', "_") + ".wav";
+                let mut header = Header::new_gnu();
+                // rw permissions for everyone
+                header.set_mode(0o666);
+                header.set_size(wav_data.len() as u64);
+                tar.append_data(&mut header, path, wav_data.as_slice())
+                    .await
+                    .context("failed to apped wav to tar")?;
+            }
+            tar.into_inner()
+                .await
+                .context("Could not finish writing tar")?
+                .sync_all()
+                .await
+                .context("Could not flush tar file to disk")?;
+            Ok((path, longest))
+        })
+    }
+}

crates/audio/src/rodio_ext.rs 🔗

@@ -0,0 +1,593 @@
+use std::{
+    sync::{
+        Arc, Mutex,
+        atomic::{AtomicBool, Ordering},
+    },
+    time::Duration,
+};
+
+use crossbeam::queue::ArrayQueue;
+use rodio::{ChannelCount, Sample, SampleRate, Source};
+
+#[derive(Debug)]
+pub struct ReplayDurationTooShort;
+
+pub trait RodioExt: Source + Sized {
+    fn process_buffer<const N: usize, F>(self, callback: F) -> ProcessBuffer<N, Self, F>
+    where
+        F: FnMut(&mut [Sample; N]);
+    fn inspect_buffer<const N: usize, F>(self, callback: F) -> InspectBuffer<N, Self, F>
+    where
+        F: FnMut(&[Sample; N]);
+    fn replayable(
+        self,
+        duration: Duration,
+    ) -> Result<(Replay, Replayable<Self>), ReplayDurationTooShort>;
+    fn take_samples(self, n: usize) -> TakeSamples<Self>;
+}
+
+impl<S: Source> RodioExt for S {
+    fn process_buffer<const N: usize, F>(self, callback: F) -> ProcessBuffer<N, Self, F>
+    where
+        F: FnMut(&mut [Sample; N]),
+    {
+        ProcessBuffer {
+            inner: self,
+            callback,
+            buffer: [0.0; N],
+            next: N,
+        }
+    }
+    fn inspect_buffer<const N: usize, F>(self, callback: F) -> InspectBuffer<N, Self, F>
+    where
+        F: FnMut(&[Sample; N]),
+    {
+        InspectBuffer {
+            inner: self,
+            callback,
+            buffer: [0.0; N],
+            free: 0,
+        }
+    }
+    /// Maintains a live replay with a history of at least `duration` seconds.
+    ///
+    /// Note:
+    /// History can be 100ms longer if the source drops before or while the
+    /// replay is being read
+    ///
+    /// # Errors
+    /// If duration is smaller then 100ms
+    fn replayable(
+        self,
+        duration: Duration,
+    ) -> Result<(Replay, Replayable<Self>), ReplayDurationTooShort> {
+        if duration < Duration::from_millis(100) {
+            return Err(ReplayDurationTooShort);
+        }
+
+        let samples_per_second = self.sample_rate().get() as usize * self.channels().get() as usize;
+        let samples_to_queue = duration.as_secs_f64() * samples_per_second as f64;
+        let samples_to_queue =
+            (samples_to_queue as usize).next_multiple_of(self.channels().get().into());
+
+        let chunk_size =
+            (samples_per_second.div_ceil(10)).next_multiple_of(self.channels().get() as usize);
+        let chunks_to_queue = samples_to_queue.div_ceil(chunk_size);
+
+        let is_active = Arc::new(AtomicBool::new(true));
+        let queue = Arc::new(ReplayQueue::new(chunks_to_queue, chunk_size));
+        Ok((
+            Replay {
+                rx: Arc::clone(&queue),
+                buffer: Vec::new().into_iter(),
+                sleep_duration: duration / 2,
+                sample_rate: self.sample_rate(),
+                channel_count: self.channels(),
+                source_is_active: is_active.clone(),
+            },
+            Replayable {
+                tx: queue,
+                inner: self,
+                buffer: Vec::with_capacity(chunk_size),
+                chunk_size,
+                is_active,
+            },
+        ))
+    }
+    fn take_samples(self, n: usize) -> TakeSamples<S> {
+        TakeSamples {
+            inner: self,
+            left_to_take: n,
+        }
+    }
+}
+
+pub struct TakeSamples<S> {
+    inner: S,
+    left_to_take: usize,
+}
+
+impl<S: Source> Iterator for TakeSamples<S> {
+    type Item = Sample;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        if self.left_to_take == 0 {
+            None
+        } else {
+            self.left_to_take -= 1;
+            self.inner.next()
+        }
+    }
+
+    fn size_hint(&self) -> (usize, Option<usize>) {
+        (0, Some(self.left_to_take))
+    }
+}
+
+impl<S: Source> Source for TakeSamples<S> {
+    fn current_span_len(&self) -> Option<usize> {
+        None // does not support spans
+    }
+
+    fn channels(&self) -> ChannelCount {
+        self.inner.channels()
+    }
+
+    fn sample_rate(&self) -> SampleRate {
+        self.inner.sample_rate()
+    }
+
+    fn total_duration(&self) -> Option<Duration> {
+        Some(Duration::from_secs_f64(
+            self.left_to_take as f64
+                / self.sample_rate().get() as f64
+                / self.channels().get() as f64,
+        ))
+    }
+}
+
+#[derive(Debug)]
+struct ReplayQueue {
+    inner: ArrayQueue<Vec<Sample>>,
+    normal_chunk_len: usize,
+    /// The last chunk in the queue may be smaller then
+    /// the normal chunk size. This is always equal to the
+    /// size of the last element in the queue.
+    /// (so normally chunk_size)
+    last_chunk: Mutex<Vec<Sample>>,
+}
+
+impl ReplayQueue {
+    fn new(queue_len: usize, chunk_size: usize) -> Self {
+        Self {
+            inner: ArrayQueue::new(queue_len),
+            normal_chunk_len: chunk_size,
+            last_chunk: Mutex::new(Vec::new()),
+        }
+    }
+    /// Returns the length in samples
+    fn len(&self) -> usize {
+        self.inner.len().saturating_sub(1) * self.normal_chunk_len
+            + self
+                .last_chunk
+                .lock()
+                .expect("Self::push_last can not poison this lock")
+                .len()
+    }
+
+    fn pop(&self) -> Option<Vec<Sample>> {
+        self.inner.pop() // removes element that was inserted first
+    }
+
+    fn push_last(&self, mut samples: Vec<Sample>) {
+        let mut last_chunk = self
+            .last_chunk
+            .lock()
+            .expect("Self::len can not poison this lock");
+        std::mem::swap(&mut *last_chunk, &mut samples);
+    }
+
+    fn push_normal(&self, samples: Vec<Sample>) {
+        let _pushed_out_of_ringbuf = self.inner.force_push(samples);
+    }
+}
+
+pub struct ProcessBuffer<const N: usize, S, F>
+where
+    S: Source + Sized,
+    F: FnMut(&mut [Sample; N]),
+{
+    inner: S,
+    callback: F,
+    /// Buffer used for both input and output.
+    buffer: [Sample; N],
+    /// Next already processed sample is at this index
+    /// in buffer.
+    ///
+    /// If this is equal to the length of the buffer we have no more samples and
+    /// we must get new ones and process them
+    next: usize,
+}
+
+impl<const N: usize, S, F> Iterator for ProcessBuffer<N, S, F>
+where
+    S: Source + Sized,
+    F: FnMut(&mut [Sample; N]),
+{
+    type Item = Sample;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        self.next += 1;
+        if self.next < self.buffer.len() {
+            let sample = self.buffer[self.next];
+            return Some(sample);
+        }
+
+        for sample in &mut self.buffer {
+            *sample = self.inner.next()?
+        }
+        (self.callback)(&mut self.buffer);
+
+        self.next = 0;
+        Some(self.buffer[0])
+    }
+
+    fn size_hint(&self) -> (usize, Option<usize>) {
+        self.inner.size_hint()
+    }
+}
+
+impl<const N: usize, S, F> Source for ProcessBuffer<N, S, F>
+where
+    S: Source + Sized,
+    F: FnMut(&mut [Sample; N]),
+{
+    fn current_span_len(&self) -> Option<usize> {
+        None
+    }
+
+    fn channels(&self) -> rodio::ChannelCount {
+        self.inner.channels()
+    }
+
+    fn sample_rate(&self) -> rodio::SampleRate {
+        self.inner.sample_rate()
+    }
+
+    fn total_duration(&self) -> Option<std::time::Duration> {
+        self.inner.total_duration()
+    }
+}
+
+pub struct InspectBuffer<const N: usize, S, F>
+where
+    S: Source + Sized,
+    F: FnMut(&[Sample; N]),
+{
+    inner: S,
+    callback: F,
+    /// Stores already emitted samples, once its full we call the callback.
+    buffer: [Sample; N],
+    /// Next free element in buffer. If this is equal to the buffer length
+    /// we have no more free lements.
+    free: usize,
+}
+
+impl<const N: usize, S, F> Iterator for InspectBuffer<N, S, F>
+where
+    S: Source + Sized,
+    F: FnMut(&[Sample; N]),
+{
+    type Item = Sample;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        let Some(sample) = self.inner.next() else {
+            return None;
+        };
+
+        self.buffer[self.free] = sample;
+        self.free += 1;
+
+        if self.free == self.buffer.len() {
+            (self.callback)(&self.buffer);
+            self.free = 0
+        }
+
+        Some(sample)
+    }
+
+    fn size_hint(&self) -> (usize, Option<usize>) {
+        self.inner.size_hint()
+    }
+}
+
+impl<const N: usize, S, F> Source for InspectBuffer<N, S, F>
+where
+    S: Source + Sized,
+    F: FnMut(&[Sample; N]),
+{
+    fn current_span_len(&self) -> Option<usize> {
+        None
+    }
+
+    fn channels(&self) -> rodio::ChannelCount {
+        self.inner.channels()
+    }
+
+    fn sample_rate(&self) -> rodio::SampleRate {
+        self.inner.sample_rate()
+    }
+
+    fn total_duration(&self) -> Option<std::time::Duration> {
+        self.inner.total_duration()
+    }
+}
+
+#[derive(Debug)]
+pub struct Replayable<S: Source> {
+    inner: S,
+    buffer: Vec<Sample>,
+    chunk_size: usize,
+    tx: Arc<ReplayQueue>,
+    is_active: Arc<AtomicBool>,
+}
+
+impl<S: Source> Iterator for Replayable<S> {
+    type Item = Sample;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        if let Some(sample) = self.inner.next() {
+            self.buffer.push(sample);
+            if self.buffer.len() == self.chunk_size {
+                self.tx.push_normal(std::mem::take(&mut self.buffer));
+            }
+            Some(sample)
+        } else {
+            let last_chunk = std::mem::take(&mut self.buffer);
+            self.tx.push_last(last_chunk);
+            self.is_active.store(false, Ordering::Relaxed);
+            None
+        }
+    }
+
+    fn size_hint(&self) -> (usize, Option<usize>) {
+        self.inner.size_hint()
+    }
+}
+
+impl<S: Source> Source for Replayable<S> {
+    fn current_span_len(&self) -> Option<usize> {
+        self.inner.current_span_len()
+    }
+
+    fn channels(&self) -> ChannelCount {
+        self.inner.channels()
+    }
+
+    fn sample_rate(&self) -> SampleRate {
+        self.inner.sample_rate()
+    }
+
+    fn total_duration(&self) -> Option<Duration> {
+        self.inner.total_duration()
+    }
+}
+
+#[derive(Debug)]
+pub struct Replay {
+    rx: Arc<ReplayQueue>,
+    buffer: std::vec::IntoIter<Sample>,
+    sleep_duration: Duration,
+    sample_rate: SampleRate,
+    channel_count: ChannelCount,
+    source_is_active: Arc<AtomicBool>,
+}
+
+impl Replay {
+    pub fn source_is_active(&self) -> bool {
+        // - source could return None and not drop
+        // - source could be dropped before returning None
+        self.source_is_active.load(Ordering::Relaxed) && Arc::strong_count(&self.rx) < 2
+    }
+
+    /// Duration of what is in the buffer and can be returned without blocking.
+    pub fn duration_ready(&self) -> Duration {
+        let samples_per_second = self.channels().get() as u32 * self.sample_rate().get();
+
+        let seconds_queued = self.samples_ready() as f64 / samples_per_second as f64;
+        Duration::from_secs_f64(seconds_queued)
+    }
+
+    /// Number of samples in the buffer and can be returned without blocking.
+    pub fn samples_ready(&self) -> usize {
+        self.rx.len() + self.buffer.len()
+    }
+}
+
+impl Iterator for Replay {
+    type Item = Sample;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        if let Some(sample) = self.buffer.next() {
+            return Some(sample);
+        }
+
+        loop {
+            if let Some(new_buffer) = self.rx.pop() {
+                self.buffer = new_buffer.into_iter();
+                return self.buffer.next();
+            }
+
+            if !self.source_is_active() {
+                return None;
+            }
+
+            std::thread::sleep(self.sleep_duration);
+        }
+    }
+
+    fn size_hint(&self) -> (usize, Option<usize>) {
+        ((self.rx.len() + self.buffer.len()), None)
+    }
+}
+
+impl Source for Replay {
+    fn current_span_len(&self) -> Option<usize> {
+        None // source is not compatible with spans
+    }
+
+    fn channels(&self) -> ChannelCount {
+        self.channel_count
+    }
+
+    fn sample_rate(&self) -> SampleRate {
+        self.sample_rate
+    }
+
+    fn total_duration(&self) -> Option<Duration> {
+        None
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use rodio::{nz, static_buffer::StaticSamplesBuffer};
+
+    use super::*;
+
+    const SAMPLES: [Sample; 5] = [0.0, 1.0, 2.0, 3.0, 4.0];
+
+    fn test_source() -> StaticSamplesBuffer {
+        StaticSamplesBuffer::new(nz!(1), nz!(1), &SAMPLES)
+    }
+
+    mod process_buffer {
+        use super::*;
+
+        #[test]
+        fn callback_gets_all_samples() {
+            let input = test_source();
+
+            let _ = input
+                .process_buffer::<{ SAMPLES.len() }, _>(|buffer| assert_eq!(*buffer, SAMPLES))
+                .count();
+        }
+        #[test]
+        fn callback_modifies_yielded() {
+            let input = test_source();
+
+            let yielded: Vec<_> = input
+                .process_buffer::<{ SAMPLES.len() }, _>(|buffer| {
+                    for sample in buffer {
+                        *sample += 1.0;
+                    }
+                })
+                .collect();
+            assert_eq!(
+                yielded,
+                SAMPLES.into_iter().map(|s| s + 1.0).collect::<Vec<_>>()
+            )
+        }
+        #[test]
+        fn source_truncates_to_whole_buffers() {
+            let input = test_source();
+
+            let yielded = input
+                .process_buffer::<3, _>(|buffer| assert_eq!(buffer, &SAMPLES[..3]))
+                .count();
+            assert_eq!(yielded, 3)
+        }
+    }
+
+    mod inspect_buffer {
+        use super::*;
+
+        #[test]
+        fn callback_gets_all_samples() {
+            let input = test_source();
+
+            let _ = input
+                .inspect_buffer::<{ SAMPLES.len() }, _>(|buffer| assert_eq!(*buffer, SAMPLES))
+                .count();
+        }
+        #[test]
+        fn source_does_not_truncate() {
+            let input = test_source();
+
+            let yielded = input
+                .inspect_buffer::<3, _>(|buffer| assert_eq!(buffer, &SAMPLES[..3]))
+                .count();
+            assert_eq!(yielded, SAMPLES.len())
+        }
+    }
+
+    mod instant_replay {
+        use super::*;
+
+        #[test]
+        fn continues_after_history() {
+            let input = test_source();
+
+            let (mut replay, mut source) = input
+                .replayable(Duration::from_secs(3))
+                .expect("longer then 100ms");
+
+            source.by_ref().take(3).count();
+            let yielded: Vec<Sample> = replay.by_ref().take(3).collect();
+            assert_eq!(&yielded, &SAMPLES[0..3],);
+
+            source.count();
+            let yielded: Vec<Sample> = replay.collect();
+            assert_eq!(&yielded, &SAMPLES[3..5],);
+        }
+
+        #[test]
+        fn keeps_only_latest() {
+            let input = test_source();
+
+            let (mut replay, mut source) = input
+                .replayable(Duration::from_secs(2))
+                .expect("longer then 100ms");
+
+            source.by_ref().take(5).count(); // get all items but do not end the source
+            let yielded: Vec<Sample> = replay.by_ref().take(2).collect();
+            assert_eq!(&yielded, &SAMPLES[3..5]);
+            source.count(); // exhaust source
+            assert_eq!(replay.next(), None);
+        }
+
+        #[test]
+        fn keeps_correct_amount_of_seconds() {
+            let input = StaticSamplesBuffer::new(nz!(1), nz!(16_000), &[0.0; 40_000]);
+
+            let (replay, mut source) = input
+                .replayable(Duration::from_secs(2))
+                .expect("longer then 100ms");
+
+            // exhaust but do not yet end source
+            source.by_ref().take(40_000).count();
+
+            // take all samples we can without blocking
+            let ready = replay.samples_ready();
+            let n_yielded = replay.take_samples(ready).count();
+
+            let max = source.sample_rate().get() * source.channels().get() as u32 * 2;
+            let margin = 16_000 / 10; // 100ms
+            assert!(n_yielded as u32 >= max - margin);
+        }
+
+        #[test]
+        fn samples_ready() {
+            let input = StaticSamplesBuffer::new(nz!(1), nz!(16_000), &[0.0; 40_000]);
+            let (mut replay, source) = input
+                .replayable(Duration::from_secs(2))
+                .expect("longer then 100ms");
+            assert_eq!(replay.by_ref().samples_ready(), 0);
+
+            source.take(8000).count(); // half a second
+            let margin = 16_000 / 10; // 100ms
+            let ready = replay.samples_ready();
+            assert!(ready >= 8000 - margin);
+        }
+    }
+}

crates/call/Cargo.toml 🔗

@@ -29,6 +29,7 @@ client.workspace = true
 collections.workspace = true
 fs.workspace = true
 futures.workspace = true
+feature_flags.workspace = true
 gpui = { workspace = true, features = ["screen-capture"] }
 language.workspace = true
 log.workspace = true

crates/call/src/call_impl/room.rs 🔗

@@ -9,6 +9,7 @@ use client::{
     proto::{self, PeerId},
 };
 use collections::{BTreeMap, HashMap, HashSet};
+use feature_flags::FeatureFlagAppExt;
 use fs::Fs;
 use futures::StreamExt;
 use gpui::{
@@ -1322,8 +1323,18 @@ impl Room {
             return Task::ready(Err(anyhow!("live-kit was not initialized")));
         };
 
+        let is_staff = cx.is_staff();
+        let user_name = self
+            .user_store
+            .read(cx)
+            .current_user()
+            .and_then(|user| user.name.clone())
+            .unwrap_or_else(|| "unknown".to_string());
+
         cx.spawn(async move |this, cx| {
-            let publication = room.publish_local_microphone_track(cx).await;
+            let publication = room
+                .publish_local_microphone_track(user_name, is_staff, cx)
+                .await;
             this.update(cx, |this, cx| {
                 let live_kit = this
                     .live_kit

crates/gpui/src/app/async_context.rs 🔗

@@ -218,6 +218,23 @@ impl AsyncApp {
         Some(read(app.try_global()?, &app))
     }
 
+    /// Reads the global state of the specified type, passing it to the given callback.
+    /// A default value is assigned if a global of this type has not yet been assigned.
+    ///
+    /// # Errors
+    /// If the app has ben dropped this returns an error.
+    pub fn try_read_default_global<G: Global + Default, R>(
+        &self,
+        read: impl FnOnce(&G, &App) -> R,
+    ) -> Result<R> {
+        let app = self.app.upgrade().context("app was released")?;
+        let mut app = app.borrow_mut();
+        app.update(|cx| {
+            cx.default_global::<G>();
+        });
+        Ok(read(app.try_global().context("app was released")?, &app))
+    }
+
     /// A convenience method for [`App::update_global`](BorrowAppContext::update_global)
     /// for updating the global state of the specified type.
     pub fn update_global<G: Global, R>(

crates/livekit_client/Cargo.toml 🔗

@@ -22,10 +22,10 @@ test-support = ["collections/test-support", "gpui/test-support"]
 [dependencies]
 anyhow.workspace = true
 async-trait.workspace = true
+audio.workspace = true
 collections.workspace = true
 cpal.workspace = true
 futures.workspace = true
-audio.workspace = true
 gpui = { workspace = true, features = ["screen-capture", "x11", "wayland", "windows-manifest"] }
 gpui_tokio.workspace = true
 http_client_tls.workspace = true
@@ -35,14 +35,15 @@ log.workspace = true
 nanoid.workspace = true
 parking_lot.workspace = true
 postage.workspace = true
-smallvec.workspace = true
+rodio = { workspace = true, features = ["wav_output", "recording"] }
+serde.workspace = true
+serde_urlencoded.workspace = true
 settings.workspace = true
+smallvec.workspace = true
 tokio-tungstenite.workspace = true
 util.workspace = true
 workspace-hack.workspace = true
 
-rodio = { workspace = true, features = ["wav_output"] }
-
 [target.'cfg(not(any(all(target_os = "windows", target_env = "gnu"), target_os = "freebsd")))'.dependencies]
 libwebrtc = { rev = "5f04705ac3f356350ae31534ffbc476abc9ea83d", git = "https://github.com/zed-industries/livekit-rust-sdks" }
 livekit = { rev = "5f04705ac3f356350ae31534ffbc476abc9ea83d", git = "https://github.com/zed-industries/livekit-rust-sdks", features = [

crates/livekit_client/examples/test_app.rs 🔗

@@ -255,7 +255,10 @@ impl LivekitWindow {
         } else {
             let room = self.room.clone();
             cx.spawn_in(window, async move |this, cx| {
-                let (publication, stream) = room.publish_local_microphone_track(cx).await.unwrap();
+                let (publication, stream) = room
+                    .publish_local_microphone_track("test_user".to_string(), false, cx)
+                    .await
+                    .unwrap();
                 this.update(cx, |this, cx| {
                     this.microphone_track = Some(publication);
                     this.microphone_stream = Some(stream);

crates/livekit_client/src/livekit_client.rs 🔗

@@ -97,9 +97,13 @@ impl Room {
 
     pub async fn publish_local_microphone_track(
         &self,
+        user_name: String,
+        is_staff: bool,
         cx: &mut AsyncApp,
     ) -> Result<(LocalTrackPublication, playback::AudioStream)> {
-        let (track, stream) = self.playback.capture_local_microphone_track()?;
+        let (track, stream) = self
+            .playback
+            .capture_local_microphone_track(user_name, is_staff, &cx)?;
         let publication = self
             .local_participant()
             .publish_track(
@@ -129,7 +133,7 @@ impl Room {
         cx: &mut App,
     ) -> Result<playback::AudioStream> {
         if AudioSettings::get_global(cx).rodio_audio {
-            info!("Using experimental.rodio_audio audio pipeline");
+            info!("Using experimental.rodio_audio audio pipeline for output");
             playback::play_remote_audio_track(&track.0, cx)
         } else {
             Ok(self.playback.play_remote_audio_track(&track.0))

crates/livekit_client/src/livekit_client/playback.rs 🔗

@@ -1,10 +1,12 @@
 use anyhow::{Context as _, Result};
 
+use audio::{AudioSettings, CHANNEL_COUNT, SAMPLE_RATE};
 use cpal::traits::{DeviceTrait, StreamTrait as _};
 use futures::channel::mpsc::UnboundedSender;
 use futures::{Stream, StreamExt as _};
 use gpui::{
-    BackgroundExecutor, ScreenCaptureFrame, ScreenCaptureSource, ScreenCaptureStream, Task,
+    AsyncApp, BackgroundExecutor, ScreenCaptureFrame, ScreenCaptureSource, ScreenCaptureStream,
+    Task,
 };
 use libwebrtc::native::{apm, audio_mixer, audio_resampler};
 use livekit::track;
@@ -17,8 +19,11 @@ use livekit::webrtc::{
     video_source::{RtcVideoSource, VideoResolution, native::NativeVideoSource},
     video_stream::native::NativeVideoStream,
 };
+use log::info;
 use parking_lot::Mutex;
 use rodio::Source;
+use serde::{Deserialize, Serialize};
+use settings::Settings;
 use std::cell::RefCell;
 use std::sync::Weak;
 use std::sync::atomic::{AtomicBool, AtomicI32, Ordering};
@@ -36,27 +41,28 @@ pub(crate) struct AudioStack {
     next_ssrc: AtomicI32,
 }
 
-// NOTE: We use WebRTC's mixer which only supports
-// 16kHz, 32kHz and 48kHz. As 48 is the most common "next step up"
-// for audio output devices like speakers/bluetooth, we just hard-code
-// this; and downsample when we need to.
-const SAMPLE_RATE: u32 = 48000;
-const NUM_CHANNELS: u32 = 2;
-
 pub(crate) fn play_remote_audio_track(
     track: &livekit::track::RemoteAudioTrack,
     cx: &mut gpui::App,
 ) -> Result<AudioStream> {
     let stop_handle = Arc::new(AtomicBool::new(false));
     let stop_handle_clone = stop_handle.clone();
-    let stream = source::LiveKitStream::new(cx.background_executor(), track)
+    let stream = source::LiveKitStream::new(cx.background_executor(), track);
+
+    let stream = stream
         .stoppable()
         .periodic_access(Duration::from_millis(50), move |s| {
             if stop_handle.load(Ordering::Relaxed) {
                 s.stop();
             }
         });
-    audio::Audio::play_source(stream, cx).context("Could not play audio")?;
+
+    let speaker: Speaker = serde_urlencoded::from_str(&track.name()).unwrap_or_else(|_| Speaker {
+        name: track.name(),
+        is_staff: false,
+    });
+    audio::Audio::play_voip_stream(stream, speaker.name, speaker.is_staff, cx)
+        .context("Could not play audio")?;
 
     let on_drop = util::defer(move || {
         stop_handle_clone.store(true, Ordering::Relaxed);
@@ -90,8 +96,8 @@ impl AudioStack {
         let next_ssrc = self.next_ssrc.fetch_add(1, Ordering::Relaxed);
         let source = AudioMixerSource {
             ssrc: next_ssrc,
-            sample_rate: SAMPLE_RATE,
-            num_channels: NUM_CHANNELS,
+            sample_rate: SAMPLE_RATE.get(),
+            num_channels: CHANNEL_COUNT.get() as u32,
             buffer: Arc::default(),
         };
         self.mixer.lock().add_source(source.clone());
@@ -131,7 +137,7 @@ impl AudioStack {
             let apm = self.apm.clone();
             let mixer = self.mixer.clone();
             async move {
-                Self::play_output(apm, mixer, SAMPLE_RATE, NUM_CHANNELS)
+                Self::play_output(apm, mixer, SAMPLE_RATE.get(), CHANNEL_COUNT.get().into())
                     .await
                     .log_err();
             }
@@ -142,17 +148,26 @@ impl AudioStack {
 
     pub(crate) fn capture_local_microphone_track(
         &self,
+        user_name: String,
+        is_staff: bool,
+        cx: &AsyncApp,
     ) -> Result<(crate::LocalAudioTrack, AudioStream)> {
         let source = NativeAudioSource::new(
             // n.b. this struct's options are always ignored, noise cancellation is provided by apm.
             AudioSourceOptions::default(),
-            SAMPLE_RATE,
-            NUM_CHANNELS,
+            SAMPLE_RATE.get(),
+            CHANNEL_COUNT.get().into(),
             10,
         );
 
+        let track_name = serde_urlencoded::to_string(Speaker {
+            name: user_name,
+            is_staff,
+        })
+        .context("Could not encode user information in track name")?;
+
         let track = track::LocalAudioTrack::create_audio_track(
-            "microphone",
+            &track_name,
             RtcAudioSource::Native(source.clone()),
         );
 
@@ -166,9 +181,24 @@ impl AudioStack {
                 }
             }
         });
-        let capture_task = self.executor.spawn(async move {
-            Self::capture_input(apm, frame_tx, SAMPLE_RATE, NUM_CHANNELS).await
-        });
+        let rodio_pipeline =
+            AudioSettings::try_read_global(cx, |setting| setting.rodio_audio).unwrap_or_default();
+        let capture_task = if rodio_pipeline {
+            info!("Using experimental.rodio_audio audio pipeline");
+            let voip_parts = audio::VoipParts::new(cx)?;
+            thread::spawn(move || {
+                // microphone is non send on mac
+                let microphone = audio::Audio::open_microphone(voip_parts)?;
+                send_to_livekit(frame_tx, microphone);
+                Ok::<(), anyhow::Error>(())
+            });
+            Task::ready(Ok(()))
+        } else {
+            self.executor.spawn(async move {
+                Self::capture_input(apm, frame_tx, SAMPLE_RATE.get(), CHANNEL_COUNT.get().into())
+                    .await
+            })
+        };
 
         let on_drop = util::defer(|| {
             drop(transmit_task);
@@ -346,6 +376,36 @@ impl AudioStack {
     }
 }
 
+#[derive(Serialize, Deserialize)]
+struct Speaker {
+    name: String,
+    is_staff: bool,
+}
+
+fn send_to_livekit(frame_tx: UnboundedSender<AudioFrame<'static>>, mut microphone: impl Source) {
+    use cpal::Sample;
+    loop {
+        let sampled: Vec<_> = microphone
+            .by_ref()
+            .take(audio::BUFFER_SIZE)
+            .map(|s| s.to_sample())
+            .collect();
+
+        if frame_tx
+            .unbounded_send(AudioFrame {
+                sample_rate: SAMPLE_RATE.get(),
+                num_channels: CHANNEL_COUNT.get() as u32,
+                samples_per_channel: sampled.len() as u32 / CHANNEL_COUNT.get() as u32,
+                data: Cow::Owned(sampled),
+            })
+            .is_err()
+        {
+            // must rx has dropped or is not consuming
+            break;
+        }
+    }
+}
+
 use super::LocalVideoTrack;
 
 pub enum AudioStream {

crates/livekit_client/src/livekit_client/playback/source.rs 🔗

@@ -1,15 +1,23 @@
+use std::num::NonZero;
+
 use futures::StreamExt;
 use libwebrtc::{audio_stream::native::NativeAudioStream, prelude::AudioFrame};
 use livekit::track::RemoteAudioTrack;
-use rodio::{Source, buffer::SamplesBuffer, conversions::SampleTypeConverter};
+use rodio::{Source, buffer::SamplesBuffer, conversions::SampleTypeConverter, nz};
 
-use crate::livekit_client::playback::{NUM_CHANNELS, SAMPLE_RATE};
+use audio::{CHANNEL_COUNT, SAMPLE_RATE};
 
 fn frame_to_samplesbuffer(frame: AudioFrame) -> SamplesBuffer {
     let samples = frame.data.iter().copied();
     let samples = SampleTypeConverter::<_, _>::new(samples);
     let samples: Vec<f32> = samples.collect();
-    SamplesBuffer::new(frame.num_channels as u16, frame.sample_rate, samples)
+    SamplesBuffer::new(
+        // here be dragons
+        // NonZero::new(frame.num_channels as u16).expect("audio frame channels is nonzero"),
+        nz!(2),
+        NonZero::new(frame.sample_rate).expect("audio frame sample rate is nonzero"),
+        samples,
+    )
 }
 
 pub struct LiveKitStream {
@@ -20,8 +28,11 @@ pub struct LiveKitStream {
 
 impl LiveKitStream {
     pub fn new(executor: &gpui::BackgroundExecutor, track: &RemoteAudioTrack) -> Self {
-        let mut stream =
-            NativeAudioStream::new(track.rtc_track(), SAMPLE_RATE as i32, NUM_CHANNELS as i32);
+        let mut stream = NativeAudioStream::new(
+            track.rtc_track(),
+            SAMPLE_RATE.get() as i32,
+            CHANNEL_COUNT.get().into(),
+        );
         let (queue_input, queue_output) = rodio::queue::queue(true);
         // spawn rtc stream
         let receiver_task = executor.spawn({
@@ -54,11 +65,17 @@ impl Source for LiveKitStream {
     }
 
     fn channels(&self) -> rodio::ChannelCount {
-        self.inner.channels()
+        // This must be hardcoded because the playback source assumes constant
+        // sample rate and channel count. The queue upon which this is build
+        // will however report different counts and rates. Even though we put in
+        // only items with our (constant) CHANNEL_COUNT & SAMPLE_RATE this will
+        // play silence on one channel and at 44100 which is not what our
+        // constants are.
+        CHANNEL_COUNT
     }
 
     fn sample_rate(&self) -> rodio::SampleRate {
-        self.inner.sample_rate()
+        SAMPLE_RATE // see comment on channels
     }
 
     fn total_duration(&self) -> Option<std::time::Duration> {

crates/livekit_client/src/record.rs 🔗

@@ -1,5 +1,6 @@
 use std::{
     env,
+    num::NonZero,
     path::{Path, PathBuf},
     sync::{Arc, Mutex},
     time::Duration,
@@ -83,8 +84,12 @@ fn write_out(
             .expect("Stream has ended, callback cant hold the lock"),
     );
     let samples: Vec<f32> = SampleTypeConverter::<_, f32>::new(samples.into_iter()).collect();
-    let mut samples = SamplesBuffer::new(config.channels(), config.sample_rate().0, samples);
-    match rodio::output_to_wav(&mut samples, path) {
+    let mut samples = SamplesBuffer::new(
+        NonZero::new(config.channels()).expect("config channel is never zero"),
+        NonZero::new(config.sample_rate().0).expect("config sample_rate is never zero"),
+        samples,
+    );
+    match rodio::wav_to_file(&mut samples, path) {
         Ok(_) => Ok(()),
         Err(e) => Err(anyhow::anyhow!("Failed to write wav file: {}", e)),
     }

crates/livekit_client/src/test.rs 🔗

@@ -728,6 +728,8 @@ impl Room {
 
     pub async fn publish_local_microphone_track(
         &self,
+        _track_name: String,
+        _is_staff: bool,
         cx: &mut AsyncApp,
     ) -> Result<(LocalTrackPublication, AudioStream)> {
         self.local_participant().publish_microphone_track(cx).await

crates/zed/Cargo.toml 🔗

@@ -84,7 +84,6 @@ inspector_ui.workspace = true
 install_cli.workspace = true
 jj_ui.workspace = true
 journal.workspace = true
-livekit_client.workspace = true
 language.workspace = true
 language_extension.workspace = true
 language_model.workspace = true

crates/zed/src/zed.rs 🔗

@@ -13,6 +13,7 @@ use agent_ui::{AgentDiffToolbar, AgentPanelDelegate};
 use anyhow::Context as _;
 pub use app_menus::*;
 use assets::Assets;
+use audio::{AudioSettings, REPLAY_DURATION};
 use breadcrumbs::Breadcrumbs;
 use client::zed_urls;
 use collections::VecDeque;
@@ -59,7 +60,7 @@ use settings::{
     initial_local_debug_tasks_content, initial_project_settings_content, initial_tasks_content,
     update_settings_file,
 };
-use std::time::{Duration, Instant};
+use std::time::Duration;
 use std::{
     borrow::Cow,
     path::{Path, PathBuf},
@@ -128,8 +129,10 @@ actions!(
 actions!(
     dev,
     [
-        /// Record 10s of audio from your current microphone
-        CaptureAudio
+        /// Stores last 30s of audio from zed staff using the experimental rodio
+        /// audio system (including yourself) on the current call in a tar file
+        /// in the current working directory.
+        CaptureRecentAudio,
     ]
 );
 
@@ -910,8 +913,8 @@ fn register_actions(
                 }
             }
         })
-        .register_action(|workspace, _: &CaptureAudio, window, cx| {
-            capture_audio(workspace, window, cx);
+        .register_action(|workspace, _: &CaptureRecentAudio, window, cx| {
+            capture_recent_audio(workspace, window, cx);
         });
 
     #[cfg(not(target_os = "windows"))]
@@ -1845,50 +1848,39 @@ fn open_settings_file(
     .detach_and_log_err(cx);
 }
 
-fn capture_audio(workspace: &mut Workspace, _: &mut Window, cx: &mut Context<Workspace>) {
-    #[derive(Default)]
-    enum State {
-        Recording(livekit_client::CaptureInput),
-        Failed(String),
-        Finished(PathBuf),
-        // Used during state switch. Should never occur naturally.
-        #[default]
-        Invalid,
-    }
-
-    struct CaptureAudioNotification {
+fn capture_recent_audio(workspace: &mut Workspace, _: &mut Window, cx: &mut Context<Workspace>) {
+    struct CaptureRecentAudioNotification {
         focus_handle: gpui::FocusHandle,
-        start_time: Instant,
-        state: State,
+        save_result: Option<Result<(PathBuf, Duration), anyhow::Error>>,
+        _save_task: Task<anyhow::Result<()>>,
     }
 
-    impl gpui::EventEmitter<DismissEvent> for CaptureAudioNotification {}
-    impl gpui::EventEmitter<SuppressEvent> for CaptureAudioNotification {}
-    impl gpui::Focusable for CaptureAudioNotification {
+    impl gpui::EventEmitter<DismissEvent> for CaptureRecentAudioNotification {}
+    impl gpui::EventEmitter<SuppressEvent> for CaptureRecentAudioNotification {}
+    impl gpui::Focusable for CaptureRecentAudioNotification {
         fn focus_handle(&self, _cx: &App) -> gpui::FocusHandle {
             self.focus_handle.clone()
         }
     }
-    impl workspace::notifications::Notification for CaptureAudioNotification {}
+    impl workspace::notifications::Notification for CaptureRecentAudioNotification {}
 
-    const AUDIO_RECORDING_TIME_SECS: u64 = 10;
-
-    impl Render for CaptureAudioNotification {
+    impl Render for CaptureRecentAudioNotification {
         fn render(&mut self, _window: &mut Window, cx: &mut Context<Self>) -> impl IntoElement {
-            let elapsed = self.start_time.elapsed().as_secs();
-            let message = match &self.state {
-                State::Recording(capture) => format!(
-                    "Recording {} seconds of audio from input: '{}'",
-                    AUDIO_RECORDING_TIME_SECS - elapsed,
-                    capture.name,
+            let message = match &self.save_result {
+                None => format!(
+                    "Saving up to {} seconds of recent audio",
+                    REPLAY_DURATION.as_secs(),
+                ),
+                Some(Ok((path, duration))) => format!(
+                    "Saved {} seconds of all audio to {}",
+                    duration.as_secs(),
+                    path.display(),
                 ),
-                State::Failed(e) => format!("Error capturing audio: {e}"),
-                State::Finished(path) => format!("Audio recorded to {}", path.display()),
-                State::Invalid => "Error invalid state".to_string(),
+                Some(Err(e)) => format!("Error saving audio replays: {e:?}"),
             };
 
             NotificationFrame::new()
-                .with_title(Some("Recording Audio"))
+                .with_title(Some("Saved Audio"))
                 .show_suppress_button(false)
                 .on_close(cx.listener(|_, _, _, cx| {
                     cx.emit(DismissEvent);
@@ -1897,53 +1889,41 @@ fn capture_audio(workspace: &mut Workspace, _: &mut Window, cx: &mut Context<Wor
         }
     }
 
-    impl CaptureAudioNotification {
-        fn finish(&mut self) {
-            let state = std::mem::take(&mut self.state);
-            self.state = if let State::Recording(capture) = state {
-                match capture.finish() {
-                    Ok(path) => State::Finished(path),
-                    Err(e) => State::Failed(e.to_string()),
-                }
-            } else {
-                state
-            };
-        }
-
+    impl CaptureRecentAudioNotification {
         fn new(cx: &mut Context<Self>) -> Self {
-            cx.spawn(async move |this, cx| {
-                for _ in 0..10 {
-                    cx.background_executor().timer(Duration::from_secs(1)).await;
-                    this.update(cx, |_, cx| {
+            if AudioSettings::get_global(cx).rodio_audio {
+                let executor = cx.background_executor().clone();
+                let save_task = cx.default_global::<audio::Audio>().save_replays(executor);
+                let _save_task = cx.spawn(async move |this, cx| {
+                    let res = save_task.await;
+                    this.update(cx, |this, cx| {
+                        this.save_result = Some(res);
                         cx.notify();
-                    })?;
-                }
-
-                this.update(cx, |this, cx| {
-                    this.finish();
-                    cx.notify();
-                })?;
-
-                anyhow::Ok(())
-            })
-            .detach();
-
-            let state = match livekit_client::CaptureInput::start() {
-                Ok(capture_input) => State::Recording(capture_input),
-                Err(err) => State::Failed(format!("Error starting audio capture: {}", err)),
-            };
+                    })
+                });
 
-            Self {
-                focus_handle: cx.focus_handle(),
-                start_time: Instant::now(),
-                state,
+                Self {
+                    focus_handle: cx.focus_handle(),
+                    _save_task,
+                    save_result: None,
+                }
+            } else {
+                Self {
+                    focus_handle: cx.focus_handle(),
+                    _save_task: Task::ready(Ok(())),
+                    save_result: Some(Err(anyhow::anyhow!(
+                        "Capturing recent audio is only supported on the experimental rodio audio pipeline"
+                    ))),
+                }
             }
         }
     }
 
-    workspace.show_notification(NotificationId::unique::<CaptureAudio>(), cx, |cx| {
-        cx.new(CaptureAudioNotification::new)
-    });
+    workspace.show_notification(
+        NotificationId::unique::<CaptureRecentAudioNotification>(),
+        cx,
+        |cx| cx.new(CaptureRecentAudioNotification::new),
+    );
 }
 
 #[cfg(test)]

tooling/workspace-hack/Cargo.toml 🔗

@@ -47,6 +47,7 @@ clap_builder = { version = "4", default-features = false, features = ["cargo", "
 concurrent-queue = { version = "2" }
 cranelift-codegen = { version = "0.116", default-features = false, features = ["host-arch", "incremental-cache", "std", "timing", "unwind"] }
 crc32fast = { version = "1" }
+crossbeam-channel = { version = "0.5" }
 crossbeam-epoch = { version = "0.9" }
 crossbeam-utils = { version = "0.8" }
 deranged = { version = "0.4", default-features = false, features = ["powerfmt", "serde", "std"] }
@@ -176,6 +177,7 @@ clap_builder = { version = "4", default-features = false, features = ["cargo", "
 concurrent-queue = { version = "2" }
 cranelift-codegen = { version = "0.116", default-features = false, features = ["host-arch", "incremental-cache", "std", "timing", "unwind"] }
 crc32fast = { version = "1" }
+crossbeam-channel = { version = "0.5" }
 crossbeam-epoch = { version = "0.9" }
 crossbeam-utils = { version = "0.8" }
 deranged = { version = "0.4", default-features = false, features = ["powerfmt", "serde", "std"] }
@@ -291,6 +293,7 @@ getrandom-468e82937335b1c9 = { package = "getrandom", version = "0.3", default-f
 gimli = { version = "0.31", default-features = false, features = ["read", "std", "write"] }
 hyper-rustls = { version = "0.27", default-features = false, features = ["http1", "http2", "native-tokio", "ring", "tls12"] }
 itertools-5ef9efb8ec2df382 = { package = "itertools", version = "0.12" }
+livekit-runtime = { git = "https://github.com/zed-industries/livekit-rust-sdks", rev = "5f04705ac3f356350ae31534ffbc476abc9ea83d" }
 naga = { version = "25", features = ["msl-out", "wgsl-in"] }
 nix-b73a96c0a5f6a7d9 = { package = "nix", version = "0.29", features = ["fs", "pthread", "signal", "user"] }
 objc2 = { version = "0.6" }
@@ -319,6 +322,7 @@ getrandom-468e82937335b1c9 = { package = "getrandom", version = "0.3", default-f
 gimli = { version = "0.31", default-features = false, features = ["read", "std", "write"] }
 hyper-rustls = { version = "0.27", default-features = false, features = ["http1", "http2", "native-tokio", "ring", "tls12"] }
 itertools-5ef9efb8ec2df382 = { package = "itertools", version = "0.12" }
+livekit-runtime = { git = "https://github.com/zed-industries/livekit-rust-sdks", rev = "5f04705ac3f356350ae31534ffbc476abc9ea83d" }
 naga = { version = "25", features = ["msl-out", "wgsl-in"] }
 nix-b73a96c0a5f6a7d9 = { package = "nix", version = "0.29", features = ["fs", "pthread", "signal", "user"] }
 objc2 = { version = "0.6" }
@@ -348,6 +352,7 @@ getrandom-468e82937335b1c9 = { package = "getrandom", version = "0.3", default-f
 gimli = { version = "0.31", default-features = false, features = ["read", "std", "write"] }
 hyper-rustls = { version = "0.27", default-features = false, features = ["http1", "http2", "native-tokio", "ring", "tls12"] }
 itertools-5ef9efb8ec2df382 = { package = "itertools", version = "0.12" }
+livekit-runtime = { git = "https://github.com/zed-industries/livekit-rust-sdks", rev = "5f04705ac3f356350ae31534ffbc476abc9ea83d" }
 naga = { version = "25", features = ["msl-out", "wgsl-in"] }
 nix-b73a96c0a5f6a7d9 = { package = "nix", version = "0.29", features = ["fs", "pthread", "signal", "user"] }
 objc2 = { version = "0.6" }
@@ -376,6 +381,7 @@ getrandom-468e82937335b1c9 = { package = "getrandom", version = "0.3", default-f
 gimli = { version = "0.31", default-features = false, features = ["read", "std", "write"] }
 hyper-rustls = { version = "0.27", default-features = false, features = ["http1", "http2", "native-tokio", "ring", "tls12"] }
 itertools-5ef9efb8ec2df382 = { package = "itertools", version = "0.12" }
+livekit-runtime = { git = "https://github.com/zed-industries/livekit-rust-sdks", rev = "5f04705ac3f356350ae31534ffbc476abc9ea83d" }
 naga = { version = "25", features = ["msl-out", "wgsl-in"] }
 nix-b73a96c0a5f6a7d9 = { package = "nix", version = "0.29", features = ["fs", "pthread", "signal", "user"] }
 objc2 = { version = "0.6" }
@@ -413,6 +419,7 @@ inout = { version = "0.1", default-features = false, features = ["block-padding"
 itertools-5ef9efb8ec2df382 = { package = "itertools", version = "0.12" }
 linux-raw-sys-274715c4dabd11b0 = { package = "linux-raw-sys", version = "0.9", default-features = false, features = ["elf", "errno", "general", "if_ether", "ioctl", "net", "netlink", "no_std", "prctl", "xdp"] }
 linux-raw-sys-9fbad63c4bcf4a8f = { package = "linux-raw-sys", version = "0.4", default-features = false, features = ["elf", "errno", "general", "if_ether", "ioctl", "net", "netlink", "no_std", "prctl", "system", "xdp"] }
+livekit-runtime = { git = "https://github.com/zed-industries/livekit-rust-sdks", rev = "5f04705ac3f356350ae31534ffbc476abc9ea83d" }
 mio = { version = "1", features = ["net", "os-ext"] }
 naga = { version = "25", features = ["spv-out", "wgsl-in"] }
 nix-1f5adca70f036a62 = { package = "nix", version = "0.28", features = ["fs", "mman", "ptrace", "signal", "term", "user"] }
@@ -453,6 +460,7 @@ inout = { version = "0.1", default-features = false, features = ["block-padding"
 itertools-5ef9efb8ec2df382 = { package = "itertools", version = "0.12" }
 linux-raw-sys-274715c4dabd11b0 = { package = "linux-raw-sys", version = "0.9", default-features = false, features = ["elf", "errno", "general", "if_ether", "ioctl", "net", "netlink", "no_std", "prctl", "xdp"] }
 linux-raw-sys-9fbad63c4bcf4a8f = { package = "linux-raw-sys", version = "0.4", default-features = false, features = ["elf", "errno", "general", "if_ether", "ioctl", "net", "netlink", "no_std", "prctl", "system", "xdp"] }
+livekit-runtime = { git = "https://github.com/zed-industries/livekit-rust-sdks", rev = "5f04705ac3f356350ae31534ffbc476abc9ea83d" }
 mio = { version = "1", features = ["net", "os-ext"] }
 naga = { version = "25", features = ["spv-out", "wgsl-in"] }
 nix-1f5adca70f036a62 = { package = "nix", version = "0.28", features = ["fs", "mman", "ptrace", "signal", "term", "user"] }
@@ -491,6 +499,7 @@ inout = { version = "0.1", default-features = false, features = ["block-padding"
 itertools-5ef9efb8ec2df382 = { package = "itertools", version = "0.12" }
 linux-raw-sys-274715c4dabd11b0 = { package = "linux-raw-sys", version = "0.9", default-features = false, features = ["elf", "errno", "general", "if_ether", "ioctl", "net", "netlink", "no_std", "prctl", "xdp"] }
 linux-raw-sys-9fbad63c4bcf4a8f = { package = "linux-raw-sys", version = "0.4", default-features = false, features = ["elf", "errno", "general", "if_ether", "ioctl", "net", "netlink", "no_std", "prctl", "system", "xdp"] }
+livekit-runtime = { git = "https://github.com/zed-industries/livekit-rust-sdks", rev = "5f04705ac3f356350ae31534ffbc476abc9ea83d" }
 mio = { version = "1", features = ["net", "os-ext"] }
 naga = { version = "25", features = ["spv-out", "wgsl-in"] }
 nix-1f5adca70f036a62 = { package = "nix", version = "0.28", features = ["fs", "mman", "ptrace", "signal", "term", "user"] }
@@ -531,6 +540,7 @@ inout = { version = "0.1", default-features = false, features = ["block-padding"
 itertools-5ef9efb8ec2df382 = { package = "itertools", version = "0.12" }
 linux-raw-sys-274715c4dabd11b0 = { package = "linux-raw-sys", version = "0.9", default-features = false, features = ["elf", "errno", "general", "if_ether", "ioctl", "net", "netlink", "no_std", "prctl", "xdp"] }
 linux-raw-sys-9fbad63c4bcf4a8f = { package = "linux-raw-sys", version = "0.4", default-features = false, features = ["elf", "errno", "general", "if_ether", "ioctl", "net", "netlink", "no_std", "prctl", "system", "xdp"] }
+livekit-runtime = { git = "https://github.com/zed-industries/livekit-rust-sdks", rev = "5f04705ac3f356350ae31534ffbc476abc9ea83d" }
 mio = { version = "1", features = ["net", "os-ext"] }
 naga = { version = "25", features = ["spv-out", "wgsl-in"] }
 nix-1f5adca70f036a62 = { package = "nix", version = "0.28", features = ["fs", "mman", "ptrace", "signal", "term", "user"] }
@@ -560,6 +570,7 @@ getrandom-468e82937335b1c9 = { package = "getrandom", version = "0.3", default-f
 getrandom-6f8ce4dd05d13bba = { package = "getrandom", version = "0.2", default-features = false, features = ["js", "rdrand"] }
 hyper-rustls = { version = "0.27", default-features = false, features = ["http1", "http2", "native-tokio", "ring", "tls12"] }
 itertools-5ef9efb8ec2df382 = { package = "itertools", version = "0.12" }
+livekit-runtime = { git = "https://github.com/zed-industries/livekit-rust-sdks", rev = "5f04705ac3f356350ae31534ffbc476abc9ea83d" }
 ring = { version = "0.17", features = ["std"] }
 rustix-d585fab2519d2d1 = { package = "rustix", version = "0.38", features = ["event"] }
 scopeguard = { version = "1" }
@@ -583,6 +594,7 @@ getrandom-468e82937335b1c9 = { package = "getrandom", version = "0.3", default-f
 getrandom-6f8ce4dd05d13bba = { package = "getrandom", version = "0.2", default-features = false, features = ["js", "rdrand"] }
 hyper-rustls = { version = "0.27", default-features = false, features = ["http1", "http2", "native-tokio", "ring", "tls12"] }
 itertools-5ef9efb8ec2df382 = { package = "itertools", version = "0.12" }
+livekit-runtime = { git = "https://github.com/zed-industries/livekit-rust-sdks", rev = "5f04705ac3f356350ae31534ffbc476abc9ea83d" }
 proc-macro2 = { version = "1", default-features = false, features = ["span-locations"] }
 ring = { version = "0.17", features = ["std"] }
 rustix-d585fab2519d2d1 = { package = "rustix", version = "0.38", features = ["event"] }
@@ -616,6 +628,7 @@ inout = { version = "0.1", default-features = false, features = ["block-padding"
 itertools-5ef9efb8ec2df382 = { package = "itertools", version = "0.12" }
 linux-raw-sys-274715c4dabd11b0 = { package = "linux-raw-sys", version = "0.9", default-features = false, features = ["elf", "errno", "general", "if_ether", "ioctl", "net", "netlink", "no_std", "prctl", "xdp"] }
 linux-raw-sys-9fbad63c4bcf4a8f = { package = "linux-raw-sys", version = "0.4", default-features = false, features = ["elf", "errno", "general", "if_ether", "ioctl", "net", "netlink", "no_std", "prctl", "system", "xdp"] }
+livekit-runtime = { git = "https://github.com/zed-industries/livekit-rust-sdks", rev = "5f04705ac3f356350ae31534ffbc476abc9ea83d" }
 mio = { version = "1", features = ["net", "os-ext"] }
 naga = { version = "25", features = ["spv-out", "wgsl-in"] }
 nix-1f5adca70f036a62 = { package = "nix", version = "0.28", features = ["fs", "mman", "ptrace", "signal", "term", "user"] }
@@ -656,6 +669,7 @@ inout = { version = "0.1", default-features = false, features = ["block-padding"
 itertools-5ef9efb8ec2df382 = { package = "itertools", version = "0.12" }
 linux-raw-sys-274715c4dabd11b0 = { package = "linux-raw-sys", version = "0.9", default-features = false, features = ["elf", "errno", "general", "if_ether", "ioctl", "net", "netlink", "no_std", "prctl", "xdp"] }
 linux-raw-sys-9fbad63c4bcf4a8f = { package = "linux-raw-sys", version = "0.4", default-features = false, features = ["elf", "errno", "general", "if_ether", "ioctl", "net", "netlink", "no_std", "prctl", "system", "xdp"] }
+livekit-runtime = { git = "https://github.com/zed-industries/livekit-rust-sdks", rev = "5f04705ac3f356350ae31534ffbc476abc9ea83d" }
 mio = { version = "1", features = ["net", "os-ext"] }
 naga = { version = "25", features = ["spv-out", "wgsl-in"] }
 nix-1f5adca70f036a62 = { package = "nix", version = "0.28", features = ["fs", "mman", "ptrace", "signal", "term", "user"] }