Add back room code to call2

Mikayla created

Change summary

crates/call2/src/participant.rs     |  29 -
crates/call2/src/room.rs            | 734 +++++++++++++++---------------
crates/live_kit_client2/src/prod.rs |   4 
crates/live_kit_client2/src/test.rs |   8 
4 files changed, 371 insertions(+), 404 deletions(-)

Detailed changes

crates/call2/src/participant.rs 🔗

@@ -1,10 +1,12 @@
 use anyhow::{anyhow, Result};
 use client2::ParticipantIndex;
 use client2::{proto, User};
+use collections::HashMap;
 use gpui2::WeakModel;
 pub use live_kit_client2::Frame;
+use live_kit_client2::{RemoteAudioTrack, RemoteVideoTrack};
 use project2::Project;
-use std::{fmt, sync::Arc};
+use std::sync::Arc;
 
 #[derive(Copy, Clone, Debug, Eq, PartialEq)]
 pub enum ParticipantLocation {
@@ -45,27 +47,6 @@ pub struct RemoteParticipant {
     pub participant_index: ParticipantIndex,
     pub muted: bool,
     pub speaking: bool,
-    // pub video_tracks: HashMap<live_kit_client::Sid, Arc<RemoteVideoTrack>>,
-    // pub audio_tracks: HashMap<live_kit_client::Sid, Arc<RemoteAudioTrack>>,
-}
-
-#[derive(Clone)]
-pub struct RemoteVideoTrack {
-    pub(crate) live_kit_track: Arc<live_kit_client2::RemoteVideoTrack>,
-}
-
-unsafe impl Send for RemoteVideoTrack {}
-// todo!("remove this sync because it's not legit")
-unsafe impl Sync for RemoteVideoTrack {}
-
-impl fmt::Debug for RemoteVideoTrack {
-    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
-        f.debug_struct("RemoteVideoTrack").finish()
-    }
-}
-
-impl RemoteVideoTrack {
-    pub fn frames(&self) -> async_broadcast::Receiver<Frame> {
-        self.live_kit_track.frames()
-    }
+    pub video_tracks: HashMap<live_kit_client2::Sid, Arc<RemoteVideoTrack>>,
+    pub audio_tracks: HashMap<live_kit_client2::Sid, Arc<RemoteAudioTrack>>,
 }

crates/call2/src/room.rs 🔗

@@ -1,9 +1,6 @@
-#![allow(dead_code, unused)]
-// todo!()
-
 use crate::{
     call_settings::CallSettings,
-    participant::{LocalParticipant, ParticipantLocation, RemoteParticipant, RemoteVideoTrack},
+    participant::{LocalParticipant, ParticipantLocation, RemoteParticipant},
     IncomingCall,
 };
 use anyhow::{anyhow, Result};
@@ -19,12 +16,15 @@ use gpui2::{
     AppContext, AsyncAppContext, Context, EventEmitter, Model, ModelContext, Task, WeakModel,
 };
 use language2::LanguageRegistry;
-use live_kit_client2::{LocalTrackPublication, RemoteAudioTrackUpdate, RemoteVideoTrackUpdate};
+use live_kit_client2::{
+    LocalAudioTrack, LocalTrackPublication, LocalVideoTrack, RemoteAudioTrackUpdate,
+    RemoteVideoTrackUpdate,
+};
 use postage::{sink::Sink, stream::Stream, watch};
 use project2::Project;
 use settings2::Settings;
-use std::{future::Future, sync::Arc, time::Duration};
-use util::{ResultExt, TryFutureExt};
+use std::{future::Future, mem, sync::Arc, time::Duration};
+use util::{post_inc, ResultExt, TryFutureExt};
 
 pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(30);
 
