diff --git a/crates/livekit_client/src/livekit_client/playback.rs b/crates/livekit_client/src/livekit_client/playback.rs index 88ebdfd389498ae00ad434eb22726a84a5fe1e01..cbff89f0b7f6262cf27e31d5c1e0b0568e19ec79 100644 --- a/crates/livekit_client/src/livekit_client/playback.rs +++ b/crates/livekit_client/src/livekit_client/playback.rs @@ -3,7 +3,7 @@ use anyhow::{Context as _, Result}; use audio::{AudioSettings, CHANNEL_COUNT, LEGACY_CHANNEL_COUNT, LEGACY_SAMPLE_RATE, SAMPLE_RATE}; use cpal::DeviceId; use cpal::traits::{DeviceTrait, StreamTrait as _}; -use futures::channel::mpsc::UnboundedSender; +use futures::channel::mpsc::{Sender, UnboundedSender}; use futures::{Stream, StreamExt as _}; use gpui::{ AsyncApp, BackgroundExecutor, Priority, ScreenCaptureFrame, ScreenCaptureSource, @@ -201,10 +201,12 @@ impl AudioStack { let apm = self.apm.clone(); - let (frame_tx, mut frame_rx) = futures::channel::mpsc::unbounded(); + let (frame_tx, mut frame_rx) = + futures::channel::mpsc::channel::<(_, std::time::Instant)>(10); let transmit_task = self.executor.spawn_with_priority(Priority::RealtimeAudio, { async move { - while let Some(frame) = frame_rx.next().await { + while let Some((frame, instant)) = frame_rx.next().await { + dbg!((instant.elapsed(), frame_rx.size_hint())); source.capture_frame(&frame).await.log_err(); } } @@ -344,7 +346,7 @@ impl AudioStack { async fn capture_input( executor: BackgroundExecutor, apm: Arc>, - frame_tx: UnboundedSender>, + frame_tx: Sender<(AudioFrame<'static>, std::time::Instant)>, sample_rate: u32, num_channels: u32, input_audio_device: Option, @@ -354,7 +356,7 @@ impl AudioStack { let (device, config) = crate::default_device(true, input_audio_device.as_ref())?; let (end_on_drop_tx, end_on_drop_rx) = std::sync::mpsc::channel::<()>(); let apm = apm.clone(); - let frame_tx = frame_tx.clone(); + let mut frame_tx = frame_tx.clone(); let mut resampler = audio_resampler::AudioResampler::default(); executor @@ -408,12 +410,15 @@ impl AudioStack { .log_err(); buf.clear(); frame_tx - .unbounded_send(AudioFrame { - data: Cow::Owned(sampled), - sample_rate, - num_channels, - samples_per_channel: sample_rate / 100, - }) + .try_send(( + AudioFrame { + data: Cow::Owned(sampled), + sample_rate, + num_channels, + samples_per_channel: sample_rate / 100, + }, + std::time::Instant::now(), + )) .ok(); } } @@ -445,7 +450,10 @@ pub struct Speaker { pub sends_legacy_audio: bool, } -fn send_to_livekit(frame_tx: UnboundedSender>, mut microphone: impl Source) { +fn send_to_livekit( + mut frame_tx: Sender<(AudioFrame<'static>, std::time::Instant)>, + mut microphone: impl Source, +) { use cpal::Sample; let sample_rate = microphone.sample_rate().get(); let num_channels = microphone.channels().get() as u32; @@ -459,16 +467,19 @@ fn send_to_livekit(frame_tx: UnboundedSender>, mut microphon .collect(); if frame_tx - .unbounded_send(AudioFrame { - sample_rate, - num_channels, - samples_per_channel: sampled.len() as u32 / num_channels, - data: Cow::Owned(sampled), - }) + .try_send(( + AudioFrame { + sample_rate, + num_channels, + samples_per_channel: sampled.len() as u32 / num_channels, + data: Cow::Owned(sampled), + }, + std::time::Instant::now(), + )) .is_err() { // must rx has dropped or is not consuming - break; + // break; } } }