diff --git a/crates/livekit_client/src/livekit_client/playback.rs b/crates/livekit_client/src/livekit_client/playback.rs index 88ebdfd389498ae00ad434eb22726a84a5fe1e01..d6fc061321acd8d40a7df0e615bad0b8ecbb1f26 100644 --- a/crates/livekit_client/src/livekit_client/playback.rs +++ b/crates/livekit_client/src/livekit_client/playback.rs @@ -3,7 +3,7 @@ use anyhow::{Context as _, Result}; use audio::{AudioSettings, CHANNEL_COUNT, LEGACY_CHANNEL_COUNT, LEGACY_SAMPLE_RATE, SAMPLE_RATE}; use cpal::DeviceId; use cpal::traits::{DeviceTrait, StreamTrait as _}; -use futures::channel::mpsc::UnboundedSender; +use futures::channel::mpsc::Sender; use futures::{Stream, StreamExt as _}; use gpui::{ AsyncApp, BackgroundExecutor, Priority, ScreenCaptureFrame, ScreenCaptureSource, @@ -201,7 +201,7 @@ impl AudioStack { let apm = self.apm.clone(); - let (frame_tx, mut frame_rx) = futures::channel::mpsc::unbounded(); + let (frame_tx, mut frame_rx) = futures::channel::mpsc::channel(1); let transmit_task = self.executor.spawn_with_priority(Priority::RealtimeAudio, { async move { while let Some(frame) = frame_rx.next().await { @@ -344,7 +344,7 @@ impl AudioStack { async fn capture_input( executor: BackgroundExecutor, apm: Arc>, - frame_tx: UnboundedSender>, + frame_tx: Sender>, sample_rate: u32, num_channels: u32, input_audio_device: Option, @@ -354,7 +354,7 @@ impl AudioStack { let (device, config) = crate::default_device(true, input_audio_device.as_ref())?; let (end_on_drop_tx, end_on_drop_rx) = std::sync::mpsc::channel::<()>(); let apm = apm.clone(); - let frame_tx = frame_tx.clone(); + let mut frame_tx = frame_tx.clone(); let mut resampler = audio_resampler::AudioResampler::default(); executor @@ -408,7 +408,7 @@ impl AudioStack { .log_err(); buf.clear(); frame_tx - .unbounded_send(AudioFrame { + .try_send(AudioFrame { data: Cow::Owned(sampled), sample_rate, num_channels, @@ -445,7 +445,7 @@ pub struct Speaker { pub sends_legacy_audio: bool, } -fn send_to_livekit(frame_tx: UnboundedSender>, mut microphone: impl Source) { +fn send_to_livekit(mut frame_tx: Sender>, mut microphone: impl Source) { use cpal::Sample; let sample_rate = microphone.sample_rate().get(); let num_channels = microphone.channels().get() as u32; @@ -458,17 +458,19 @@ fn send_to_livekit(frame_tx: UnboundedSender>, mut microphon .map(|s| s.to_sample()) .collect(); - if frame_tx - .unbounded_send(AudioFrame { - sample_rate, - num_channels, - samples_per_channel: sampled.len() as u32 / num_channels, - data: Cow::Owned(sampled), - }) - .is_err() - { - // must rx has dropped or is not consuming - break; + match frame_tx.try_send(AudioFrame { + sample_rate, + num_channels, + samples_per_channel: sampled.len() as u32 / num_channels, + data: Cow::Owned(sampled), + }) { + Ok(_) => {} + Err(err) => { + if !err.is_full() { + // must rx has dropped or is not consuming + break; + } + } } } }