@@ -95,15 +95,14 @@ impl Room {
 
     #[cfg(any(test, feature = "test-support"))]
     pub fn is_connected(&self) -> bool {
-        false
-        // if let Some(live_kit) = self.live_kit.as_ref() {
-        //     matches!(
-        //         *live_kit.room.status().borrow(),
-        //         live_kit_client::ConnectionState::Connected { .. }
-        //     )
-        // } else {
-        //     false
-        // }
+        if let Some(live_kit) = self.live_kit.as_ref() {
+            matches!(
+                *live_kit.room.status().borrow(),
+                live_kit_client2::ConnectionState::Connected { .. }
+            )
+        } else {
+            false
+        }
     }
 
     fn new(
@@ -423,7 +422,7 @@ impl Room {
         self.pending_participants.clear();
         self.participant_user_ids.clear();
         self.client_subscriptions.clear();
-        // self.live_kit.take();
+        self.live_kit.take();
         self.pending_room_update.take();
         self.maintain_connection.take();
     }
@@ -799,43 +798,43 @@ impl Room {
                                     location,
                                     muted: true,
                                     speaking: false,
-                                    // video_tracks: Default::default(),
-                                    // audio_tracks: Default::default(),
+                                    video_tracks: Default::default(),
+                                    audio_tracks: Default::default(),
                                 },
                             );
 
                             Audio::play_sound(Sound::Joined, cx);
 
-                            // if let Some(live_kit) = this.live_kit.as_ref() {
-                            //     let video_tracks =
-                            //         live_kit.room.remote_video_tracks(&user.id.to_string());
-                            //     let audio_tracks =
-                            //         live_kit.room.remote_audio_tracks(&user.id.to_string());
-                            //     let publications = live_kit
-                            //         .room
-                            //         .remote_audio_track_publications(&user.id.to_string());
-
-                            //     for track in video_tracks {
-                            //         this.remote_video_track_updated(
-                            //             RemoteVideoTrackUpdate::Subscribed(track),
-                            //             cx,
-                            //         )
-                            //         .log_err();
-                            //     }
-
-                            //     for (track, publication) in
-                            //         audio_tracks.iter().zip(publications.iter())
-                            //     {
-                            //         this.remote_audio_track_updated(
-                            //             RemoteAudioTrackUpdate::Subscribed(
-                            //                 track.clone(),
-                            //                 publication.clone(),
-                            //             ),
-                            //             cx,
-                            //         )
-                            //         .log_err();
-                            //     }
-                            // }
+                            if let Some(live_kit) = this.live_kit.as_ref() {
+                                let video_tracks =
+                                    live_kit.room.remote_video_tracks(&user.id.to_string());
+                                let audio_tracks =
+                                    live_kit.room.remote_audio_tracks(&user.id.to_string());
+                                let publications = live_kit
+                                    .room
+                                    .remote_audio_track_publications(&user.id.to_string());
+
+                                for track in video_tracks {
+                                    this.remote_video_track_updated(
+                                        RemoteVideoTrackUpdate::Subscribed(track),
+                                        cx,
+                                    )
+                                    .log_err();
+                                }
+
+                                for (track, publication) in
+                                    audio_tracks.iter().zip(publications.iter())
+                                {
+                                    this.remote_audio_track_updated(
+                                        RemoteAudioTrackUpdate::Subscribed(
+                                            track.clone(),
+                                            publication.clone(),
+                                        ),
+                                        cx,
+                                    )
+                                    .log_err();
+                                }
+                            }
                         }
                     }
 
