wip: try to prevent deadlocks in livekit on reconnect

Piotr Osiewicz and Jakub Konka created

Co-authored-by: Jakub Konka <kubkon@jakubkonka.com>

Change summary

crates/call/src/call_impl/room.rs                           | 12 ++++-
crates/livekit_client/src/livekit_client/playback.rs        | 13 +++---
crates/livekit_client/src/livekit_client/playback/source.rs | 13 ++++--
3 files changed, 24 insertions(+), 14 deletions(-)

Detailed changes

crates/call/src/call_impl/room.rs 🔗

@@ -980,9 +980,15 @@ impl Room {
                             participant_id: participant.peer_id,
                         });
                         if let Some(live_kit) = self.live_kit.as_ref() {
-                            let stream = live_kit.room.play_remote_audio_track(&track, cx)?;
-                            participant.audio_tracks.insert(track_id, (track, stream));
-                            participant.muted = publication.is_muted();
+                            // Don't replace an existing stream for this track. During livekit
+                            // reconnect, TrackSubscribed fires again for already-subscribed tracks.
+                            // Replacing triggers concurrent add_sink/remove_sink on the same audio
+                            // track's C++ mutex, deadlocking when the signaling thread is busy.
+                            if !participant.audio_tracks.contains_key(&track_id) {
+                                let stream = live_kit.room.play_remote_audio_track(&track, cx)?;
+                                participant.audio_tracks.insert(track_id, (track, stream));
+                                participant.muted = publication.is_muted();
+                            }
                         }
                     }
                     livekit_client::RemoteTrack::Video(track) => {

crates/livekit_client/src/livekit_client/playback.rs 🔗

@@ -105,15 +105,16 @@ impl AudioStack {
         };
         self.mixer.lock().add_source(source.clone());
 
-        let mut stream = NativeAudioStream::new(
-            track.rtc_track(),
-            source.sample_rate as i32,
-            source.num_channels as i32,
-        );
-
         let receive_task = self.executor.spawn_with_priority(Priority::RealtimeAudio, {
             let source = source.clone();
+            let rtc_track = track.rtc_track();
+            let sample_rate = source.sample_rate as i32;
+            let num_channels = source.num_channels as i32;
             async move {
+                // NativeAudioStream::new calls AudioTrack::add_sink which blocks on the WebRTC
+                // signaling thread. Doing this inside the background task avoids blocking the
+                // main thread when the signaling thread is busy (e.g. during reconnection).
+                let mut stream = NativeAudioStream::new(rtc_track, sample_rate, num_channels);
                 while let Some(frame) = stream.next().await {
                     source.receive(frame);
                 }

crates/livekit_client/src/livekit_client/playback/source.rs 🔗

@@ -40,15 +40,18 @@ impl LiveKitStream {
             (CHANNEL_COUNT, SAMPLE_RATE)
         };
 
-        let mut stream = NativeAudioStream::new(
-            track.rtc_track(),
-            sample_rate.get() as i32,
-            channel_count.get().into(),
-        );
         let (queue_input, queue_output) = rodio::queue::queue(true);
         // spawn rtc stream
         let receiver_task = executor.spawn_with_priority(gpui::Priority::RealtimeAudio, {
+            let rtc_track = track.rtc_track();
+            let sample_rate_i32 = sample_rate.get() as i32;
+            let channel_count_i32 = channel_count.get().into();
             async move {
+                // NativeAudioStream::new calls AudioTrack::add_sink which blocks on the WebRTC
+                // signaling thread. Doing this inside the background task avoids blocking the
+                // main thread when the signaling thread is busy (e.g. during reconnection).
+                let mut stream =
+                    NativeAudioStream::new(rtc_track, sample_rate_i32, channel_count_i32);
                 while let Some(frame) = stream.next().await {
                     let samples = frame_to_samplesbuffer(frame);
                     queue_input.append(samples);