adds error handling to audio input pipeline

David Kleingeld created

Change summary

crates/livekit_client/src/livekit_client/playback.rs        | 36 ++++--
crates/livekit_client/src/livekit_client/playback/source.rs |  4 
2 files changed, 26 insertions(+), 14 deletions(-)

Detailed changes

crates/livekit_client/src/livekit_client/playback.rs 🔗

@@ -26,9 +26,10 @@ use std::cell::RefCell;
 use std::num::NonZero;
 use std::sync::Weak;
 use std::sync::atomic::{AtomicBool, AtomicI32, Ordering};
+use std::sync::mpsc::{TryRecvError, channel};
 use std::time::Duration;
 use std::{borrow::Cow, collections::VecDeque, sync::Arc, thread};
-use util::{ResultExt as _, maybe};
+use util::{ResultExt as _, debug_panic, maybe};
 
 mod source;
 
@@ -268,6 +269,8 @@ impl AudioStack {
         const NUM_CHANNELS: usize = 1;
         const LIVEKIT_BUFFER_SIZE: usize = (SAMPLE_RATE as usize / 100) * NUM_CHANNELS as usize;
 
+        let (stream_error_tx, stream_error_rx) = channel();
+
         thread::spawn(move || {
             let stream = rodio::microphone::MicrophoneBuilder::new()
                 .default_device()?
@@ -281,11 +284,16 @@ impl AudioStack {
             .limit(LimitSettings::live_performance())
             .process_buffer::<LIVEKIT_BUFFER_SIZE, _>(|buffer| {
                 let mut int_buffer: [i16; _] = buffer.map(|s| s.to_sample());
-                apm.lock()
-                    .process_stream(&mut int_buffer, sample_rate as i32, num_channels as i32)
-                    .unwrap(); // TODO dvdsk fix this
-                for (sample, processed) in buffer.iter_mut().zip(&int_buffer) {
-                    *sample = (*processed).to_sample_();
+                if let Err(e) = apm
+                    .lock()
+                    .process_stream(&mut int_buffer, SAMPLE_RATE as i32, NUM_CHANNELS as i32)
+                    .context("livekit audio processor error")
+                {
+                    let _ = stream_error_tx.send(e);
+                } else {
+                    for (sample, processed) in buffer.iter_mut().zip(&int_buffer) {
+                        *sample = (*processed).to_sample_();
+                    }
                 }
             })
             .automatic_gain_control(1.0, 4.0, 0.0, 5.0);
@@ -297,19 +305,23 @@ impl AudioStack {
                     .map(|s| s.to_sample())
                     .collect();
 
-                if frame_tx
+                match stream_error_rx.try_recv() {
+                    Ok(apm_error) => return Err::<(), _>(apm_error),
+                    Err(TryRecvError::Disconnected) => {
+                        debug_panic!("Stream should end on its own without sending an error")
+                    }
+                    Err(TryRecvError::Empty) => (),
+                }
+
+                frame_tx
                     .unbounded_send(AudioFrame {
                         data: Cow::Owned(sampled),
                         sample_rate,
                         num_channels,
                         samples_per_channel: sample_rate / 100,
                     })
-                    .is_err()
-                {
-                    break;
-                }
+                    .context("Failed to send audio frame")?
             }
-            Ok::<(), anyhow::Error>(())
         });
 
         Ok(())

crates/livekit_client/src/livekit_client/playback/source.rs 🔗

@@ -103,7 +103,7 @@ where
     next: usize,
 }
 
-impl<const N: usize, S, F> Iterator for ProcessBuffer<S, F, N>
+impl<const N: usize, S, F> Iterator for ProcessBuffer<N, S, F>
 where
     S: Source + Sized,
     F: FnMut(&mut [rodio::Sample; N]),
@@ -127,13 +127,13 @@ where
     }
 }
 
-// TODO dvdsk this should be a spanless Source
 impl<const N: usize, S, F> Source for ProcessBuffer<N, S, F>
 where
     S: Source + Sized,
     F: FnMut(&mut [rodio::Sample; N]),
 {
     fn current_span_len(&self) -> Option<usize> {
+        // TODO dvdsk this should be a spanless Source
         None
     }