From 008311056768d7fbc741f536ae83ee2b9521ce3a Mon Sep 17 00:00:00 2001 From: Piotr Osiewicz <24362066+osiewicz@users.noreply.github.com> Date: Mon, 16 Mar 2026 11:46:53 +0100 Subject: [PATCH] wip: try to prevent deadlocks in livekit on reconnect Co-authored-by: Jakub Konka --- crates/call/src/call_impl/room.rs | 12 +++++++++--- .../livekit_client/src/livekit_client/playback.rs | 13 +++++++------ .../src/livekit_client/playback/source.rs | 13 ++++++++----- 3 files changed, 24 insertions(+), 14 deletions(-) diff --git a/crates/call/src/call_impl/room.rs b/crates/call/src/call_impl/room.rs index 701d7dd65423f97b3f4d5cfa4a198083593211e6..e1fe269a4e58db275ceec2f1a1ca73b9de6a63a6 100644 --- a/crates/call/src/call_impl/room.rs +++ b/crates/call/src/call_impl/room.rs @@ -980,9 +980,15 @@ impl Room { participant_id: participant.peer_id, }); if let Some(live_kit) = self.live_kit.as_ref() { - let stream = live_kit.room.play_remote_audio_track(&track, cx)?; - participant.audio_tracks.insert(track_id, (track, stream)); - participant.muted = publication.is_muted(); + // Don't replace an existing stream for this track. During livekit + // reconnect, TrackSubscribed fires again for already-subscribed tracks. + // Replacing triggers concurrent add_sink/remove_sink on the same audio + // track's C++ mutex, deadlocking when the signaling thread is busy. + if !participant.audio_tracks.contains_key(&track_id) { + let stream = live_kit.room.play_remote_audio_track(&track, cx)?; + participant.audio_tracks.insert(track_id, (track, stream)); + participant.muted = publication.is_muted(); + } } } livekit_client::RemoteTrack::Video(track) => { diff --git a/crates/livekit_client/src/livekit_client/playback.rs b/crates/livekit_client/src/livekit_client/playback.rs index d6fc061321acd8d40a7df0e615bad0b8ecbb1f26..5e7e56d70eda6d4341207fb7de7bc2e496bfb3bf 100644 --- a/crates/livekit_client/src/livekit_client/playback.rs +++ b/crates/livekit_client/src/livekit_client/playback.rs @@ -105,15 +105,16 @@ impl AudioStack { }; self.mixer.lock().add_source(source.clone()); - let mut stream = NativeAudioStream::new( - track.rtc_track(), - source.sample_rate as i32, - source.num_channels as i32, - ); - let receive_task = self.executor.spawn_with_priority(Priority::RealtimeAudio, { let source = source.clone(); + let rtc_track = track.rtc_track(); + let sample_rate = source.sample_rate as i32; + let num_channels = source.num_channels as i32; async move { + // NativeAudioStream::new calls AudioTrack::add_sink which blocks on the WebRTC + // signaling thread. Doing this inside the background task avoids blocking the + // main thread when the signaling thread is busy (e.g. during reconnection). + let mut stream = NativeAudioStream::new(rtc_track, sample_rate, num_channels); while let Some(frame) = stream.next().await { source.receive(frame); } diff --git a/crates/livekit_client/src/livekit_client/playback/source.rs b/crates/livekit_client/src/livekit_client/playback/source.rs index b90c3613f8215481a4a535eb81c665fccae80e5c..c6b20a080133045d9971b4d9b1965b50d1cc5691 100644 --- a/crates/livekit_client/src/livekit_client/playback/source.rs +++ b/crates/livekit_client/src/livekit_client/playback/source.rs @@ -40,15 +40,18 @@ impl LiveKitStream { (CHANNEL_COUNT, SAMPLE_RATE) }; - let mut stream = NativeAudioStream::new( - track.rtc_track(), - sample_rate.get() as i32, - channel_count.get().into(), - ); let (queue_input, queue_output) = rodio::queue::queue(true); // spawn rtc stream let receiver_task = executor.spawn_with_priority(gpui::Priority::RealtimeAudio, { + let rtc_track = track.rtc_track(); + let sample_rate_i32 = sample_rate.get() as i32; + let channel_count_i32 = channel_count.get().into(); async move { + // NativeAudioStream::new calls AudioTrack::add_sink which blocks on the WebRTC + // signaling thread. Doing this inside the background task avoids blocking the + // main thread when the signaling thread is busy (e.g. during reconnection). + let mut stream = + NativeAudioStream::new(rtc_track, sample_rate_i32, channel_count_i32); while let Some(frame) = stream.next().await { let samples = frame_to_samplesbuffer(frame); queue_input.append(samples);