1use futures::StreamExt;
2use libwebrtc::{audio_stream::native::NativeAudioStream, prelude::AudioFrame};
3use livekit::track::RemoteAudioTrack;
4use rodio::{Source, buffer::SamplesBuffer, conversions::SampleTypeConverter};
5
6use crate::livekit_client::playback::{NUM_CHANNELS, SAMPLE_RATE};
7
8fn frame_to_samplesbuffer(frame: AudioFrame) -> SamplesBuffer {
9 let samples = frame.data.iter().copied();
10 let samples = SampleTypeConverter::<_, _>::new(samples);
11 let samples: Vec<f32> = samples.collect();
12 SamplesBuffer::new(frame.num_channels as u16, frame.sample_rate, samples)
13}
14
15pub struct LiveKitStream {
16 // shared_buffer: SharedBuffer,
17 inner: rodio::queue::SourcesQueueOutput,
18 _receiver_task: gpui::Task<()>,
19}
20
21impl LiveKitStream {
22 pub fn new(executor: &gpui::BackgroundExecutor, track: &RemoteAudioTrack) -> Self {
23 let mut stream =
24 NativeAudioStream::new(track.rtc_track(), SAMPLE_RATE as i32, NUM_CHANNELS as i32);
25 let (queue_input, queue_output) = rodio::queue::queue(true);
26 // spawn rtc stream
27 let receiver_task = executor.spawn({
28 async move {
29 while let Some(frame) = stream.next().await {
30 let samples = frame_to_samplesbuffer(frame);
31 queue_input.append(samples);
32 }
33 }
34 });
35
36 LiveKitStream {
37 _receiver_task: receiver_task,
38 inner: queue_output,
39 }
40 }
41}
42
43impl Iterator for LiveKitStream {
44 type Item = rodio::Sample;
45
46 fn next(&mut self) -> Option<Self::Item> {
47 self.inner.next()
48 }
49}
50
51impl Source for LiveKitStream {
52 fn current_span_len(&self) -> Option<usize> {
53 self.inner.current_span_len()
54 }
55
56 fn channels(&self) -> rodio::ChannelCount {
57 self.inner.channels()
58 }
59
60 fn sample_rate(&self) -> rodio::SampleRate {
61 self.inner.sample_rate()
62 }
63
64 fn total_duration(&self) -> Option<std::time::Duration> {
65 self.inner.total_duration()
66 }
67}