room.rs

   1#![allow(dead_code, unused)]
   2// todo!()
   3
   4use crate::{
   5    call_settings::CallSettings,
   6    participant::{LocalParticipant, ParticipantLocation, RemoteParticipant, RemoteVideoTrack},
   7    IncomingCall,
   8};
   9use anyhow::{anyhow, Result};
  10use audio2::{Audio, Sound};
  11use client2::{
  12    proto::{self, PeerId},
  13    Client, ParticipantIndex, TypedEnvelope, User, UserStore,
  14};
  15use collections::{BTreeMap, HashMap, HashSet};
  16use fs2::Fs;
  17use futures::{FutureExt, StreamExt};
  18use gpui2::{
  19    AppContext, AsyncAppContext, Context, EventEmitter, Model, ModelContext, Task, WeakModel,
  20};
  21use language2::LanguageRegistry;
  22use live_kit_client::{LocalTrackPublication, RemoteAudioTrackUpdate, RemoteVideoTrackUpdate};
  23use postage::{sink::Sink, stream::Stream, watch};
  24use project2::Project;
  25use settings2::Settings;
  26use std::{future::Future, sync::Arc, time::Duration};
  27use util::{ResultExt, TryFutureExt};
  28
  29pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(30);
  30
  31#[derive(Clone, Debug, PartialEq, Eq)]
  32pub enum Event {
  33    ParticipantLocationChanged {
  34        participant_id: proto::PeerId,
  35    },
  36    RemoteVideoTracksChanged {
  37        participant_id: proto::PeerId,
  38    },
  39    RemoteAudioTracksChanged {
  40        participant_id: proto::PeerId,
  41    },
  42    RemoteProjectShared {
  43        owner: Arc<User>,
  44        project_id: u64,
  45        worktree_root_names: Vec<String>,
  46    },
  47    RemoteProjectUnshared {
  48        project_id: u64,
  49    },
  50    RemoteProjectJoined {
  51        project_id: u64,
  52    },
  53    RemoteProjectInvitationDiscarded {
  54        project_id: u64,
  55    },
  56    Left,
  57}
  58
  59pub struct Room {
  60    id: u64,
  61    channel_id: Option<u64>,
  62    // live_kit: Option<LiveKitRoom>,
  63    status: RoomStatus,
  64    shared_projects: HashSet<WeakModel<Project>>,
  65    joined_projects: HashSet<WeakModel<Project>>,
  66    local_participant: LocalParticipant,
  67    remote_participants: BTreeMap<u64, RemoteParticipant>,
  68    pending_participants: Vec<Arc<User>>,
  69    participant_user_ids: HashSet<u64>,
  70    pending_call_count: usize,
  71    leave_when_empty: bool,
  72    client: Arc<Client>,
  73    user_store: Model<UserStore>,
  74    follows_by_leader_id_project_id: HashMap<(PeerId, u64), Vec<PeerId>>,
  75    client_subscriptions: Vec<client2::Subscription>,
  76    _subscriptions: Vec<gpui2::Subscription>,
  77    room_update_completed_tx: watch::Sender<Option<()>>,
  78    room_update_completed_rx: watch::Receiver<Option<()>>,
  79    pending_room_update: Option<Task<()>>,
  80    maintain_connection: Option<Task<Option<()>>>,
  81}
  82
  83impl EventEmitter for Room {
  84    type Event = Event;
  85}
  86
  87impl Room {
  88    pub fn channel_id(&self) -> Option<u64> {
  89        self.channel_id
  90    }
  91
  92    pub fn is_sharing_project(&self) -> bool {
  93        !self.shared_projects.is_empty()
  94    }
  95
  96    #[cfg(any(test, feature = "test-support"))]
  97    pub fn is_connected(&self) -> bool {
  98        false
  99        // if let Some(live_kit) = self.live_kit.as_ref() {
 100        //     matches!(
 101        //         *live_kit.room.status().borrow(),
 102        //         live_kit_client::ConnectionState::Connected { .. }
 103        //     )
 104        // } else {
 105        //     false
 106        // }
 107    }
 108
 109    fn new(
 110        id: u64,
 111        channel_id: Option<u64>,
 112        live_kit_connection_info: Option<proto::LiveKitConnectionInfo>,
 113        client: Arc<Client>,
 114        user_store: Model<UserStore>,
 115        cx: &mut ModelContext<Self>,
 116    ) -> Self {
 117        todo!()
 118        // let _live_kit_room = if let Some(connection_info) = live_kit_connection_info {
 119        //     let room = live_kit_client::Room::new();
 120        //     let mut status = room.status();
 121        //     // Consume the initial status of the room.
 122        //     let _ = status.try_recv();
 123        //     let _maintain_room = cx.spawn(|this, mut cx| async move {
 124        //         while let Some(status) = status.next().await {
 125        //             let this = if let Some(this) = this.upgrade() {
 126        //                 this
 127        //             } else {
 128        //                 break;
 129        //             };
 130
 131        //             if status == live_kit_client::ConnectionState::Disconnected {
 132        //                 this.update(&mut cx, |this, cx| this.leave(cx).log_err())
 133        //                     .ok();
 134        //                 break;
 135        //             }
 136        //         }
 137        //     });
 138
 139        //     let mut track_video_changes = room.remote_video_track_updates();
 140        //     let _maintain_video_tracks = cx.spawn(|this, mut cx| async move {
 141        //         while let Some(track_change) = track_video_changes.next().await {
 142        //             let this = if let Some(this) = this.upgrade() {
 143        //                 this
 144        //             } else {
 145        //                 break;
 146        //             };
 147
 148        //             this.update(&mut cx, |this, cx| {
 149        //                 this.remote_video_track_updated(track_change, cx).log_err()
 150        //             })
 151        //             .ok();
 152        //         }
 153        //     });
 154
 155        //     let mut track_audio_changes = room.remote_audio_track_updates();
 156        //     let _maintain_audio_tracks = cx.spawn(|this, mut cx| async move {
 157        //         while let Some(track_change) = track_audio_changes.next().await {
 158        //             let this = if let Some(this) = this.upgrade() {
 159        //                 this
 160        //             } else {
 161        //                 break;
 162        //             };
 163
 164        //             this.update(&mut cx, |this, cx| {
 165        //                 this.remote_audio_track_updated(track_change, cx).log_err()
 166        //             })
 167        //             .ok();
 168        //         }
 169        //     });
 170
 171        //     let connect = room.connect(&connection_info.server_url, &connection_info.token);
 172        //     cx.spawn(|this, mut cx| async move {
 173        //         connect.await?;
 174
 175        //         if !cx.update(|cx| Self::mute_on_join(cx))? {
 176        //             this.update(&mut cx, |this, cx| this.share_microphone(cx))?
 177        //                 .await?;
 178        //         }
 179
 180        //         anyhow::Ok(())
 181        //     })
 182        //     .detach_and_log_err(cx);
 183
 184        //     Some(LiveKitRoom {
 185        //         room,
 186        //         screen_track: LocalTrack::None,
 187        //         microphone_track: LocalTrack::None,
 188        //         next_publish_id: 0,
 189        //         muted_by_user: false,
 190        //         deafened: false,
 191        //         speaking: false,
 192        //         _maintain_room,
 193        //         _maintain_tracks: [_maintain_video_tracks, _maintain_audio_tracks],
 194        //     })
 195        // } else {
 196        //     None
 197        // };
 198
 199        // let maintain_connection = cx.spawn({
 200        //     let client = client.clone();
 201        //     move |this, cx| Self::maintain_connection(this, client.clone(), cx).log_err()
 202        // });
 203
 204        // Audio::play_sound(Sound::Joined, cx);
 205
 206        // let (room_update_completed_tx, room_update_completed_rx) = watch::channel();
 207
 208        // Self {
 209        //     id,
 210        //     channel_id,
 211        //     // live_kit: live_kit_room,
 212        //     status: RoomStatus::Online,
 213        //     shared_projects: Default::default(),
 214        //     joined_projects: Default::default(),
 215        //     participant_user_ids: Default::default(),
 216        //     local_participant: Default::default(),
 217        //     remote_participants: Default::default(),
 218        //     pending_participants: Default::default(),
 219        //     pending_call_count: 0,
 220        //     client_subscriptions: vec![
 221        //         client.add_message_handler(cx.weak_handle(), Self::handle_room_updated)
 222        //     ],
 223        //     _subscriptions: vec![
 224        //         cx.on_release(Self::released),
 225        //         cx.on_app_quit(Self::app_will_quit),
 226        //     ],
 227        //     leave_when_empty: false,
 228        //     pending_room_update: None,
 229        //     client,
 230        //     user_store,
 231        //     follows_by_leader_id_project_id: Default::default(),
 232        //     maintain_connection: Some(maintain_connection),
 233        //     room_update_completed_tx,
 234        //     room_update_completed_rx,
 235        // }
 236    }
 237
 238    pub(crate) fn create(
 239        called_user_id: u64,
 240        initial_project: Option<Model<Project>>,
 241        client: Arc<Client>,
 242        user_store: Model<UserStore>,
 243        cx: &mut AppContext,
 244    ) -> Task<Result<Model<Self>>> {
 245        cx.spawn(move |mut cx| async move {
 246            let response = client.request(proto::CreateRoom {}).await?;
 247            let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
 248            let room = cx.build_model(|cx| {
 249                Self::new(
 250                    room_proto.id,
 251                    None,
 252                    response.live_kit_connection_info,
 253                    client,
 254                    user_store,
 255                    cx,
 256                )
 257            })?;
 258
 259            let initial_project_id = if let Some(initial_project) = initial_project {
 260                let initial_project_id = room
 261                    .update(&mut cx, |room, cx| {
 262                        room.share_project(initial_project.clone(), cx)
 263                    })?
 264                    .await?;
 265                Some(initial_project_id)
 266            } else {
 267                None
 268            };
 269
 270            match room
 271                .update(&mut cx, |room, cx| {
 272                    room.leave_when_empty = true;
 273                    room.call(called_user_id, initial_project_id, cx)
 274                })?
 275                .await
 276            {
 277                Ok(()) => Ok(room),
 278                Err(error) => Err(anyhow!("room creation failed: {:?}", error)),
 279            }
 280        })
 281    }
 282
 283    pub(crate) fn join_channel(
 284        channel_id: u64,
 285        client: Arc<Client>,
 286        user_store: Model<UserStore>,
 287        cx: &mut AppContext,
 288    ) -> Task<Result<Model<Self>>> {
 289        cx.spawn(move |cx| async move {
 290            Self::from_join_response(
 291                client.request(proto::JoinChannel { channel_id }).await?,
 292                client,
 293                user_store,
 294                cx,
 295            )
 296        })
 297    }
 298
 299    pub(crate) fn join(
 300        call: &IncomingCall,
 301        client: Arc<Client>,
 302        user_store: Model<UserStore>,
 303        cx: &mut AppContext,
 304    ) -> Task<Result<Model<Self>>> {
 305        let id = call.room_id;
 306        cx.spawn(move |cx| async move {
 307            Self::from_join_response(
 308                client.request(proto::JoinRoom { id }).await?,
 309                client,
 310                user_store,
 311                cx,
 312            )
 313        })
 314    }
 315
 316    fn released(&mut self, cx: &mut AppContext) {
 317        if self.status.is_online() {
 318            self.leave_internal(cx).detach_and_log_err(cx);
 319        }
 320    }
 321
 322    fn app_will_quit(&mut self, cx: &mut ModelContext<Self>) -> impl Future<Output = ()> {
 323        let task = if self.status.is_online() {
 324            let leave = self.leave_internal(cx);
 325            Some(cx.executor().spawn(async move {
 326                leave.await.log_err();
 327            }))
 328        } else {
 329            None
 330        };
 331
 332        async move {
 333            if let Some(task) = task {
 334                task.await;
 335            }
 336        }
 337    }
 338
 339    pub fn mute_on_join(cx: &AppContext) -> bool {
 340        CallSettings::get_global(cx).mute_on_join || client2::IMPERSONATE_LOGIN.is_some()
 341    }
 342
 343    fn from_join_response(
 344        response: proto::JoinRoomResponse,
 345        client: Arc<Client>,
 346        user_store: Model<UserStore>,
 347        mut cx: AsyncAppContext,
 348    ) -> Result<Model<Self>> {
 349        let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
 350        let room = cx.build_model(|cx| {
 351            Self::new(
 352                room_proto.id,
 353                response.channel_id,
 354                response.live_kit_connection_info,
 355                client,
 356                user_store,
 357                cx,
 358            )
 359        })?;
 360        room.update(&mut cx, |room, cx| {
 361            room.leave_when_empty = room.channel_id.is_none();
 362            room.apply_room_update(room_proto, cx)?;
 363            anyhow::Ok(())
 364        })??;
 365        Ok(room)
 366    }
 367
 368    fn should_leave(&self) -> bool {
 369        self.leave_when_empty
 370            && self.pending_room_update.is_none()
 371            && self.pending_participants.is_empty()
 372            && self.remote_participants.is_empty()
 373            && self.pending_call_count == 0
 374    }
 375
 376    pub(crate) fn leave(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
 377        cx.notify();
 378        cx.emit(Event::Left);
 379        self.leave_internal(cx)
 380    }
 381
 382    fn leave_internal(&mut self, cx: &mut AppContext) -> Task<Result<()>> {
 383        if self.status.is_offline() {
 384            return Task::ready(Err(anyhow!("room is offline")));
 385        }
 386
 387        log::info!("leaving room");
 388        Audio::play_sound(Sound::Leave, cx);
 389
 390        self.clear_state(cx);
 391
 392        let leave_room = self.client.request(proto::LeaveRoom {});
 393        cx.executor().spawn(async move {
 394            leave_room.await?;
 395            anyhow::Ok(())
 396        })
 397    }
 398
 399    pub(crate) fn clear_state(&mut self, cx: &mut AppContext) {
 400        for project in self.shared_projects.drain() {
 401            if let Some(project) = project.upgrade() {
 402                project.update(cx, |project, cx| {
 403                    project.unshare(cx).log_err();
 404                });
 405            }
 406        }
 407        for project in self.joined_projects.drain() {
 408            if let Some(project) = project.upgrade() {
 409                project.update(cx, |project, cx| {
 410                    project.disconnected_from_host(cx);
 411                    project.close(cx);
 412                });
 413            }
 414        }
 415
 416        self.status = RoomStatus::Offline;
 417        self.remote_participants.clear();
 418        self.pending_participants.clear();
 419        self.participant_user_ids.clear();
 420        self.client_subscriptions.clear();
 421        // self.live_kit.take();
 422        self.pending_room_update.take();
 423        self.maintain_connection.take();
 424    }
 425
 426    async fn maintain_connection(
 427        this: WeakModel<Self>,
 428        client: Arc<Client>,
 429        mut cx: AsyncAppContext,
 430    ) -> Result<()> {
 431        let mut client_status = client.status();
 432        loop {
 433            let _ = client_status.try_recv();
 434            let is_connected = client_status.borrow().is_connected();
 435            // Even if we're initially connected, any future change of the status means we momentarily disconnected.
 436            if !is_connected || client_status.next().await.is_some() {
 437                log::info!("detected client disconnection");
 438
 439                this.upgrade()
 440                    .ok_or_else(|| anyhow!("room was dropped"))?
 441                    .update(&mut cx, |this, cx| {
 442                        this.status = RoomStatus::Rejoining;
 443                        cx.notify();
 444                    })?;
 445
 446                // Wait for client to re-establish a connection to the server.
 447                {
 448                    let mut reconnection_timeout = cx.executor().timer(RECONNECT_TIMEOUT).fuse();
 449                    let client_reconnection = async {
 450                        let mut remaining_attempts = 3;
 451                        while remaining_attempts > 0 {
 452                            if client_status.borrow().is_connected() {
 453                                log::info!("client reconnected, attempting to rejoin room");
 454
 455                                let Some(this) = this.upgrade() else { break };
 456                                match this.update(&mut cx, |this, cx| this.rejoin(cx)) {
 457                                    Ok(task) => {
 458                                        if task.await.log_err().is_some() {
 459                                            return true;
 460                                        } else {
 461                                            remaining_attempts -= 1;
 462                                        }
 463                                    }
 464                                    Err(_app_dropped) => return false,
 465                                }
 466                            } else if client_status.borrow().is_signed_out() {
 467                                return false;
 468                            }
 469
 470                            log::info!(
 471                                "waiting for client status change, remaining attempts {}",
 472                                remaining_attempts
 473                            );
 474                            client_status.next().await;
 475                        }
 476                        false
 477                    }
 478                    .fuse();
 479                    futures::pin_mut!(client_reconnection);
 480
 481                    futures::select_biased! {
 482                        reconnected = client_reconnection => {
 483                            if reconnected {
 484                                log::info!("successfully reconnected to room");
 485                                // If we successfully joined the room, go back around the loop
 486                                // waiting for future connection status changes.
 487                                continue;
 488                            }
 489                        }
 490                        _ = reconnection_timeout => {
 491                            log::info!("room reconnection timeout expired");
 492                        }
 493                    }
 494                }
 495
 496                break;
 497            }
 498        }
 499
 500        // The client failed to re-establish a connection to the server
 501        // or an error occurred while trying to re-join the room. Either way
 502        // we leave the room and return an error.
 503        if let Some(this) = this.upgrade() {
 504            log::info!("reconnection failed, leaving room");
 505            let _ = this.update(&mut cx, |this, cx| this.leave(cx))?;
 506        }
 507        Err(anyhow!(
 508            "can't reconnect to room: client failed to re-establish connection"
 509        ))
 510    }
 511
 512    fn rejoin(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
 513        let mut projects = HashMap::default();
 514        let mut reshared_projects = Vec::new();
 515        let mut rejoined_projects = Vec::new();
 516        self.shared_projects.retain(|project| {
 517            if let Some(handle) = project.upgrade() {
 518                let project = handle.read(cx);
 519                if let Some(project_id) = project.remote_id() {
 520                    projects.insert(project_id, handle.clone());
 521                    reshared_projects.push(proto::UpdateProject {
 522                        project_id,
 523                        worktrees: project.worktree_metadata_protos(cx),
 524                    });
 525                    return true;
 526                }
 527            }
 528            false
 529        });
 530        self.joined_projects.retain(|project| {
 531            if let Some(handle) = project.upgrade() {
 532                let project = handle.read(cx);
 533                if let Some(project_id) = project.remote_id() {
 534                    projects.insert(project_id, handle.clone());
 535                    rejoined_projects.push(proto::RejoinProject {
 536                        id: project_id,
 537                        worktrees: project
 538                            .worktrees()
 539                            .map(|worktree| {
 540                                let worktree = worktree.read(cx);
 541                                proto::RejoinWorktree {
 542                                    id: worktree.id().to_proto(),
 543                                    scan_id: worktree.completed_scan_id() as u64,
 544                                }
 545                            })
 546                            .collect(),
 547                    });
 548                }
 549                return true;
 550            }
 551            false
 552        });
 553
 554        let response = self.client.request_envelope(proto::RejoinRoom {
 555            id: self.id,
 556            reshared_projects,
 557            rejoined_projects,
 558        });
 559
 560        cx.spawn(|this, mut cx| async move {
 561            let response = response.await?;
 562            let message_id = response.message_id;
 563            let response = response.payload;
 564            let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
 565            this.update(&mut cx, |this, cx| {
 566                this.status = RoomStatus::Online;
 567                this.apply_room_update(room_proto, cx)?;
 568
 569                for reshared_project in response.reshared_projects {
 570                    if let Some(project) = projects.get(&reshared_project.id) {
 571                        project.update(cx, |project, cx| {
 572                            project.reshared(reshared_project, cx).log_err();
 573                        });
 574                    }
 575                }
 576
 577                for rejoined_project in response.rejoined_projects {
 578                    if let Some(project) = projects.get(&rejoined_project.id) {
 579                        project.update(cx, |project, cx| {
 580                            project.rejoined(rejoined_project, message_id, cx).log_err();
 581                        });
 582                    }
 583                }
 584
 585                anyhow::Ok(())
 586            })?
 587        })
 588    }
 589
 590    pub fn id(&self) -> u64 {
 591        self.id
 592    }
 593
 594    pub fn status(&self) -> RoomStatus {
 595        self.status
 596    }
 597
 598    pub fn local_participant(&self) -> &LocalParticipant {
 599        &self.local_participant
 600    }
 601
 602    pub fn remote_participants(&self) -> &BTreeMap<u64, RemoteParticipant> {
 603        &self.remote_participants
 604    }
 605
 606    pub fn remote_participant_for_peer_id(&self, peer_id: PeerId) -> Option<&RemoteParticipant> {
 607        self.remote_participants
 608            .values()
 609            .find(|p| p.peer_id == peer_id)
 610    }
 611
 612    pub fn pending_participants(&self) -> &[Arc<User>] {
 613        &self.pending_participants
 614    }
 615
 616    pub fn contains_participant(&self, user_id: u64) -> bool {
 617        self.participant_user_ids.contains(&user_id)
 618    }
 619
 620    pub fn followers_for(&self, leader_id: PeerId, project_id: u64) -> &[PeerId] {
 621        self.follows_by_leader_id_project_id
 622            .get(&(leader_id, project_id))
 623            .map_or(&[], |v| v.as_slice())
 624    }
 625
 626    /// Returns the most 'active' projects, defined as most people in the project
 627    pub fn most_active_project(&self, cx: &AppContext) -> Option<(u64, u64)> {
 628        let mut project_hosts_and_guest_counts = HashMap::<u64, (Option<u64>, u32)>::default();
 629        for participant in self.remote_participants.values() {
 630            match participant.location {
 631                ParticipantLocation::SharedProject { project_id } => {
 632                    project_hosts_and_guest_counts
 633                        .entry(project_id)
 634                        .or_default()
 635                        .1 += 1;
 636                }
 637                ParticipantLocation::External | ParticipantLocation::UnsharedProject => {}
 638            }
 639            for project in &participant.projects {
 640                project_hosts_and_guest_counts
 641                    .entry(project.id)
 642                    .or_default()
 643                    .0 = Some(participant.user.id);
 644            }
 645        }
 646
 647        if let Some(user) = self.user_store.read(cx).current_user() {
 648            for project in &self.local_participant.projects {
 649                project_hosts_and_guest_counts
 650                    .entry(project.id)
 651                    .or_default()
 652                    .0 = Some(user.id);
 653            }
 654        }
 655
 656        project_hosts_and_guest_counts
 657            .into_iter()
 658            .filter_map(|(id, (host, guest_count))| Some((id, host?, guest_count)))
 659            .max_by_key(|(_, _, guest_count)| *guest_count)
 660            .map(|(id, host, _)| (id, host))
 661    }
 662
 663    async fn handle_room_updated(
 664        this: Model<Self>,
 665        envelope: TypedEnvelope<proto::RoomUpdated>,
 666        _: Arc<Client>,
 667        mut cx: AsyncAppContext,
 668    ) -> Result<()> {
 669        let room = envelope
 670            .payload
 671            .room
 672            .ok_or_else(|| anyhow!("invalid room"))?;
 673        this.update(&mut cx, |this, cx| this.apply_room_update(room, cx))?
 674    }
 675
 676    fn apply_room_update(
 677        &mut self,
 678        mut room: proto::Room,
 679        cx: &mut ModelContext<Self>,
 680    ) -> Result<()> {
 681        // Filter ourselves out from the room's participants.
 682        let local_participant_ix = room
 683            .participants
 684            .iter()
 685            .position(|participant| Some(participant.user_id) == self.client.user_id());
 686        let local_participant = local_participant_ix.map(|ix| room.participants.swap_remove(ix));
 687
 688        let pending_participant_user_ids = room
 689            .pending_participants
 690            .iter()
 691            .map(|p| p.user_id)
 692            .collect::<Vec<_>>();
 693
 694        let remote_participant_user_ids = room
 695            .participants
 696            .iter()
 697            .map(|p| p.user_id)
 698            .collect::<Vec<_>>();
 699
 700        let (remote_participants, pending_participants) =
 701            self.user_store.update(cx, move |user_store, cx| {
 702                (
 703                    user_store.get_users(remote_participant_user_ids, cx),
 704                    user_store.get_users(pending_participant_user_ids, cx),
 705                )
 706            });
 707
 708        self.pending_room_update = Some(cx.spawn(|this, mut cx| async move {
 709            let (remote_participants, pending_participants) =
 710                futures::join!(remote_participants, pending_participants);
 711
 712            this.update(&mut cx, |this, cx| {
 713                this.participant_user_ids.clear();
 714
 715                if let Some(participant) = local_participant {
 716                    this.local_participant.projects = participant.projects;
 717                } else {
 718                    this.local_participant.projects.clear();
 719                }
 720
 721                if let Some(participants) = remote_participants.log_err() {
 722                    for (participant, user) in room.participants.into_iter().zip(participants) {
 723                        let Some(peer_id) = participant.peer_id else {
 724                            continue;
 725                        };
 726                        let participant_index = ParticipantIndex(participant.participant_index);
 727                        this.participant_user_ids.insert(participant.user_id);
 728
 729                        let old_projects = this
 730                            .remote_participants
 731                            .get(&participant.user_id)
 732                            .into_iter()
 733                            .flat_map(|existing| &existing.projects)
 734                            .map(|project| project.id)
 735                            .collect::<HashSet<_>>();
 736                        let new_projects = participant
 737                            .projects
 738                            .iter()
 739                            .map(|project| project.id)
 740                            .collect::<HashSet<_>>();
 741
 742                        for project in &participant.projects {
 743                            if !old_projects.contains(&project.id) {
 744                                cx.emit(Event::RemoteProjectShared {
 745                                    owner: user.clone(),
 746                                    project_id: project.id,
 747                                    worktree_root_names: project.worktree_root_names.clone(),
 748                                });
 749                            }
 750                        }
 751
 752                        for unshared_project_id in old_projects.difference(&new_projects) {
 753                            this.joined_projects.retain(|project| {
 754                                if let Some(project) = project.upgrade() {
 755                                    project.update(cx, |project, cx| {
 756                                        if project.remote_id() == Some(*unshared_project_id) {
 757                                            project.disconnected_from_host(cx);
 758                                            false
 759                                        } else {
 760                                            true
 761                                        }
 762                                    })
 763                                } else {
 764                                    false
 765                                }
 766                            });
 767                            cx.emit(Event::RemoteProjectUnshared {
 768                                project_id: *unshared_project_id,
 769                            });
 770                        }
 771
 772                        let location = ParticipantLocation::from_proto(participant.location)
 773                            .unwrap_or(ParticipantLocation::External);
 774                        if let Some(remote_participant) =
 775                            this.remote_participants.get_mut(&participant.user_id)
 776                        {
 777                            remote_participant.peer_id = peer_id;
 778                            remote_participant.projects = participant.projects;
 779                            remote_participant.participant_index = participant_index;
 780                            if location != remote_participant.location {
 781                                remote_participant.location = location;
 782                                cx.emit(Event::ParticipantLocationChanged {
 783                                    participant_id: peer_id,
 784                                });
 785                            }
 786                        } else {
 787                            this.remote_participants.insert(
 788                                participant.user_id,
 789                                RemoteParticipant {
 790                                    user: user.clone(),
 791                                    participant_index,
 792                                    peer_id,
 793                                    projects: participant.projects,
 794                                    location,
 795                                    muted: true,
 796                                    speaking: false,
 797                                    // video_tracks: Default::default(),
 798                                    // audio_tracks: Default::default(),
 799                                },
 800                            );
 801
 802                            Audio::play_sound(Sound::Joined, cx);
 803
 804                            // if let Some(live_kit) = this.live_kit.as_ref() {
 805                            //     let video_tracks =
 806                            //         live_kit.room.remote_video_tracks(&user.id.to_string());
 807                            //     let audio_tracks =
 808                            //         live_kit.room.remote_audio_tracks(&user.id.to_string());
 809                            //     let publications = live_kit
 810                            //         .room
 811                            //         .remote_audio_track_publications(&user.id.to_string());
 812
 813                            //     for track in video_tracks {
 814                            //         this.remote_video_track_updated(
 815                            //             RemoteVideoTrackUpdate::Subscribed(track),
 816                            //             cx,
 817                            //         )
 818                            //         .log_err();
 819                            //     }
 820
 821                            //     for (track, publication) in
 822                            //         audio_tracks.iter().zip(publications.iter())
 823                            //     {
 824                            //         this.remote_audio_track_updated(
 825                            //             RemoteAudioTrackUpdate::Subscribed(
 826                            //                 track.clone(),
 827                            //                 publication.clone(),
 828                            //             ),
 829                            //             cx,
 830                            //         )
 831                            //         .log_err();
 832                            //     }
 833                            // }
 834                        }
 835                    }
 836
 837                    this.remote_participants.retain(|user_id, participant| {
 838                        if this.participant_user_ids.contains(user_id) {
 839                            true
 840                        } else {
 841                            for project in &participant.projects {
 842                                cx.emit(Event::RemoteProjectUnshared {
 843                                    project_id: project.id,
 844                                });
 845                            }
 846                            false
 847                        }
 848                    });
 849                }
 850
 851                if let Some(pending_participants) = pending_participants.log_err() {
 852                    this.pending_participants = pending_participants;
 853                    for participant in &this.pending_participants {
 854                        this.participant_user_ids.insert(participant.id);
 855                    }
 856                }
 857
 858                this.follows_by_leader_id_project_id.clear();
 859                for follower in room.followers {
 860                    let project_id = follower.project_id;
 861                    let (leader, follower) = match (follower.leader_id, follower.follower_id) {
 862                        (Some(leader), Some(follower)) => (leader, follower),
 863
 864                        _ => {
 865                            log::error!("Follower message {follower:?} missing some state");
 866                            continue;
 867                        }
 868                    };
 869
 870                    let list = this
 871                        .follows_by_leader_id_project_id
 872                        .entry((leader, project_id))
 873                        .or_insert(Vec::new());
 874                    if !list.contains(&follower) {
 875                        list.push(follower);
 876                    }
 877                }
 878
 879                this.pending_room_update.take();
 880                if this.should_leave() {
 881                    log::info!("room is empty, leaving");
 882                    let _ = this.leave(cx);
 883                }
 884
 885                this.user_store.update(cx, |user_store, cx| {
 886                    let participant_indices_by_user_id = this
 887                        .remote_participants
 888                        .iter()
 889                        .map(|(user_id, participant)| (*user_id, participant.participant_index))
 890                        .collect();
 891                    user_store.set_participant_indices(participant_indices_by_user_id, cx);
 892                });
 893
 894                this.check_invariants();
 895                this.room_update_completed_tx.try_send(Some(())).ok();
 896                cx.notify();
 897            })
 898            .ok();
 899        }));
 900
 901        cx.notify();
 902        Ok(())
 903    }
 904
 905    pub fn room_update_completed(&mut self) -> impl Future<Output = ()> {
 906        let mut done_rx = self.room_update_completed_rx.clone();
 907        async move {
 908            while let Some(result) = done_rx.next().await {
 909                if result.is_some() {
 910                    break;
 911                }
 912            }
 913        }
 914    }
 915
 916    fn remote_video_track_updated(
 917        &mut self,
 918        change: RemoteVideoTrackUpdate,
 919        cx: &mut ModelContext<Self>,
 920    ) -> Result<()> {
 921        todo!();
 922        match change {
 923            RemoteVideoTrackUpdate::Subscribed(track) => {
 924                let user_id = track.publisher_id().parse()?;
 925                let track_id = track.sid().to_string();
 926                let participant = self
 927                    .remote_participants
 928                    .get_mut(&user_id)
 929                    .ok_or_else(|| anyhow!("subscribed to track by unknown participant"))?;
 930                // participant.video_tracks.insert(
 931                //     track_id.clone(),
 932                //     Arc::new(RemoteVideoTrack {
 933                //         live_kit_track: track,
 934                //     }),
 935                // );
 936                cx.emit(Event::RemoteVideoTracksChanged {
 937                    participant_id: participant.peer_id,
 938                });
 939            }
 940            RemoteVideoTrackUpdate::Unsubscribed {
 941                publisher_id,
 942                track_id,
 943            } => {
 944                let user_id = publisher_id.parse()?;
 945                let participant = self
 946                    .remote_participants
 947                    .get_mut(&user_id)
 948                    .ok_or_else(|| anyhow!("unsubscribed from track by unknown participant"))?;
 949                // participant.video_tracks.remove(&track_id);
 950                cx.emit(Event::RemoteVideoTracksChanged {
 951                    participant_id: participant.peer_id,
 952                });
 953            }
 954        }
 955
 956        cx.notify();
 957        Ok(())
 958    }
 959
 960    fn remote_audio_track_updated(
 961        &mut self,
 962        change: RemoteAudioTrackUpdate,
 963        cx: &mut ModelContext<Self>,
 964    ) -> Result<()> {
 965        match change {
 966            RemoteAudioTrackUpdate::ActiveSpeakersChanged { speakers } => {
 967                let mut speaker_ids = speakers
 968                    .into_iter()
 969                    .filter_map(|speaker_sid| speaker_sid.parse().ok())
 970                    .collect::<Vec<u64>>();
 971                speaker_ids.sort_unstable();
 972                for (sid, participant) in &mut self.remote_participants {
 973                    if let Ok(_) = speaker_ids.binary_search(sid) {
 974                        participant.speaking = true;
 975                    } else {
 976                        participant.speaking = false;
 977                    }
 978                }
 979                // todo!()
 980                // if let Some(id) = self.client.user_id() {
 981                // if let Some(room) = &mut self.live_kit {
 982                //     if let Ok(_) = speaker_ids.binary_search(&id) {
 983                //         room.speaking = true;
 984                //     } else {
 985                //         room.speaking = false;
 986                //     }
 987                // }
 988                // }
 989                cx.notify();
 990            }
 991            RemoteAudioTrackUpdate::MuteChanged { track_id, muted } => {
 992                // todo!()
 993                // let mut found = false;
 994                // for participant in &mut self.remote_participants.values_mut() {
 995                //     for track in participant.audio_tracks.values() {
 996                //         if track.sid() == track_id {
 997                //             found = true;
 998                //             break;
 999                //         }
1000                //     }
1001                //     if found {
1002                //         participant.muted = muted;
1003                //         break;
1004                //     }
1005                // }
1006
1007                cx.notify();
1008            }
1009            RemoteAudioTrackUpdate::Subscribed(track, publication) => {
1010                // todo!()
1011                // let user_id = track.publisher_id().parse()?;
1012                // let track_id = track.sid().to_string();
1013                // let participant = self
1014                //     .remote_participants
1015                //     .get_mut(&user_id)
1016                //     .ok_or_else(|| anyhow!("subscribed to track by unknown participant"))?;
1017                // // participant.audio_tracks.insert(track_id.clone(), track);
1018                // participant.muted = publication.is_muted();
1019
1020                // cx.emit(Event::RemoteAudioTracksChanged {
1021                //     participant_id: participant.peer_id,
1022                // });
1023            }
1024            RemoteAudioTrackUpdate::Unsubscribed {
1025                publisher_id,
1026                track_id,
1027            } => {
1028                // todo!()
1029                // let user_id = publisher_id.parse()?;
1030                // let participant = self
1031                //     .remote_participants
1032                //     .get_mut(&user_id)
1033                //     .ok_or_else(|| anyhow!("unsubscribed from track by unknown participant"))?;
1034                // participant.audio_tracks.remove(&track_id);
1035                // cx.emit(Event::RemoteAudioTracksChanged {
1036                //     participant_id: participant.peer_id,
1037                // });
1038            }
1039        }
1040
1041        cx.notify();
1042        Ok(())
1043    }
1044
1045    fn check_invariants(&self) {
1046        #[cfg(any(test, feature = "test-support"))]
1047        {
1048            for participant in self.remote_participants.values() {
1049                assert!(self.participant_user_ids.contains(&participant.user.id));
1050                assert_ne!(participant.user.id, self.client.user_id().unwrap());
1051            }
1052
1053            for participant in &self.pending_participants {
1054                assert!(self.participant_user_ids.contains(&participant.id));
1055                assert_ne!(participant.id, self.client.user_id().unwrap());
1056            }
1057
1058            assert_eq!(
1059                self.participant_user_ids.len(),
1060                self.remote_participants.len() + self.pending_participants.len()
1061            );
1062        }
1063    }
1064
1065    pub(crate) fn call(
1066        &mut self,
1067        called_user_id: u64,
1068        initial_project_id: Option<u64>,
1069        cx: &mut ModelContext<Self>,
1070    ) -> Task<Result<()>> {
1071        if self.status.is_offline() {
1072            return Task::ready(Err(anyhow!("room is offline")));
1073        }
1074
1075        cx.notify();
1076        let client = self.client.clone();
1077        let room_id = self.id;
1078        self.pending_call_count += 1;
1079        cx.spawn(move |this, mut cx| async move {
1080            let result = client
1081                .request(proto::Call {
1082                    room_id,
1083                    called_user_id,
1084                    initial_project_id,
1085                })
1086                .await;
1087            this.update(&mut cx, |this, cx| {
1088                this.pending_call_count -= 1;
1089                if this.should_leave() {
1090                    this.leave(cx).detach_and_log_err(cx);
1091                }
1092            })?;
1093            result?;
1094            Ok(())
1095        })
1096    }
1097
1098    pub fn join_project(
1099        &mut self,
1100        id: u64,
1101        language_registry: Arc<LanguageRegistry>,
1102        fs: Arc<dyn Fs>,
1103        cx: &mut ModelContext<Self>,
1104    ) -> Task<Result<Model<Project>>> {
1105        let client = self.client.clone();
1106        let user_store = self.user_store.clone();
1107        cx.emit(Event::RemoteProjectJoined { project_id: id });
1108        cx.spawn(move |this, mut cx| async move {
1109            let project =
1110                Project::remote(id, client, user_store, language_registry, fs, cx.clone()).await?;
1111
1112            this.update(&mut cx, |this, cx| {
1113                this.joined_projects.retain(|project| {
1114                    if let Some(project) = project.upgrade() {
1115                        !project.read(cx).is_read_only()
1116                    } else {
1117                        false
1118                    }
1119                });
1120                this.joined_projects.insert(project.downgrade());
1121            })?;
1122            Ok(project)
1123        })
1124    }
1125
1126    pub(crate) fn share_project(
1127        &mut self,
1128        project: Model<Project>,
1129        cx: &mut ModelContext<Self>,
1130    ) -> Task<Result<u64>> {
1131        if let Some(project_id) = project.read(cx).remote_id() {
1132            return Task::ready(Ok(project_id));
1133        }
1134
1135        let request = self.client.request(proto::ShareProject {
1136            room_id: self.id(),
1137            worktrees: project.read(cx).worktree_metadata_protos(cx),
1138        });
1139        cx.spawn(|this, mut cx| async move {
1140            let response = request.await?;
1141
1142            project.update(&mut cx, |project, cx| {
1143                project.shared(response.project_id, cx)
1144            })??;
1145
1146            // If the user's location is in this project, it changes from UnsharedProject to SharedProject.
1147            this.update(&mut cx, |this, cx| {
1148                this.shared_projects.insert(project.downgrade());
1149                let active_project = this.local_participant.active_project.as_ref();
1150                if active_project.map_or(false, |location| *location == project) {
1151                    this.set_location(Some(&project), cx)
1152                } else {
1153                    Task::ready(Ok(()))
1154                }
1155            })?
1156            .await?;
1157
1158            Ok(response.project_id)
1159        })
1160    }
1161
1162    pub(crate) fn unshare_project(
1163        &mut self,
1164        project: Model<Project>,
1165        cx: &mut ModelContext<Self>,
1166    ) -> Result<()> {
1167        let project_id = match project.read(cx).remote_id() {
1168            Some(project_id) => project_id,
1169            None => return Ok(()),
1170        };
1171
1172        self.client.send(proto::UnshareProject { project_id })?;
1173        project.update(cx, |this, cx| this.unshare(cx))
1174    }
1175
1176    pub(crate) fn set_location(
1177        &mut self,
1178        project: Option<&Model<Project>>,
1179        cx: &mut ModelContext<Self>,
1180    ) -> Task<Result<()>> {
1181        if self.status.is_offline() {
1182            return Task::ready(Err(anyhow!("room is offline")));
1183        }
1184
1185        let client = self.client.clone();
1186        let room_id = self.id;
1187        let location = if let Some(project) = project {
1188            self.local_participant.active_project = Some(project.downgrade());
1189            if let Some(project_id) = project.read(cx).remote_id() {
1190                proto::participant_location::Variant::SharedProject(
1191                    proto::participant_location::SharedProject { id: project_id },
1192                )
1193            } else {
1194                proto::participant_location::Variant::UnsharedProject(
1195                    proto::participant_location::UnsharedProject {},
1196                )
1197            }
1198        } else {
1199            self.local_participant.active_project = None;
1200            proto::participant_location::Variant::External(proto::participant_location::External {})
1201        };
1202
1203        cx.notify();
1204        cx.executor().spawn_on_main(move || async move {
1205            client
1206                .request(proto::UpdateParticipantLocation {
1207                    room_id,
1208                    location: Some(proto::ParticipantLocation {
1209                        variant: Some(location),
1210                    }),
1211                })
1212                .await?;
1213            Ok(())
1214        })
1215    }
1216
1217    pub fn is_screen_sharing(&self) -> bool {
1218        todo!()
1219        // self.live_kit.as_ref().map_or(false, |live_kit| {
1220        //     !matches!(live_kit.screen_track, LocalTrack::None)
1221        // })
1222    }
1223
1224    pub fn is_sharing_mic(&self) -> bool {
1225        todo!()
1226        // self.live_kit.as_ref().map_or(false, |live_kit| {
1227        //     !matches!(live_kit.microphone_track, LocalTrack::None)
1228        // })
1229    }
1230
1231    pub fn is_muted(&self, cx: &AppContext) -> bool {
1232        todo!()
1233        // self.live_kit
1234        //     .as_ref()
1235        //     .and_then(|live_kit| match &live_kit.microphone_track {
1236        //         LocalTrack::None => Some(Self::mute_on_join(cx)),
1237        //         LocalTrack::Pending { muted, .. } => Some(*muted),
1238        //         LocalTrack::Published { muted, .. } => Some(*muted),
1239        //     })
1240        //     .unwrap_or(false)
1241    }
1242
1243    pub fn is_speaking(&self) -> bool {
1244        todo!()
1245        // self.live_kit
1246        //     .as_ref()
1247        //     .map_or(false, |live_kit| live_kit.speaking)
1248    }
1249
1250    pub fn is_deafened(&self) -> Option<bool> {
1251        // self.live_kit.as_ref().map(|live_kit| live_kit.deafened)
1252        todo!()
1253    }
1254
1255    #[track_caller]
1256    pub fn share_microphone(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
1257        todo!()
1258        // if self.status.is_offline() {
1259        //     return Task::ready(Err(anyhow!("room is offline")));
1260        // } else if self.is_sharing_mic() {
1261        //     return Task::ready(Err(anyhow!("microphone was already shared")));
1262        // }
1263
1264        // let publish_id = if let Some(live_kit) = self.live_kit.as_mut() {
1265        //     let publish_id = post_inc(&mut live_kit.next_publish_id);
1266        //     live_kit.microphone_track = LocalTrack::Pending {
1267        //         publish_id,
1268        //         muted: false,
1269        //     };
1270        //     cx.notify();
1271        //     publish_id
1272        // } else {
1273        // return Task::ready(Err(anyhow!("live-kit was not initialized")));
1274        // };
1275
1276        // cx.spawn(move |this, mut cx| async move {
1277        //     let publish_track = async {
1278        //         let track = LocalAudioTrack::create();
1279        //         this.upgrade()
1280        //             .ok_or_else(|| anyhow!("room was dropped"))?
1281        //             .update(&mut cx, |this, _| {
1282        //                 this.live_kit
1283        //                     .as_ref()
1284        //                     .map(|live_kit| live_kit.room.publish_audio_track(track))
1285        //             })?
1286        //             .ok_or_else(|| anyhow!("live-kit was not initialized"))?
1287        //             .await
1288        //     };
1289
1290        //     let publication = publish_track.await;
1291        //     this.upgrade()
1292        //         .ok_or_else(|| anyhow!("room was dropped"))?
1293        //         .update(&mut cx, |this, cx| {
1294        //             let live_kit = this
1295        //                 .live_kit
1296        //                 .as_mut()
1297        //                 .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
1298
1299        //             let (canceled, muted) = if let LocalTrack::Pending {
1300        //                 publish_id: cur_publish_id,
1301        //                 muted,
1302        //             } = &live_kit.microphone_track
1303        //             {
1304        //                 (*cur_publish_id != publish_id, *muted)
1305        //             } else {
1306        //                 (true, false)
1307        //             };
1308
1309        //             match publication {
1310        //                 Ok(publication) => {
1311        //                     if canceled {
1312        //                         live_kit.room.unpublish_track(publication);
1313        //                     } else {
1314        //                         if muted {
1315        //                             cx.executor().spawn(publication.set_mute(muted)).detach();
1316        //                         }
1317        //                         live_kit.microphone_track = LocalTrack::Published {
1318        //                             track_publication: publication,
1319        //                             muted,
1320        //                         };
1321        //                         cx.notify();
1322        //                     }
1323        //                     Ok(())
1324        //                 }
1325        //                 Err(error) => {
1326        //                     if canceled {
1327        //                         Ok(())
1328        //                     } else {
1329        //                         live_kit.microphone_track = LocalTrack::None;
1330        //                         cx.notify();
1331        //                         Err(error)
1332        //                     }
1333        //                 }
1334        //             }
1335        //         })?
1336        // })
1337    }
1338
1339    pub fn share_screen(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
1340        todo!()
1341        // if self.status.is_offline() {
1342        //     return Task::ready(Err(anyhow!("room is offline")));
1343        // } else if self.is_screen_sharing() {
1344        //     return Task::ready(Err(anyhow!("screen was already shared")));
1345        // }
1346
1347        // let (displays, publish_id) = if let Some(live_kit) = self.live_kit.as_mut() {
1348        //     let publish_id = post_inc(&mut live_kit.next_publish_id);
1349        //     live_kit.screen_track = LocalTrack::Pending {
1350        //         publish_id,
1351        //         muted: false,
1352        //     };
1353        //     cx.notify();
1354        //     (live_kit.room.display_sources(), publish_id)
1355        // } else {
1356        //     return Task::ready(Err(anyhow!("live-kit was not initialized")));
1357        // };
1358
1359        // cx.spawn(move |this, mut cx| async move {
1360        //     let publish_track = async {
1361        //         let displays = displays.await?;
1362        //         let display = displays
1363        //             .first()
1364        //             .ok_or_else(|| anyhow!("no display found"))?;
1365        //         let track = LocalVideoTrack::screen_share_for_display(&display);
1366        //         this.upgrade()
1367        //             .ok_or_else(|| anyhow!("room was dropped"))?
1368        //             .update(&mut cx, |this, _| {
1369        //                 this.live_kit
1370        //                     .as_ref()
1371        //                     .map(|live_kit| live_kit.room.publish_video_track(track))
1372        //             })?
1373        //             .ok_or_else(|| anyhow!("live-kit was not initialized"))?
1374        //             .await
1375        //     };
1376
1377        //     let publication = publish_track.await;
1378        //     this.upgrade()
1379        //         .ok_or_else(|| anyhow!("room was dropped"))?
1380        //         .update(&mut cx, |this, cx| {
1381        //             let live_kit = this
1382        //                 .live_kit
1383        //                 .as_mut()
1384        //                 .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
1385
1386        //             let (canceled, muted) = if let LocalTrack::Pending {
1387        //                 publish_id: cur_publish_id,
1388        //                 muted,
1389        //             } = &live_kit.screen_track
1390        //             {
1391        //                 (*cur_publish_id != publish_id, *muted)
1392        //             } else {
1393        //                 (true, false)
1394        //             };
1395
1396        //             match publication {
1397        //                 Ok(publication) => {
1398        //                     if canceled {
1399        //                         live_kit.room.unpublish_track(publication);
1400        //                     } else {
1401        //                         if muted {
1402        //                             cx.executor().spawn(publication.set_mute(muted)).detach();
1403        //                         }
1404        //                         live_kit.screen_track = LocalTrack::Published {
1405        //                             track_publication: publication,
1406        //                             muted,
1407        //                         };
1408        //                         cx.notify();
1409        //                     }
1410
1411        //                     Audio::play_sound(Sound::StartScreenshare, cx);
1412
1413        //                     Ok(())
1414        //                 }
1415        //                 Err(error) => {
1416        //                     if canceled {
1417        //                         Ok(())
1418        //                     } else {
1419        //                         live_kit.screen_track = LocalTrack::None;
1420        //                         cx.notify();
1421        //                         Err(error)
1422        //                     }
1423        //                 }
1424        //             }
1425        //         })?
1426        // })
1427    }
1428
1429    pub fn toggle_mute(&mut self, cx: &mut ModelContext<Self>) -> Result<Task<Result<()>>> {
1430        todo!()
1431        // let should_mute = !self.is_muted(cx);
1432        // if let Some(live_kit) = self.live_kit.as_mut() {
1433        //     if matches!(live_kit.microphone_track, LocalTrack::None) {
1434        //         return Ok(self.share_microphone(cx));
1435        //     }
1436
1437        //     let (ret_task, old_muted) = live_kit.set_mute(should_mute, cx)?;
1438        //     live_kit.muted_by_user = should_mute;
1439
1440        //     if old_muted == true && live_kit.deafened == true {
1441        //         if let Some(task) = self.toggle_deafen(cx).ok() {
1442        //             task.detach();
1443        //         }
1444        //     }
1445
1446        //     Ok(ret_task)
1447        // } else {
1448        //     Err(anyhow!("LiveKit not started"))
1449        // }
1450    }
1451
1452    pub fn toggle_deafen(&mut self, cx: &mut ModelContext<Self>) -> Result<Task<Result<()>>> {
1453        todo!()
1454        // if let Some(live_kit) = self.live_kit.as_mut() {
1455        //     (*live_kit).deafened = !live_kit.deafened;
1456
1457        //     let mut tasks = Vec::with_capacity(self.remote_participants.len());
1458        //     // Context notification is sent within set_mute itself.
1459        //     let mut mute_task = None;
1460        //     // When deafening, mute user's mic as well.
1461        //     // When undeafening, unmute user's mic unless it was manually muted prior to deafening.
1462        //     if live_kit.deafened || !live_kit.muted_by_user {
1463        //         mute_task = Some(live_kit.set_mute(live_kit.deafened, cx)?.0);
1464        //     };
1465        //     for participant in self.remote_participants.values() {
1466        //         for track in live_kit
1467        //             .room
1468        //             .remote_audio_track_publications(&participant.user.id.to_string())
1469        //         {
1470        //             let deafened = live_kit.deafened;
1471        //             tasks.push(
1472        //                 cx.executor()
1473        //                     .spawn_on_main(move || track.set_enabled(!deafened)),
1474        //             );
1475        //         }
1476        //     }
1477
1478        //     Ok(cx.executor().spawn_on_main(|| async {
1479        //         if let Some(mute_task) = mute_task {
1480        //             mute_task.await?;
1481        //         }
1482        //         for task in tasks {
1483        //             task.await?;
1484        //         }
1485        //         Ok(())
1486        //     }))
1487        // } else {
1488        //     Err(anyhow!("LiveKit not started"))
1489        // }
1490    }
1491
1492    pub fn unshare_screen(&mut self, cx: &mut ModelContext<Self>) -> Result<()> {
1493        if self.status.is_offline() {
1494            return Err(anyhow!("room is offline"));
1495        }
1496
1497        todo!()
1498        // let live_kit = self
1499        //     .live_kit
1500        //     .as_mut()
1501        //     .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
1502        // match mem::take(&mut live_kit.screen_track) {
1503        //     LocalTrack::None => Err(anyhow!("screen was not shared")),
1504        //     LocalTrack::Pending { .. } => {
1505        //         cx.notify();
1506        //         Ok(())
1507        //     }
1508        //     LocalTrack::Published {
1509        //         track_publication, ..
1510        //     } => {
1511        //         live_kit.room.unpublish_track(track_publication);
1512        //         cx.notify();
1513
1514        //         Audio::play_sound(Sound::StopScreenshare, cx);
1515        //         Ok(())
1516        //     }
1517        // }
1518    }
1519
1520    #[cfg(any(test, feature = "test-support"))]
1521    pub fn set_display_sources(&self, sources: Vec<live_kit_client::MacOSDisplay>) {
1522        todo!()
1523        // self.live_kit
1524        //     .as_ref()
1525        //     .unwrap()
1526        //     .room
1527        //     .set_display_sources(sources);
1528    }
1529}
1530
1531struct LiveKitRoom {
1532    room: Arc<live_kit_client::Room>,
1533    screen_track: LocalTrack,
1534    microphone_track: LocalTrack,
1535    /// Tracks whether we're currently in a muted state due to auto-mute from deafening or manual mute performed by user.
1536    muted_by_user: bool,
1537    deafened: bool,
1538    speaking: bool,
1539    next_publish_id: usize,
1540    _maintain_room: Task<()>,
1541    _maintain_tracks: [Task<()>; 2],
1542}
1543
1544impl LiveKitRoom {
1545    fn set_mute(
1546        self: &mut LiveKitRoom,
1547        should_mute: bool,
1548        cx: &mut ModelContext<Room>,
1549    ) -> Result<(Task<Result<()>>, bool)> {
1550        if !should_mute {
1551            // clear user muting state.
1552            self.muted_by_user = false;
1553        }
1554
1555        let (result, old_muted) = match &mut self.microphone_track {
1556            LocalTrack::None => Err(anyhow!("microphone was not shared")),
1557            LocalTrack::Pending { muted, .. } => {
1558                let old_muted = *muted;
1559                *muted = should_mute;
1560                cx.notify();
1561                Ok((Task::Ready(Some(Ok(()))), old_muted))
1562            }
1563            LocalTrack::Published {
1564                track_publication,
1565                muted,
1566            } => {
1567                let old_muted = *muted;
1568                *muted = should_mute;
1569                cx.notify();
1570                Ok((
1571                    cx.executor().spawn(track_publication.set_mute(*muted)),
1572                    old_muted,
1573                ))
1574            }
1575        }?;
1576
1577        if old_muted != should_mute {
1578            if should_mute {
1579                Audio::play_sound(Sound::Mute, cx);
1580            } else {
1581                Audio::play_sound(Sound::Unmute, cx);
1582            }
1583        }
1584
1585        Ok((result, old_muted))
1586    }
1587}
1588
1589enum LocalTrack {
1590    None,
1591    Pending {
1592        publish_id: usize,
1593        muted: bool,
1594    },
1595    Published {
1596        track_publication: LocalTrackPublication,
1597        muted: bool,
1598    },
1599}
1600
1601impl Default for LocalTrack {
1602    fn default() -> Self {
1603        Self::None
1604    }
1605}
1606
1607#[derive(Copy, Clone, PartialEq, Eq)]
1608pub enum RoomStatus {
1609    Online,
1610    Rejoining,
1611    Offline,
1612}
1613
1614impl RoomStatus {
1615    pub fn is_offline(&self) -> bool {
1616        matches!(self, RoomStatus::Offline)
1617    }
1618
1619    pub fn is_online(&self) -> bool {
1620        matches!(self, RoomStatus::Online)
1621    }
1622}