source.rs

 1use futures::StreamExt;
 2use libwebrtc::{audio_stream::native::NativeAudioStream, prelude::AudioFrame};
 3use livekit::track::RemoteAudioTrack;
 4use rodio::{Source, buffer::SamplesBuffer, conversions::SampleTypeConverter};
 5
 6use crate::livekit_client::playback::{NUM_CHANNELS, SAMPLE_RATE};
 7
 8fn frame_to_samplesbuffer(frame: AudioFrame) -> SamplesBuffer {
 9    let samples = frame.data.iter().copied();
10    let samples = SampleTypeConverter::<_, _>::new(samples);
11    let samples: Vec<f32> = samples.collect();
12    SamplesBuffer::new(frame.num_channels as u16, frame.sample_rate, samples)
13}
14
15pub struct LiveKitStream {
16    // shared_buffer: SharedBuffer,
17    inner: rodio::queue::SourcesQueueOutput,
18    _receiver_task: gpui::Task<()>,
19}
20
21impl LiveKitStream {
22    pub fn new(executor: &gpui::BackgroundExecutor, track: &RemoteAudioTrack) -> Self {
23        let mut stream =
24            NativeAudioStream::new(track.rtc_track(), SAMPLE_RATE as i32, NUM_CHANNELS as i32);
25        let (queue_input, queue_output) = rodio::queue::queue(true);
26        // spawn rtc stream
27        let receiver_task = executor.spawn({
28            async move {
29                while let Some(frame) = stream.next().await {
30                    let samples = frame_to_samplesbuffer(frame);
31                    queue_input.append(samples);
32                }
33            }
34        });
35
36        LiveKitStream {
37            _receiver_task: receiver_task,
38            inner: queue_output,
39        }
40    }
41}
42
43impl Iterator for LiveKitStream {
44    type Item = rodio::Sample;
45
46    fn next(&mut self) -> Option<Self::Item> {
47        self.inner.next()
48    }
49}
50
51impl Source for LiveKitStream {
52    fn current_span_len(&self) -> Option<usize> {
53        self.inner.current_span_len()
54    }
55
56    fn channels(&self) -> rodio::ChannelCount {
57        self.inner.channels()
58    }
59
60    fn sample_rate(&self) -> rodio::SampleRate {
61        self.inner.sample_rate()
62    }
63
64    fn total_duration(&self) -> Option<std::time::Duration> {
65        self.inner.total_duration()
66    }
67}