1use std::num::NonZero;
2
3use futures::StreamExt;
4use libwebrtc::{audio_stream::native::NativeAudioStream, prelude::AudioFrame};
5use livekit::track::RemoteAudioTrack;
6use rodio::{
7 ChannelCount, SampleRate, Source, buffer::SamplesBuffer, conversions::SampleTypeConverter,
8};
9
10use audio::{CHANNEL_COUNT, LEGACY_CHANNEL_COUNT, LEGACY_SAMPLE_RATE, SAMPLE_RATE};
11
12fn frame_to_samplesbuffer(frame: AudioFrame) -> SamplesBuffer {
13 let samples = frame.data.iter().copied();
14 let samples = SampleTypeConverter::<_, _>::new(samples);
15 let samples: Vec<f32> = samples.collect();
16 SamplesBuffer::new(
17 NonZero::new(frame.num_channels as u16).expect("zero channels is nonsense"),
18 NonZero::new(frame.sample_rate).expect("samplerate zero is nonsense"),
19 samples,
20 )
21}
22
/// A rodio [`Source`] that plays the audio of a remote LiveKit track.
///
/// Samples are pulled from an internal rodio queue which is filled by a
/// background task spawned in [`LiveKitStream::new`].
pub struct LiveKitStream {
    /// Consumer side of the rodio queue; all `Iterator`/`Source` sample
    /// access is delegated to it.
    inner: rodio::queue::SourcesQueueOutput,
    /// Handle of the task that forwards frames from the track into the queue.
    /// NOTE(review): presumably held only so the task is not dropped (and
    /// thereby stopped) while the stream is alive — confirm against gpui's
    /// `Task` drop semantics.
    _receiver_task: gpui::Task<()>,
    /// Channel count reported through `Source::channels`.
    channel_count: ChannelCount,
    /// Sample rate reported through `Source::sample_rate`.
    sample_rate: SampleRate,
}
30
31impl LiveKitStream {
32 pub fn new(
33 executor: &gpui::BackgroundExecutor,
34 track: &RemoteAudioTrack,
35 legacy: bool,
36 ) -> Self {
37 let (channel_count, sample_rate) = if legacy {
38 (LEGACY_CHANNEL_COUNT, LEGACY_SAMPLE_RATE)
39 } else {
40 (CHANNEL_COUNT, SAMPLE_RATE)
41 };
42
43 let mut stream = NativeAudioStream::new(
44 track.rtc_track(),
45 sample_rate.get() as i32,
46 channel_count.get().into(),
47 );
48 let (queue_input, queue_output) = rodio::queue::queue(true);
49 // spawn rtc stream
50 let receiver_task = executor.spawn({
51 async move {
52 while let Some(frame) = stream.next().await {
53 let samples = frame_to_samplesbuffer(frame);
54 queue_input.append(samples);
55 }
56 }
57 });
58
59 LiveKitStream {
60 _receiver_task: receiver_task,
61 inner: queue_output,
62 sample_rate,
63 channel_count,
64 }
65 }
66}
67
68impl Iterator for LiveKitStream {
69 type Item = rodio::Sample;
70
71 fn next(&mut self) -> Option<Self::Item> {
72 self.inner.next()
73 }
74}
75
76impl Source for LiveKitStream {
77 fn current_span_len(&self) -> Option<usize> {
78 self.inner.current_span_len()
79 }
80
81 fn channels(&self) -> rodio::ChannelCount {
82 self.channel_count
83 }
84
85 fn sample_rate(&self) -> rodio::SampleRate {
86 self.sample_rate
87 }
88
89 fn total_duration(&self) -> Option<std::time::Duration> {
90 self.inner.total_duration()
91 }
92}