source.rs

 1use std::num::NonZero;
 2
 3use futures::StreamExt;
 4use libwebrtc::{audio_stream::native::NativeAudioStream, prelude::AudioFrame};
 5use livekit::track::RemoteAudioTrack;
 6use rodio::{Source, buffer::SamplesBuffer, conversions::SampleTypeConverter, nz};
 7
 8use audio::{CHANNEL_COUNT, SAMPLE_RATE};
 9
10fn frame_to_samplesbuffer(frame: AudioFrame) -> SamplesBuffer {
11    let samples = frame.data.iter().copied();
12    let samples = SampleTypeConverter::<_, _>::new(samples);
13    let samples: Vec<f32> = samples.collect();
14    SamplesBuffer::new(
15        nz!(2), // frame always has two channels
16        NonZero::new(frame.sample_rate).expect("audio frame sample rate is nonzero"),
17        samples,
18    )
19}
20
/// A rodio [`Source`] that plays the audio of a LiveKit remote audio track.
///
/// Built by [`LiveKitStream::new`], which spawns a background task that
/// converts incoming frames and appends them to a rodio queue; this struct
/// reads from the output side of that queue.
pub struct LiveKitStream {
    /// Output half of the rodio queue; samples are pulled from here.
    inner: rodio::queue::SourcesQueueOutput,
    /// Handle of the task that forwards received frames into the queue.
    /// NOTE(review): presumably held so the task lives as long as the stream
    /// (gpui tasks are believed to be cancelled on drop) — confirm.
    _receiver_task: gpui::Task<()>,
}
26
27impl LiveKitStream {
28    pub fn new(executor: &gpui::BackgroundExecutor, track: &RemoteAudioTrack) -> Self {
29        let mut stream = NativeAudioStream::new(
30            track.rtc_track(),
31            SAMPLE_RATE.get() as i32,
32            CHANNEL_COUNT.get().into(),
33        );
34        let (queue_input, queue_output) = rodio::queue::queue(true);
35        // spawn rtc stream
36        let receiver_task = executor.spawn({
37            async move {
38                while let Some(frame) = stream.next().await {
39                    let samples = frame_to_samplesbuffer(frame);
40                    queue_input.append(samples);
41                }
42            }
43        });
44
45        LiveKitStream {
46            _receiver_task: receiver_task,
47            inner: queue_output,
48        }
49    }
50}
51
52impl Iterator for LiveKitStream {
53    type Item = rodio::Sample;
54
55    fn next(&mut self) -> Option<Self::Item> {
56        self.inner.next()
57    }
58}
59
60impl Source for LiveKitStream {
61    fn current_span_len(&self) -> Option<usize> {
62        self.inner.current_span_len()
63    }
64
65    fn channels(&self) -> rodio::ChannelCount {
66        // This must be hardcoded because the playback source assumes constant
67        // sample rate and channel count. The queue upon which this is build
68        // will however report different counts and rates. Even though we put in
69        // only items with our (constant) CHANNEL_COUNT & SAMPLE_RATE this will
70        // play silence on one channel and at 44100 which is not what our
71        // constants are.
72        CHANNEL_COUNT
73    }
74
75    fn sample_rate(&self) -> rodio::SampleRate {
76        SAMPLE_RATE // see comment on channels
77    }
78
79    fn total_duration(&self) -> Option<std::time::Duration> {
80        self.inner.total_duration()
81    }
82}