audio: Remove unbounded input queue in favor of 1-element queue (#51647)

Piotr Osiewicz and Jakub Konka created

Co-authored-by: Jakub Konka <kubkon@jakubkonka.com>

Closes #ISSUE

Before you mark this PR as ready for review, make sure that you have:
- [ ] Added a solid test coverage and/or screenshots from doing manual
testing
- [ ] Done a self-review taking into account security and performance
aspects
- [ ] Aligned any UI changes with the [UI
checklist](https://github.com/zed-industries/zed/blob/main/CONTRIBUTING.md#uiux-checklist)

Release Notes:

- Improve recovery of audio latency after CPU-intensive period.

Co-authored-by: Jakub Konka <kubkon@jakubkonka.com>

Change summary

crates/livekit_client/src/livekit_client/playback.rs | 36 +++++++------
1 file changed, 19 insertions(+), 17 deletions(-)

Detailed changes

crates/livekit_client/src/livekit_client/playback.rs 🔗

@@ -3,7 +3,7 @@ use anyhow::{Context as _, Result};
 use audio::{AudioSettings, CHANNEL_COUNT, LEGACY_CHANNEL_COUNT, LEGACY_SAMPLE_RATE, SAMPLE_RATE};
 use cpal::DeviceId;
 use cpal::traits::{DeviceTrait, StreamTrait as _};
-use futures::channel::mpsc::UnboundedSender;
+use futures::channel::mpsc::Sender;
 use futures::{Stream, StreamExt as _};
 use gpui::{
     AsyncApp, BackgroundExecutor, Priority, ScreenCaptureFrame, ScreenCaptureSource,
@@ -201,7 +201,7 @@ impl AudioStack {
 
         let apm = self.apm.clone();
 
-        let (frame_tx, mut frame_rx) = futures::channel::mpsc::unbounded();
+        let (frame_tx, mut frame_rx) = futures::channel::mpsc::channel(1);
         let transmit_task = self.executor.spawn_with_priority(Priority::RealtimeAudio, {
             async move {
                 while let Some(frame) = frame_rx.next().await {
@@ -344,7 +344,7 @@ impl AudioStack {
     async fn capture_input(
         executor: BackgroundExecutor,
         apm: Arc<Mutex<apm::AudioProcessingModule>>,
-        frame_tx: UnboundedSender<AudioFrame<'static>>,
+        frame_tx: Sender<AudioFrame<'static>>,
         sample_rate: u32,
         num_channels: u32,
         input_audio_device: Option<DeviceId>,
@@ -354,7 +354,7 @@ impl AudioStack {
             let (device, config) = crate::default_device(true, input_audio_device.as_ref())?;
             let (end_on_drop_tx, end_on_drop_rx) = std::sync::mpsc::channel::<()>();
             let apm = apm.clone();
-            let frame_tx = frame_tx.clone();
+            let mut frame_tx = frame_tx.clone();
             let mut resampler = audio_resampler::AudioResampler::default();
 
             executor
@@ -408,7 +408,7 @@ impl AudioStack {
                                                 .log_err();
                                             buf.clear();
                                             frame_tx
-                                                .unbounded_send(AudioFrame {
+                                                .try_send(AudioFrame {
                                                     data: Cow::Owned(sampled),
                                                     sample_rate,
                                                     num_channels,
@@ -445,7 +445,7 @@ pub struct Speaker {
     pub sends_legacy_audio: bool,
 }
 
-fn send_to_livekit(frame_tx: UnboundedSender<AudioFrame<'static>>, mut microphone: impl Source) {
+fn send_to_livekit(mut frame_tx: Sender<AudioFrame<'static>>, mut microphone: impl Source) {
     use cpal::Sample;
     let sample_rate = microphone.sample_rate().get();
     let num_channels = microphone.channels().get() as u32;
@@ -458,17 +458,19 @@ fn send_to_livekit(frame_tx: UnboundedSender<AudioFrame<'static>>, mut microphon
             .map(|s| s.to_sample())
             .collect();
 
-        if frame_tx
-            .unbounded_send(AudioFrame {
-                sample_rate,
-                num_channels,
-                samples_per_channel: sampled.len() as u32 / num_channels,
-                data: Cow::Owned(sampled),
-            })
-            .is_err()
-        {
-            // must rx has dropped or is not consuming
-            break;
+        match frame_tx.try_send(AudioFrame {
+            sample_rate,
+            num_channels,
+            samples_per_channel: sampled.len() as u32 / num_channels,
+            data: Cow::Owned(sampled),
+        }) {
+            Ok(_) => {}
+            Err(err) => {
+                if !err.is_full() {
+                    // must rx has dropped or is not consuming
+                    break;
+                }
+            }
         }
     }
 }