@@ -923,7 +922,6 @@ impl Room {
         change: RemoteVideoTrackUpdate,
         cx: &mut ModelContext<Self>,
     ) -> Result<()> {
-        todo!();
         match change {
             RemoteVideoTrackUpdate::Subscribed(track) => {
                 let user_id = track.publisher_id().parse()?;
@@ -932,12 +930,7 @@ impl Room {
                     .remote_participants
                     .get_mut(&user_id)
                     .ok_or_else(|| anyhow!("subscribed to track by unknown participant"))?;
-                // participant.video_tracks.insert(
-                //     track_id.clone(),
-                //     Arc::new(RemoteVideoTrack {
-                //         live_kit_track: track,
-                //     }),
-                // );
+                participant.video_tracks.insert(track_id.clone(), track);
                 cx.emit(Event::RemoteVideoTracksChanged {
                     participant_id: participant.peer_id,
                 });
@@ -951,7 +944,7 @@ impl Room {
                     .remote_participants
                     .get_mut(&user_id)
                     .ok_or_else(|| anyhow!("unsubscribed from track by unknown participant"))?;
-                // participant.video_tracks.remove(&track_id);
+                participant.video_tracks.remove(&track_id);
                 cx.emit(Event::RemoteVideoTracksChanged {
                     participant_id: participant.peer_id,
                 });
@@ -981,65 +974,61 @@ impl Room {
                         participant.speaking = false;
                     }
                 }
-                // todo!()
-                // if let Some(id) = self.client.user_id() {
-                // if let Some(room) = &mut self.live_kit {
-                //     if let Ok(_) = speaker_ids.binary_search(&id) {
-                //         room.speaking = true;
-                //     } else {
-                //         room.speaking = false;
-                //     }
-                // }
-                // }
+                if let Some(id) = self.client.user_id() {
+                    if let Some(room) = &mut self.live_kit {
+                        if let Ok(_) = speaker_ids.binary_search(&id) {
+                            room.speaking = true;
+                        } else {
+                            room.speaking = false;
+                        }
+                    }
+                }
                 cx.notify();
             }
             RemoteAudioTrackUpdate::MuteChanged { track_id, muted } => {
-                // todo!()
-                // let mut found = false;
-                // for participant in &mut self.remote_participants.values_mut() {
-                //     for track in participant.audio_tracks.values() {
-                //         if track.sid() == track_id {
-                //             found = true;
-                //             break;
-                //         }
-                //     }
-                //     if found {
-                //         participant.muted = muted;
-                //         break;
-                //     }
-                // }
+                let mut found = false;
+                for participant in &mut self.remote_participants.values_mut() {
+                    for track in participant.audio_tracks.values() {
+                        if track.sid() == track_id {
+                            found = true;
+                            break;
+                        }
+                    }
+                    if found {
+                        participant.muted = muted;
+                        break;
+                    }
+                }
 
                 cx.notify();
             }
             RemoteAudioTrackUpdate::Subscribed(track, publication) => {
-                // todo!()
-                // let user_id = track.publisher_id().parse()?;
-                // let track_id = track.sid().to_string();
-                // let participant = self
-                //     .remote_participants
-                //     .get_mut(&user_id)
-                //     .ok_or_else(|| anyhow!("subscribed to track by unknown participant"))?;
-                // // participant.audio_tracks.insert(track_id.clone(), track);
-                // participant.muted = publication.is_muted();
-
-                // cx.emit(Event::RemoteAudioTracksChanged {
-                //     participant_id: participant.peer_id,
-                // });
+                let user_id = track.publisher_id().parse()?;
+                let track_id = track.sid().to_string();
+                let participant = self
+                    .remote_participants
+                    .get_mut(&user_id)
+                    .ok_or_else(|| anyhow!("subscribed to track by unknown participant"))?;
+                participant.audio_tracks.insert(track_id.clone(), track);
+                participant.muted = publication.is_muted();
+
+                cx.emit(Event::RemoteAudioTracksChanged {
+                    participant_id: participant.peer_id,
+                });
             }
             RemoteAudioTrackUpdate::Unsubscribed {
                 publisher_id,
                 track_id,
             } => {
-                // todo!()
-                // let user_id = publisher_id.parse()?;
-                // let participant = self
-                //     .remote_participants
-                //     .get_mut(&user_id)
-                //     .ok_or_else(|| anyhow!("unsubscribed from track by unknown participant"))?;
-                // participant.audio_tracks.remove(&track_id);
-                // cx.emit(Event::RemoteAudioTracksChanged {
-                //     participant_id: participant.peer_id,
-                // });
+                let user_id = publisher_id.parse()?;
+                let participant = self
+                    .remote_participants
+                    .get_mut(&user_id)
+                    .ok_or_else(|| anyhow!("unsubscribed from track by unknown participant"))?;
+                participant.audio_tracks.remove(&track_id);
+                cx.emit(Event::RemoteAudioTracksChanged {
+                    participant_id: participant.peer_id,
+                });
             }
         }
 
@@ -1220,278 +1209,269 @@ impl Room {
     }
 
     pub fn is_screen_sharing(&self) -> bool {
-        todo!()
-        // self.live_kit.as_ref().map_or(false, |live_kit| {
-        //     !matches!(live_kit.screen_track, LocalTrack::None)
-        // })
+        self.live_kit.as_ref().map_or(false, |live_kit| {
+            !matches!(live_kit.screen_track, LocalTrack::None)
+        })
     }
 
     pub fn is_sharing_mic(&self) -> bool {
-        todo!()
-        // self.live_kit.as_ref().map_or(false, |live_kit| {
-        //     !matches!(live_kit.microphone_track, LocalTrack::None)
-        // })
+        self.live_kit.as_ref().map_or(false, |live_kit| {
+            !matches!(live_kit.microphone_track, LocalTrack::None)
+        })
     }
 
     pub fn is_muted(&self, cx: &AppContext) -> bool {
-        todo!()
-        // self.live_kit
-        //     .as_ref()
-        //     .and_then(|live_kit| match &live_kit.microphone_track {
-        //         LocalTrack::None => Some(Self::mute_on_join(cx)),
-        //         LocalTrack::Pending { muted, .. } => Some(*muted),
-        //         LocalTrack::Published { muted, .. } => Some(*muted),
-        //     })
-        //     .unwrap_or(false)
+        self.live_kit
+            .as_ref()
+            .and_then(|live_kit| match &live_kit.microphone_track {
+                LocalTrack::None => Some(Self::mute_on_join(cx)),
+                LocalTrack::Pending { muted, .. } => Some(*muted),
+                LocalTrack::Published { muted, .. } => Some(*muted),
+            })
+            .unwrap_or(false)
     }
 
     pub fn is_speaking(&self) -> bool {
-        todo!()
-        // self.live_kit
-        //     .as_ref()
-        //     .map_or(false, |live_kit| live_kit.speaking)
+        self.live_kit
+            .as_ref()
+            .map_or(false, |live_kit| live_kit.speaking)
     }
 
     pub fn is_deafened(&self) -> Option<bool> {
-        // self.live_kit.as_ref().map(|live_kit| live_kit.deafened)
-        todo!()
+        self.live_kit.as_ref().map(|live_kit| live_kit.deafened)
     }
 
     #[track_caller]
     pub fn share_microphone(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
-        todo!()
-        // if self.status.is_offline() {
-        //     return Task::ready(Err(anyhow!("room is offline")));
-        // } else if self.is_sharing_mic() {
-        //     return Task::ready(Err(anyhow!("microphone was already shared")));
-        // }
-
-        // let publish_id = if let Some(live_kit) = self.live_kit.as_mut() {
-        //     let publish_id = post_inc(&mut live_kit.next_publish_id);
-        //     live_kit.microphone_track = LocalTrack::Pending {
-        //         publish_id,
-        //         muted: false,
-        //     };
-        //     cx.notify();
-        //     publish_id
-        // } else {
-        // return Task::ready(Err(anyhow!("live-kit was not initialized")));
-        // };
-
-        // cx.spawn(move |this, mut cx| async move {
-        //     let publish_track = async {
-        //         let track = LocalAudioTrack::create();
-        //         this.upgrade()
-        //             .ok_or_else(|| anyhow!("room was dropped"))?
-        //             .update(&mut cx, |this, _| {
-        //                 this.live_kit
-        //                     .as_ref()
-        //                     .map(|live_kit| live_kit.room.publish_audio_track(track))
-        //             })?
-        //             .ok_or_else(|| anyhow!("live-kit was not initialized"))?
-        //             .await
-        //     };
-
-        //     let publication = publish_track.await;
-        //     this.upgrade()
-        //         .ok_or_else(|| anyhow!("room was dropped"))?
-        //         .update(&mut cx, |this, cx| {
-        //             let live_kit = this
-        //                 .live_kit
-        //                 .as_mut()
-        //                 .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
-
-        //             let (canceled, muted) = if let LocalTrack::Pending {
-        //                 publish_id: cur_publish_id,
-        //                 muted,
-        //             } = &live_kit.microphone_track
-        //             {
-        //                 (*cur_publish_id != publish_id, *muted)
-        //             } else {
-        //                 (true, false)
-        //             };
-
-        //             match publication {
-        //                 Ok(publication) => {
-        //                     if canceled {
-        //                         live_kit.room.unpublish_track(publication);
-        //                     } else {
-        //                         if muted {
-        //                             cx.executor().spawn(publication.set_mute(muted)).detach();
-        //                         }
-        //                         live_kit.microphone_track = LocalTrack::Published {
-        //                             track_publication: publication,
-        //                             muted,
-        //                         };
-        //                         cx.notify();
-        //                     }
-        //                     Ok(())
-        //                 }
-        //                 Err(error) => {
-        //                     if canceled {
-        //                         Ok(())
-        //                     } else {
-        //                         live_kit.microphone_track = LocalTrack::None;
-        //                         cx.notify();
-        //                         Err(error)
-        //                     }
-        //                 }
-        //             }
-        //         })?
-        // })
+        if self.status.is_offline() {
+            return Task::ready(Err(anyhow!("room is offline")));
+        } else if self.is_sharing_mic() {
+            return Task::ready(Err(anyhow!("microphone was already shared")));
+        }
+
+        let publish_id = if let Some(live_kit) = self.live_kit.as_mut() {
+            let publish_id = post_inc(&mut live_kit.next_publish_id);
+            live_kit.microphone_track = LocalTrack::Pending {
+                publish_id,
+                muted: false,
+            };
+            cx.notify();
+            publish_id
+        } else {
+            return Task::ready(Err(anyhow!("live-kit was not initialized")));
+        };
+
+        cx.spawn(move |this, mut cx| async move {
+            let publish_track = async {
+                let track = LocalAudioTrack::create();
+                this.upgrade()
+                    .ok_or_else(|| anyhow!("room was dropped"))?
+                    .update(&mut cx, |this, _| {
+                        this.live_kit
+                            .as_ref()
+                            .map(|live_kit| live_kit.room.publish_audio_track(track))
+                    })?
+                    .ok_or_else(|| anyhow!("live-kit was not initialized"))?
+                    .await
+            };
+
+            let publication = publish_track.await;
+            this.upgrade()
+                .ok_or_else(|| anyhow!("room was dropped"))?
+                .update(&mut cx, |this, cx| {
+                    let live_kit = this
+                        .live_kit
+                        .as_mut()
+                        .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
+
+                    let (canceled, muted) = if let LocalTrack::Pending {
+                        publish_id: cur_publish_id,
+                        muted,
+                    } = &live_kit.microphone_track
+                    {
+                        (*cur_publish_id != publish_id, *muted)
+                    } else {
+                        (true, false)
+                    };
+
+                    match publication {
+                        Ok(publication) => {
+                            if canceled {
+                                live_kit.room.unpublish_track(publication);
+                            } else {
+                                if muted {
+                                    cx.executor().spawn(publication.set_mute(muted)).detach();
+                                }
+                                live_kit.microphone_track = LocalTrack::Published {
+                                    track_publication: publication,
+                                    muted,
+                                };
+                                cx.notify();
+                            }
+                            Ok(())
+                        }
+                        Err(error) => {
+                            if canceled {
+                                Ok(())
+                            } else {
+                                live_kit.microphone_track = LocalTrack::None;
+                                cx.notify();
+                                Err(error)
+                            }
+                        }
+                    }
+                })?
+        })
     }
 
     pub fn share_screen(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
-        todo!()
-        // if self.status.is_offline() {
-        //     return Task::ready(Err(anyhow!("room is offline")));
-        // } else if self.is_screen_sharing() {
-        //     return Task::ready(Err(anyhow!("screen was already shared")));
-        // }
-
-        // let (displays, publish_id) = if let Some(live_kit) = self.live_kit.as_mut() {
-        //     let publish_id = post_inc(&mut live_kit.next_publish_id);
-        //     live_kit.screen_track = LocalTrack::Pending {
-        //         publish_id,
-        //         muted: false,
-        //     };
-        //     cx.notify();
-        //     (live_kit.room.display_sources(), publish_id)
-        // } else {
-        //     return Task::ready(Err(anyhow!("live-kit was not initialized")));
-        // };
-
-        // cx.spawn(move |this, mut cx| async move {
-        //     let publish_track = async {
-        //         let displays = displays.await?;
-        //         let display = displays
-        //             .first()
-        //             .ok_or_else(|| anyhow!("no display found"))?;
-        //         let track = LocalVideoTrack::screen_share_for_display(&display);
-        //         this.upgrade()
-        //             .ok_or_else(|| anyhow!("room was dropped"))?
-        //             .update(&mut cx, |this, _| {
-        //                 this.live_kit
-        //                     .as_ref()
-        //                     .map(|live_kit| live_kit.room.publish_video_track(track))
-        //             })?
-        //             .ok_or_else(|| anyhow!("live-kit was not initialized"))?
-        //             .await
-        //     };
-
-        //     let publication = publish_track.await;
-        //     this.upgrade()
-        //         .ok_or_else(|| anyhow!("room was dropped"))?
-        //         .update(&mut cx, |this, cx| {
-        //             let live_kit = this
-        //                 .live_kit
-        //                 .as_mut()
-        //                 .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
-
-        //             let (canceled, muted) = if let LocalTrack::Pending {
-        //                 publish_id: cur_publish_id,
-        //                 muted,
-        //             } = &live_kit.screen_track
-        //             {
-        //                 (*cur_publish_id != publish_id, *muted)
-        //             } else {
-        //                 (true, false)
-        //             };
-
-        //             match publication {
-        //                 Ok(publication) => {
-        //                     if canceled {
-        //                         live_kit.room.unpublish_track(publication);
-        //                     } else {
-        //                         if muted {
-        //                             cx.executor().spawn(publication.set_mute(muted)).detach();
-        //                         }
-        //                         live_kit.screen_track = LocalTrack::Published {
-        //                             track_publication: publication,
-        //                             muted,
-        //                         };
-        //                         cx.notify();
-        //                     }
-
-        //                     Audio::play_sound(Sound::StartScreenshare, cx);
-
-        //                     Ok(())
-        //                 }
-        //                 Err(error) => {
-        //                     if canceled {
-        //                         Ok(())
-        //                     } else {
-        //                         live_kit.screen_track = LocalTrack::None;
-        //                         cx.notify();
-        //                         Err(error)
-        //                     }
-        //                 }
-        //             }
-        //         })?
-        // })
+        if self.status.is_offline() {
+            return Task::ready(Err(anyhow!("room is offline")));
+        } else if self.is_screen_sharing() {
+            return Task::ready(Err(anyhow!("screen was already shared")));
+        }
+
+        let (displays, publish_id) = if let Some(live_kit) = self.live_kit.as_mut() {
+            let publish_id = post_inc(&mut live_kit.next_publish_id);
+            live_kit.screen_track = LocalTrack::Pending {
+                publish_id,
+                muted: false,
+            };
+            cx.notify();
+            (live_kit.room.display_sources(), publish_id)
+        } else {
+            return Task::ready(Err(anyhow!("live-kit was not initialized")));
+        };
+
+        cx.spawn_on_main(move |this, mut cx| async move {
+            let publish_track = async {
+                let displays = displays.await?;
+                let display = displays
+                    .first()
+                    .ok_or_else(|| anyhow!("no display found"))?;
+                let track = LocalVideoTrack::screen_share_for_display(&display);
+                this.upgrade()
+                    .ok_or_else(|| anyhow!("room was dropped"))?
+                    .update(&mut cx, |this, _| {
+                        this.live_kit
+                            .as_ref()
+                            .map(|live_kit| live_kit.room.publish_video_track(track))
+                    })?
+                    .ok_or_else(|| anyhow!("live-kit was not initialized"))?
+                    .await
+            };
+
+            let publication = publish_track.await;
+            this.upgrade()
+                .ok_or_else(|| anyhow!("room was dropped"))?
+                .update(&mut cx, |this, cx| {
+                    let live_kit = this
+                        .live_kit
+                        .as_mut()
+                        .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
+
+                    let (canceled, muted) = if let LocalTrack::Pending {
+                        publish_id: cur_publish_id,
+                        muted,
+                    } = &live_kit.screen_track
+                    {
+                        (*cur_publish_id != publish_id, *muted)
+                    } else {
+                        (true, false)
+                    };
+
+                    match publication {
+                        Ok(publication) => {
+                            if canceled {
+                                live_kit.room.unpublish_track(publication);
+                            } else {
+                                if muted {
+                                    cx.executor().spawn(publication.set_mute(muted)).detach();
+                                }
+                                live_kit.screen_track = LocalTrack::Published {
+                                    track_publication: publication,
+                                    muted,
+                                };
+                                cx.notify();
+                            }
+
+                            Audio::play_sound(Sound::StartScreenshare, cx);
+
+                            Ok(())
+                        }
+                        Err(error) => {
+                            if canceled {
+                                Ok(())
+                            } else {
+                                live_kit.screen_track = LocalTrack::None;
+                                cx.notify();
+                                Err(error)
+                            }
+                        }
+                    }
+                })?
+        })
     }
 
     pub fn toggle_mute(&mut self, cx: &mut ModelContext<Self>) -> Result<Task<Result<()>>> {
-        todo!()
-        // let should_mute = !self.is_muted(cx);
-        // if let Some(live_kit) = self.live_kit.as_mut() {
-        //     if matches!(live_kit.microphone_track, LocalTrack::None) {
-        //         return Ok(self.share_microphone(cx));
-        //     }
-
-        //     let (ret_task, old_muted) = live_kit.set_mute(should_mute, cx)?;
-        //     live_kit.muted_by_user = should_mute;
-
-        //     if old_muted == true && live_kit.deafened == true {
-        //         if let Some(task) = self.toggle_deafen(cx).ok() {
-        //             task.detach();
-        //         }
-        //     }
-
-        //     Ok(ret_task)
-        // } else {
-        //     Err(anyhow!("LiveKit not started"))
-        // }
+        let should_mute = !self.is_muted(cx);
+        if let Some(live_kit) = self.live_kit.as_mut() {
+            if matches!(live_kit.microphone_track, LocalTrack::None) {
+                return Ok(self.share_microphone(cx));
+            }
+
+            let (ret_task, old_muted) = live_kit.set_mute(should_mute, cx)?;
+            live_kit.muted_by_user = should_mute;
+
+            if old_muted == true && live_kit.deafened == true {
+                if let Some(task) = self.toggle_deafen(cx).ok() {
+                    task.detach();
+                }
+            }
+
+            Ok(ret_task)
+        } else {
+            Err(anyhow!("LiveKit not started"))
+        }
     }
 
     pub fn toggle_deafen(&mut self, cx: &mut ModelContext<Self>) -> Result<Task<Result<()>>> {
-        todo!()
-        // if let Some(live_kit) = self.live_kit.as_mut() {
-        //     (*live_kit).deafened = !live_kit.deafened;
-
-        //     let mut tasks = Vec::with_capacity(self.remote_participants.len());
-        //     // Context notification is sent within set_mute itself.
-        //     let mut mute_task = None;
-        //     // When deafening, mute user's mic as well.
-        //     // When undeafening, unmute user's mic unless it was manually muted prior to deafening.
-        //     if live_kit.deafened || !live_kit.muted_by_user {
-        //         mute_task = Some(live_kit.set_mute(live_kit.deafened, cx)?.0);
-        //     };
-        //     for participant in self.remote_participants.values() {
-        //         for track in live_kit
-        //             .room
-        //             .remote_audio_track_publications(&participant.user.id.to_string())
-        //         {
-        //             let deafened = live_kit.deafened;
-        //             tasks.push(
-        //                 cx.executor()
-        //                     .spawn_on_main(move || track.set_enabled(!deafened)),
-        //             );
-        //         }
-        //     }
-
-        //     Ok(cx.executor().spawn_on_main(|| async {
-        //         if let Some(mute_task) = mute_task {
-        //             mute_task.await?;
-        //         }
-        //         for task in tasks {
-        //             task.await?;
-        //         }
-        //         Ok(())
-        //     }))
-        // } else {
-        //     Err(anyhow!("LiveKit not started"))
-        // }
+        if let Some(live_kit) = self.live_kit.as_mut() {
+            (*live_kit).deafened = !live_kit.deafened;
+
+            let mut tasks = Vec::with_capacity(self.remote_participants.len());
+            // Context notification is sent within set_mute itself.
+            let mut mute_task = None;
+            // When deafening, mute user's mic as well.
+            // When undeafening, unmute user's mic unless it was manually muted prior to deafening.
+            if live_kit.deafened || !live_kit.muted_by_user {
+                mute_task = Some(live_kit.set_mute(live_kit.deafened, cx)?.0);
+            };
+            for participant in self.remote_participants.values() {
+                for track in live_kit
+                    .room
+                    .remote_audio_track_publications(&participant.user.id.to_string())
+                {
+                    let deafened = live_kit.deafened;
+                    tasks.push(
+                        cx.executor()
+                            .spawn_on_main(move || track.set_enabled(!deafened)),
+                    );
+                }
+            }
+
+            Ok(cx.executor().spawn_on_main(|| async {
+                if let Some(mute_task) = mute_task {
+                    mute_task.await?;
+                }
+                for task in tasks {
+                    task.await?;
+                }
+                Ok(())
+            }))
+        } else {
+            Err(anyhow!("LiveKit not started"))
+        }
     }
 
     pub fn unshare_screen(&mut self, cx: &mut ModelContext<Self>) -> Result<()> {
@@ -1499,37 +1479,35 @@ impl Room {
             return Err(anyhow!("room is offline"));
         }
 
-        todo!()
-        // let live_kit = self
-        //     .live_kit
-        //     .as_mut()
-        //     .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
-        // match mem::take(&mut live_kit.screen_track) {
-        //     LocalTrack::None => Err(anyhow!("screen was not shared")),
-        //     LocalTrack::Pending { .. } => {
-        //         cx.notify();
-        //         Ok(())
-        //     }
-        //     LocalTrack::Published {
-        //         track_publication, ..
-        //     } => {
-        //         live_kit.room.unpublish_track(track_publication);
-        //         cx.notify();
-
-        //         Audio::play_sound(Sound::StopScreenshare, cx);
-        //         Ok(())
-        //     }
-        // }
+        let live_kit = self
+            .live_kit
+            .as_mut()
+            .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
+        match mem::take(&mut live_kit.screen_track) {
+            LocalTrack::None => Err(anyhow!("screen was not shared")),
+            LocalTrack::Pending { .. } => {
+                cx.notify();
+                Ok(())
+            }
+            LocalTrack::Published {
+                track_publication, ..
+            } => {
+                live_kit.room.unpublish_track(track_publication);
+                cx.notify();
+
+                Audio::play_sound(Sound::StopScreenshare, cx);
+                Ok(())
+            }
+        }
     }
 
     #[cfg(any(test, feature = "test-support"))]
     pub fn set_display_sources(&self, sources: Vec<live_kit_client2::MacOSDisplay>) {
-        todo!()
-        // self.live_kit
-        //     .as_ref()
-        //     .unwrap()
-        //     .room
-        //     .set_display_sources(sources);
+        self.live_kit
+            .as_ref()
+            .unwrap()
+            .room
+            .set_display_sources(sources);
     }
 }
 

crates/live_kit_client2/src/prod.rs 🔗

@@ -499,6 +499,10 @@ impl Room {
             rx,
         )
     }
+
+    pub fn set_display_sources(&self, _: Vec<MacOSDisplay>) {
+        unreachable!("This is a test-only function")
+    }
 }
 
 impl Drop for Room {

crates/live_kit_client2/src/test.rs 🔗

@@ -1,4 +1,4 @@
-use anyhow::{anyhow, Result, Context};
+use anyhow::{anyhow, Context, Result};
 use async_trait::async_trait;
 use collections::{BTreeMap, HashMap};
 use futures::Stream;
@@ -364,7 +364,10 @@ impl Room {
         let token = token.to_string();
         async move {
             let server = TestServer::get(&url)?;
-            server.join_room(token.clone(), this.clone()).await.context("room join")?;
+            server
+                .join_room(token.clone(), this.clone())
+                .await
+                .context("room join")?;
             *this.0.lock().connection.0.borrow_mut() = ConnectionState::Connected { url, token };
             Ok(())
         }
@@ -547,6 +550,7 @@ impl LocalAudioTrack {
     }
 }
 
+#[derive(Debug)]
 pub struct RemoteVideoTrack {
     sid: Sid,
     publisher_id: Sid,