source.rs

 1use std::num::NonZero;
 2
 3use futures::StreamExt;
 4use libwebrtc::{audio_stream::native::NativeAudioStream, prelude::AudioFrame};
 5use livekit::track::RemoteAudioTrack;
 6use rodio::{
 7    ChannelCount, SampleRate, Source, buffer::SamplesBuffer, conversions::SampleTypeConverter,
 8};
 9
10use audio::{CHANNEL_COUNT, LEGACY_CHANNEL_COUNT, LEGACY_SAMPLE_RATE, SAMPLE_RATE};
11
12fn frame_to_samplesbuffer(frame: AudioFrame) -> SamplesBuffer {
13    let samples = frame.data.iter().copied();
14    let samples = SampleTypeConverter::<_, _>::new(samples);
15    let samples: Vec<f32> = samples.collect();
16    SamplesBuffer::new(
17        NonZero::new(frame.num_channels as u16).expect("zero channels is nonsense"),
18        NonZero::new(frame.sample_rate).expect("samplerate zero is nonsense"),
19        samples,
20    )
21}
22
23pub struct LiveKitStream {
24    // shared_buffer: SharedBuffer,
25    inner: rodio::queue::SourcesQueueOutput,
26    _receiver_task: gpui::Task<()>,
27    channel_count: ChannelCount,
28    sample_rate: SampleRate,
29}
30
31impl LiveKitStream {
32    pub fn new(
33        executor: &gpui::BackgroundExecutor,
34        track: &RemoteAudioTrack,
35        legacy: bool,
36    ) -> Self {
37        let (channel_count, sample_rate) = if legacy {
38            (LEGACY_CHANNEL_COUNT, LEGACY_SAMPLE_RATE)
39        } else {
40            (CHANNEL_COUNT, SAMPLE_RATE)
41        };
42
43        let mut stream = NativeAudioStream::new(
44            track.rtc_track(),
45            sample_rate.get() as i32,
46            channel_count.get().into(),
47        );
48        let (queue_input, queue_output) = rodio::queue::queue(true);
49        // spawn rtc stream
50        let receiver_task = executor.spawn({
51            async move {
52                while let Some(frame) = stream.next().await {
53                    let samples = frame_to_samplesbuffer(frame);
54                    queue_input.append(samples);
55                }
56            }
57        });
58
59        LiveKitStream {
60            _receiver_task: receiver_task,
61            inner: queue_output,
62            sample_rate,
63            channel_count,
64        }
65    }
66}
67
68impl Iterator for LiveKitStream {
69    type Item = rodio::Sample;
70
71    fn next(&mut self) -> Option<Self::Item> {
72        self.inner.next()
73    }
74}
75
76impl Source for LiveKitStream {
77    fn current_span_len(&self) -> Option<usize> {
78        self.inner.current_span_len()
79    }
80
81    fn channels(&self) -> rodio::ChannelCount {
82        self.channel_count
83    }
84
85    fn sample_rate(&self) -> rodio::SampleRate {
86        self.sample_rate
87    }
88
89    fn total_duration(&self) -> Option<std::time::Duration> {
90        self.inner.total_duration()
91    }
92}