try_send on a bounded channel

Piotr Osiewicz created

Change summary

crates/livekit_client/src/livekit_client/playback.rs | 49 ++++++++-----
1 file changed, 30 insertions(+), 19 deletions(-)

Detailed changes

crates/livekit_client/src/livekit_client/playback.rs 🔗

@@ -3,7 +3,7 @@ use anyhow::{Context as _, Result};
 use audio::{AudioSettings, CHANNEL_COUNT, LEGACY_CHANNEL_COUNT, LEGACY_SAMPLE_RATE, SAMPLE_RATE};
 use cpal::DeviceId;
 use cpal::traits::{DeviceTrait, StreamTrait as _};
-use futures::channel::mpsc::UnboundedSender;
+use futures::channel::mpsc::{Sender, UnboundedSender};
 use futures::{Stream, StreamExt as _};
 use gpui::{
     AsyncApp, BackgroundExecutor, Priority, ScreenCaptureFrame, ScreenCaptureSource,
@@ -201,10 +201,12 @@ impl AudioStack {
 
         let apm = self.apm.clone();
 
-        let (frame_tx, mut frame_rx) = futures::channel::mpsc::unbounded();
+        let (frame_tx, mut frame_rx) =
+            futures::channel::mpsc::channel::<(_, std::time::Instant)>(10);
         let transmit_task = self.executor.spawn_with_priority(Priority::RealtimeAudio, {
             async move {
-                while let Some(frame) = frame_rx.next().await {
+                while let Some((frame, instant)) = frame_rx.next().await {
+                    dbg!((instant.elapsed(), frame_rx.size_hint()));
                     source.capture_frame(&frame).await.log_err();
                 }
             }
@@ -344,7 +346,7 @@ impl AudioStack {
     async fn capture_input(
         executor: BackgroundExecutor,
         apm: Arc<Mutex<apm::AudioProcessingModule>>,
-        frame_tx: UnboundedSender<AudioFrame<'static>>,
+        frame_tx: Sender<(AudioFrame<'static>, std::time::Instant)>,
         sample_rate: u32,
         num_channels: u32,
         input_audio_device: Option<DeviceId>,
@@ -354,7 +356,7 @@ impl AudioStack {
             let (device, config) = crate::default_device(true, input_audio_device.as_ref())?;
             let (end_on_drop_tx, end_on_drop_rx) = std::sync::mpsc::channel::<()>();
             let apm = apm.clone();
-            let frame_tx = frame_tx.clone();
+            let mut frame_tx = frame_tx.clone();
             let mut resampler = audio_resampler::AudioResampler::default();
 
             executor
@@ -408,12 +410,15 @@ impl AudioStack {
                                                 .log_err();
                                             buf.clear();
                                             frame_tx
-                                                .unbounded_send(AudioFrame {
-                                                    data: Cow::Owned(sampled),
-                                                    sample_rate,
-                                                    num_channels,
-                                                    samples_per_channel: sample_rate / 100,
-                                                })
+                                                .try_send((
+                                                    AudioFrame {
+                                                        data: Cow::Owned(sampled),
+                                                        sample_rate,
+                                                        num_channels,
+                                                        samples_per_channel: sample_rate / 100,
+                                                    },
+                                                    std::time::Instant::now(),
+                                                ))
                                                 .ok();
                                         }
                                     }
@@ -445,7 +450,10 @@ pub struct Speaker {
     pub sends_legacy_audio: bool,
 }
 
-fn send_to_livekit(frame_tx: UnboundedSender<AudioFrame<'static>>, mut microphone: impl Source) {
+fn send_to_livekit(
+    mut frame_tx: Sender<(AudioFrame<'static>, std::time::Instant)>,
+    mut microphone: impl Source,
+) {
     use cpal::Sample;
     let sample_rate = microphone.sample_rate().get();
     let num_channels = microphone.channels().get() as u32;
@@ -459,16 +467,19 @@ fn send_to_livekit(frame_tx: UnboundedSender<AudioFrame<'static>>, mut microphon
             .collect();
 
         if frame_tx
-            .unbounded_send(AudioFrame {
-                sample_rate,
-                num_channels,
-                samples_per_channel: sampled.len() as u32 / num_channels,
-                data: Cow::Owned(sampled),
-            })
+            .try_send((
+                AudioFrame {
+                    sample_rate,
+                    num_channels,
+                    samples_per_channel: sampled.len() as u32 / num_channels,
+                    data: Cow::Owned(sampled),
+                },
+                std::time::Instant::now(),
+            ))
             .is_err()
         {
             // must rx has dropped or is not consuming
-            break;
+            // break;
         }
     }
 }