room.rs

   1use crate::{
   2    call_settings::CallSettings,
   3    participant::{LocalParticipant, ParticipantLocation, RemoteParticipant, RemoteVideoTrack},
   4    IncomingCall,
   5};
   6use anyhow::{anyhow, Result};
   7use audio::{Audio, Sound};
   8use client::{
   9    proto::{self, PeerId},
  10    Client, TypedEnvelope, User, UserStore,
  11};
  12use collections::{BTreeMap, HashMap, HashSet};
  13use fs::Fs;
  14use futures::{FutureExt, StreamExt};
  15use gpui::{AppContext, AsyncAppContext, Entity, ModelContext, ModelHandle, Task, WeakModelHandle};
  16use language::LanguageRegistry;
  17use live_kit_client::{
  18    LocalAudioTrack, LocalTrackPublication, LocalVideoTrack, RemoteAudioTrackUpdate,
  19    RemoteVideoTrackUpdate,
  20};
  21use postage::stream::Stream;
  22use project::Project;
  23use std::{future::Future, mem, pin::Pin, sync::Arc, time::Duration};
  24use util::{post_inc, ResultExt, TryFutureExt};
  25
  26pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(30);
  27
  28#[derive(Clone, Debug, PartialEq, Eq)]
  29pub enum Event {
  30    ParticipantLocationChanged {
  31        participant_id: proto::PeerId,
  32    },
  33    RemoteVideoTracksChanged {
  34        participant_id: proto::PeerId,
  35    },
  36    RemoteAudioTracksChanged {
  37        participant_id: proto::PeerId,
  38    },
  39    RemoteProjectShared {
  40        owner: Arc<User>,
  41        project_id: u64,
  42        worktree_root_names: Vec<String>,
  43    },
  44    RemoteProjectUnshared {
  45        project_id: u64,
  46    },
  47    Left,
  48}
  49
  50pub struct Room {
  51    id: u64,
  52    live_kit: Option<LiveKitRoom>,
  53    status: RoomStatus,
  54    shared_projects: HashSet<WeakModelHandle<Project>>,
  55    joined_projects: HashSet<WeakModelHandle<Project>>,
  56    local_participant: LocalParticipant,
  57    remote_participants: BTreeMap<u64, RemoteParticipant>,
  58    pending_participants: Vec<Arc<User>>,
  59    participant_user_ids: HashSet<u64>,
  60    pending_call_count: usize,
  61    leave_when_empty: bool,
  62    client: Arc<Client>,
  63    user_store: ModelHandle<UserStore>,
  64    follows_by_leader_id_project_id: HashMap<(PeerId, u64), Vec<PeerId>>,
  65    subscriptions: Vec<client::Subscription>,
  66    pending_room_update: Option<Task<()>>,
  67    maintain_connection: Option<Task<Option<()>>>,
  68}
  69
  70impl Entity for Room {
  71    type Event = Event;
  72
  73    fn release(&mut self, cx: &mut AppContext) {
  74        if self.status.is_online() {
  75            self.leave_internal(cx).detach_and_log_err(cx);
  76        }
  77    }
  78
  79    fn app_will_quit(&mut self, cx: &mut AppContext) -> Option<Pin<Box<dyn Future<Output = ()>>>> {
  80        if self.status.is_online() {
  81            let leave = self.leave_internal(cx);
  82            Some(
  83                cx.background()
  84                    .spawn(async move {
  85                        leave.await.log_err();
  86                    })
  87                    .boxed(),
  88            )
  89        } else {
  90            None
  91        }
  92    }
  93}
  94
  95impl Room {
  96    fn new(
  97        id: u64,
  98        live_kit_connection_info: Option<proto::LiveKitConnectionInfo>,
  99        client: Arc<Client>,
 100        user_store: ModelHandle<UserStore>,
 101        cx: &mut ModelContext<Self>,
 102    ) -> Self {
 103        let live_kit_room = if let Some(connection_info) = live_kit_connection_info {
 104            let room = live_kit_client::Room::new();
 105            let mut status = room.status();
 106            // Consume the initial status of the room.
 107            let _ = status.try_recv();
 108            let _maintain_room = cx.spawn_weak(|this, mut cx| async move {
 109                while let Some(status) = status.next().await {
 110                    let this = if let Some(this) = this.upgrade(&cx) {
 111                        this
 112                    } else {
 113                        break;
 114                    };
 115
 116                    if status == live_kit_client::ConnectionState::Disconnected {
 117                        this.update(&mut cx, |this, cx| this.leave(cx).log_err());
 118                        break;
 119                    }
 120                }
 121            });
 122
 123            let mut track_video_changes = room.remote_video_track_updates();
 124            let _maintain_video_tracks = cx.spawn_weak(|this, mut cx| async move {
 125                while let Some(track_change) = track_video_changes.next().await {
 126                    let this = if let Some(this) = this.upgrade(&cx) {
 127                        this
 128                    } else {
 129                        break;
 130                    };
 131
 132                    this.update(&mut cx, |this, cx| {
 133                        this.remote_video_track_updated(track_change, cx).log_err()
 134                    });
 135                }
 136            });
 137
 138            let mut track_audio_changes = room.remote_audio_track_updates();
 139            let _maintain_audio_tracks = cx.spawn_weak(|this, mut cx| async move {
 140                while let Some(track_change) = track_audio_changes.next().await {
 141                    let this = if let Some(this) = this.upgrade(&cx) {
 142                        this
 143                    } else {
 144                        break;
 145                    };
 146
 147                    this.update(&mut cx, |this, cx| {
 148                        this.remote_audio_track_updated(track_change, cx).log_err()
 149                    });
 150                }
 151            });
 152
 153            let connect = room.connect(&connection_info.server_url, &connection_info.token);
 154            cx.spawn(|this, mut cx| async move {
 155                connect.await?;
 156
 157                if !cx.read(|cx| settings::get::<CallSettings>(cx).mute_on_join) {
 158                    this.update(&mut cx, |this, cx| this.share_microphone(cx))
 159                        .await?;
 160                }
 161
 162                anyhow::Ok(())
 163            })
 164            .detach_and_log_err(cx);
 165
 166            Some(LiveKitRoom {
 167                room,
 168                screen_track: LocalTrack::None,
 169                microphone_track: LocalTrack::None,
 170                next_publish_id: 0,
 171                muted_by_user: false,
 172                deafened: false,
 173                speaking: false,
 174                _maintain_room,
 175                _maintain_tracks: [_maintain_video_tracks, _maintain_audio_tracks],
 176            })
 177        } else {
 178            None
 179        };
 180
 181        let maintain_connection =
 182            cx.spawn_weak(|this, cx| Self::maintain_connection(this, client.clone(), cx).log_err());
 183
 184        Audio::play_sound(Sound::Joined, cx);
 185
 186        Self {
 187            id,
 188            live_kit: live_kit_room,
 189            status: RoomStatus::Online,
 190            shared_projects: Default::default(),
 191            joined_projects: Default::default(),
 192            participant_user_ids: Default::default(),
 193            local_participant: Default::default(),
 194            remote_participants: Default::default(),
 195            pending_participants: Default::default(),
 196            pending_call_count: 0,
 197            subscriptions: vec![client.add_message_handler(cx.handle(), Self::handle_room_updated)],
 198            leave_when_empty: false,
 199            pending_room_update: None,
 200            client,
 201            user_store,
 202            follows_by_leader_id_project_id: Default::default(),
 203            maintain_connection: Some(maintain_connection),
 204        }
 205    }
 206
 207    pub(crate) fn create(
 208        called_user_id: u64,
 209        initial_project: Option<ModelHandle<Project>>,
 210        client: Arc<Client>,
 211        user_store: ModelHandle<UserStore>,
 212        cx: &mut AppContext,
 213    ) -> Task<Result<ModelHandle<Self>>> {
 214        cx.spawn(|mut cx| async move {
 215            let response = client.request(proto::CreateRoom {}).await?;
 216            let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
 217            let room = cx.add_model(|cx| {
 218                Self::new(
 219                    room_proto.id,
 220                    response.live_kit_connection_info,
 221                    client,
 222                    user_store,
 223                    cx,
 224                )
 225            });
 226
 227            let initial_project_id = if let Some(initial_project) = initial_project {
 228                let initial_project_id = room
 229                    .update(&mut cx, |room, cx| {
 230                        room.share_project(initial_project.clone(), cx)
 231                    })
 232                    .await?;
 233                Some(initial_project_id)
 234            } else {
 235                None
 236            };
 237
 238            match room
 239                .update(&mut cx, |room, cx| {
 240                    room.leave_when_empty = true;
 241                    room.call(called_user_id, initial_project_id, cx)
 242                })
 243                .await
 244            {
 245                Ok(()) => Ok(room),
 246                Err(error) => Err(anyhow!("room creation failed: {:?}", error)),
 247            }
 248        })
 249    }
 250
 251    pub(crate) fn join(
 252        call: &IncomingCall,
 253        client: Arc<Client>,
 254        user_store: ModelHandle<UserStore>,
 255        cx: &mut AppContext,
 256    ) -> Task<Result<ModelHandle<Self>>> {
 257        let room_id = call.room_id;
 258        cx.spawn(|mut cx| async move {
 259            let response = client.request(proto::JoinRoom { id: room_id }).await?;
 260            let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
 261            let room = cx.add_model(|cx| {
 262                Self::new(
 263                    room_id,
 264                    response.live_kit_connection_info,
 265                    client,
 266                    user_store,
 267                    cx,
 268                )
 269            });
 270            room.update(&mut cx, |room, cx| {
 271                room.leave_when_empty = true;
 272                room.apply_room_update(room_proto, cx)?;
 273                anyhow::Ok(())
 274            })?;
 275
 276            Ok(room)
 277        })
 278    }
 279
 280    fn should_leave(&self) -> bool {
 281        self.leave_when_empty
 282            && self.pending_room_update.is_none()
 283            && self.pending_participants.is_empty()
 284            && self.remote_participants.is_empty()
 285            && self.pending_call_count == 0
 286    }
 287
 288    pub(crate) fn leave(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
 289        cx.notify();
 290        cx.emit(Event::Left);
 291        self.leave_internal(cx)
 292    }
 293
 294    fn leave_internal(&mut self, cx: &mut AppContext) -> Task<Result<()>> {
 295        if self.status.is_offline() {
 296            return Task::ready(Err(anyhow!("room is offline")));
 297        }
 298
 299        log::info!("leaving room");
 300
 301        for project in self.shared_projects.drain() {
 302            if let Some(project) = project.upgrade(cx) {
 303                project.update(cx, |project, cx| {
 304                    project.unshare(cx).log_err();
 305                });
 306            }
 307        }
 308        for project in self.joined_projects.drain() {
 309            if let Some(project) = project.upgrade(cx) {
 310                project.update(cx, |project, cx| {
 311                    project.disconnected_from_host(cx);
 312                    project.close(cx);
 313                });
 314            }
 315        }
 316
 317        Audio::play_sound(Sound::Leave, cx);
 318
 319        self.status = RoomStatus::Offline;
 320        self.remote_participants.clear();
 321        self.pending_participants.clear();
 322        self.participant_user_ids.clear();
 323        self.subscriptions.clear();
 324        self.live_kit.take();
 325        self.pending_room_update.take();
 326        self.maintain_connection.take();
 327
 328        let leave_room = self.client.request(proto::LeaveRoom {});
 329        cx.background().spawn(async move {
 330            leave_room.await?;
 331            anyhow::Ok(())
 332        })
 333    }
 334
 335    async fn maintain_connection(
 336        this: WeakModelHandle<Self>,
 337        client: Arc<Client>,
 338        mut cx: AsyncAppContext,
 339    ) -> Result<()> {
 340        let mut client_status = client.status();
 341        loop {
 342            let _ = client_status.try_recv();
 343            let is_connected = client_status.borrow().is_connected();
 344            // Even if we're initially connected, any future change of the status means we momentarily disconnected.
 345            if !is_connected || client_status.next().await.is_some() {
 346                log::info!("detected client disconnection");
 347
 348                this.upgrade(&cx)
 349                    .ok_or_else(|| anyhow!("room was dropped"))?
 350                    .update(&mut cx, |this, cx| {
 351                        this.status = RoomStatus::Rejoining;
 352                        cx.notify();
 353                    });
 354
 355                // Wait for client to re-establish a connection to the server.
 356                {
 357                    let mut reconnection_timeout = cx.background().timer(RECONNECT_TIMEOUT).fuse();
 358                    let client_reconnection = async {
 359                        let mut remaining_attempts = 3;
 360                        while remaining_attempts > 0 {
 361                            if client_status.borrow().is_connected() {
 362                                log::info!("client reconnected, attempting to rejoin room");
 363
 364                                let Some(this) = this.upgrade(&cx) else { break };
 365                                if this
 366                                    .update(&mut cx, |this, cx| this.rejoin(cx))
 367                                    .await
 368                                    .log_err()
 369                                    .is_some()
 370                                {
 371                                    return true;
 372                                } else {
 373                                    remaining_attempts -= 1;
 374                                }
 375                            } else if client_status.borrow().is_signed_out() {
 376                                return false;
 377                            }
 378
 379                            log::info!(
 380                                "waiting for client status change, remaining attempts {}",
 381                                remaining_attempts
 382                            );
 383                            client_status.next().await;
 384                        }
 385                        false
 386                    }
 387                    .fuse();
 388                    futures::pin_mut!(client_reconnection);
 389
 390                    futures::select_biased! {
 391                        reconnected = client_reconnection => {
 392                            if reconnected {
 393                                log::info!("successfully reconnected to room");
 394                                // If we successfully joined the room, go back around the loop
 395                                // waiting for future connection status changes.
 396                                continue;
 397                            }
 398                        }
 399                        _ = reconnection_timeout => {
 400                            log::info!("room reconnection timeout expired");
 401                        }
 402                    }
 403                }
 404
 405                break;
 406            }
 407        }
 408
 409        // The client failed to re-establish a connection to the server
 410        // or an error occurred while trying to re-join the room. Either way
 411        // we leave the room and return an error.
 412        if let Some(this) = this.upgrade(&cx) {
 413            log::info!("reconnection failed, leaving room");
 414            let _ = this.update(&mut cx, |this, cx| this.leave(cx));
 415        }
 416        Err(anyhow!(
 417            "can't reconnect to room: client failed to re-establish connection"
 418        ))
 419    }
 420
 421    fn rejoin(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
 422        let mut projects = HashMap::default();
 423        let mut reshared_projects = Vec::new();
 424        let mut rejoined_projects = Vec::new();
 425        self.shared_projects.retain(|project| {
 426            if let Some(handle) = project.upgrade(cx) {
 427                let project = handle.read(cx);
 428                if let Some(project_id) = project.remote_id() {
 429                    projects.insert(project_id, handle.clone());
 430                    reshared_projects.push(proto::UpdateProject {
 431                        project_id,
 432                        worktrees: project.worktree_metadata_protos(cx),
 433                    });
 434                    return true;
 435                }
 436            }
 437            false
 438        });
 439        self.joined_projects.retain(|project| {
 440            if let Some(handle) = project.upgrade(cx) {
 441                let project = handle.read(cx);
 442                if let Some(project_id) = project.remote_id() {
 443                    projects.insert(project_id, handle.clone());
 444                    rejoined_projects.push(proto::RejoinProject {
 445                        id: project_id,
 446                        worktrees: project
 447                            .worktrees(cx)
 448                            .map(|worktree| {
 449                                let worktree = worktree.read(cx);
 450                                proto::RejoinWorktree {
 451                                    id: worktree.id().to_proto(),
 452                                    scan_id: worktree.completed_scan_id() as u64,
 453                                }
 454                            })
 455                            .collect(),
 456                    });
 457                }
 458                return true;
 459            }
 460            false
 461        });
 462
 463        let response = self.client.request_envelope(proto::RejoinRoom {
 464            id: self.id,
 465            reshared_projects,
 466            rejoined_projects,
 467        });
 468
 469        cx.spawn(|this, mut cx| async move {
 470            let response = response.await?;
 471            let message_id = response.message_id;
 472            let response = response.payload;
 473            let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
 474            this.update(&mut cx, |this, cx| {
 475                this.status = RoomStatus::Online;
 476                this.apply_room_update(room_proto, cx)?;
 477
 478                for reshared_project in response.reshared_projects {
 479                    if let Some(project) = projects.get(&reshared_project.id) {
 480                        project.update(cx, |project, cx| {
 481                            project.reshared(reshared_project, cx).log_err();
 482                        });
 483                    }
 484                }
 485
 486                for rejoined_project in response.rejoined_projects {
 487                    if let Some(project) = projects.get(&rejoined_project.id) {
 488                        project.update(cx, |project, cx| {
 489                            project.rejoined(rejoined_project, message_id, cx).log_err();
 490                        });
 491                    }
 492                }
 493
 494                anyhow::Ok(())
 495            })
 496        })
 497    }
 498
 499    pub fn id(&self) -> u64 {
 500        self.id
 501    }
 502
 503    pub fn status(&self) -> RoomStatus {
 504        self.status
 505    }
 506
 507    pub fn local_participant(&self) -> &LocalParticipant {
 508        &self.local_participant
 509    }
 510
 511    pub fn remote_participants(&self) -> &BTreeMap<u64, RemoteParticipant> {
 512        &self.remote_participants
 513    }
 514
 515    pub fn remote_participant_for_peer_id(&self, peer_id: PeerId) -> Option<&RemoteParticipant> {
 516        self.remote_participants
 517            .values()
 518            .find(|p| p.peer_id == peer_id)
 519    }
 520
 521    pub fn pending_participants(&self) -> &[Arc<User>] {
 522        &self.pending_participants
 523    }
 524
 525    pub fn contains_participant(&self, user_id: u64) -> bool {
 526        self.participant_user_ids.contains(&user_id)
 527    }
 528
 529    pub fn followers_for(&self, leader_id: PeerId, project_id: u64) -> &[PeerId] {
 530        self.follows_by_leader_id_project_id
 531            .get(&(leader_id, project_id))
 532            .map_or(&[], |v| v.as_slice())
 533    }
 534
 535    async fn handle_room_updated(
 536        this: ModelHandle<Self>,
 537        envelope: TypedEnvelope<proto::RoomUpdated>,
 538        _: Arc<Client>,
 539        mut cx: AsyncAppContext,
 540    ) -> Result<()> {
 541        let room = envelope
 542            .payload
 543            .room
 544            .ok_or_else(|| anyhow!("invalid room"))?;
 545        this.update(&mut cx, |this, cx| this.apply_room_update(room, cx))
 546    }
 547
 548    fn apply_room_update(
 549        &mut self,
 550        mut room: proto::Room,
 551        cx: &mut ModelContext<Self>,
 552    ) -> Result<()> {
 553        // Filter ourselves out from the room's participants.
 554        let local_participant_ix = room
 555            .participants
 556            .iter()
 557            .position(|participant| Some(participant.user_id) == self.client.user_id());
 558        let local_participant = local_participant_ix.map(|ix| room.participants.swap_remove(ix));
 559
 560        let pending_participant_user_ids = room
 561            .pending_participants
 562            .iter()
 563            .map(|p| p.user_id)
 564            .collect::<Vec<_>>();
 565
 566        let remote_participant_user_ids = room
 567            .participants
 568            .iter()
 569            .map(|p| p.user_id)
 570            .collect::<Vec<_>>();
 571
 572        let (remote_participants, pending_participants) =
 573            self.user_store.update(cx, move |user_store, cx| {
 574                (
 575                    user_store.get_users(remote_participant_user_ids, cx),
 576                    user_store.get_users(pending_participant_user_ids, cx),
 577                )
 578            });
 579
 580        self.pending_room_update = Some(cx.spawn(|this, mut cx| async move {
 581            let (remote_participants, pending_participants) =
 582                futures::join!(remote_participants, pending_participants);
 583
 584            this.update(&mut cx, |this, cx| {
 585                this.participant_user_ids.clear();
 586
 587                if let Some(participant) = local_participant {
 588                    this.local_participant.projects = participant.projects;
 589                } else {
 590                    this.local_participant.projects.clear();
 591                }
 592
 593                if let Some(participants) = remote_participants.log_err() {
 594                    for (participant, user) in room.participants.into_iter().zip(participants) {
 595                        let Some(peer_id) = participant.peer_id else { continue };
 596                        this.participant_user_ids.insert(participant.user_id);
 597
 598                        let old_projects = this
 599                            .remote_participants
 600                            .get(&participant.user_id)
 601                            .into_iter()
 602                            .flat_map(|existing| &existing.projects)
 603                            .map(|project| project.id)
 604                            .collect::<HashSet<_>>();
 605                        let new_projects = participant
 606                            .projects
 607                            .iter()
 608                            .map(|project| project.id)
 609                            .collect::<HashSet<_>>();
 610
 611                        for project in &participant.projects {
 612                            if !old_projects.contains(&project.id) {
 613                                cx.emit(Event::RemoteProjectShared {
 614                                    owner: user.clone(),
 615                                    project_id: project.id,
 616                                    worktree_root_names: project.worktree_root_names.clone(),
 617                                });
 618                            }
 619                        }
 620
 621                        for unshared_project_id in old_projects.difference(&new_projects) {
 622                            this.joined_projects.retain(|project| {
 623                                if let Some(project) = project.upgrade(cx) {
 624                                    project.update(cx, |project, cx| {
 625                                        if project.remote_id() == Some(*unshared_project_id) {
 626                                            project.disconnected_from_host(cx);
 627                                            false
 628                                        } else {
 629                                            true
 630                                        }
 631                                    })
 632                                } else {
 633                                    false
 634                                }
 635                            });
 636                            cx.emit(Event::RemoteProjectUnshared {
 637                                project_id: *unshared_project_id,
 638                            });
 639                        }
 640
 641                        let location = ParticipantLocation::from_proto(participant.location)
 642                            .unwrap_or(ParticipantLocation::External);
 643                        if let Some(remote_participant) =
 644                            this.remote_participants.get_mut(&participant.user_id)
 645                        {
 646                            remote_participant.projects = participant.projects;
 647                            remote_participant.peer_id = peer_id;
 648                            if location != remote_participant.location {
 649                                remote_participant.location = location;
 650                                cx.emit(Event::ParticipantLocationChanged {
 651                                    participant_id: peer_id,
 652                                });
 653                            }
 654                        } else {
 655                            this.remote_participants.insert(
 656                                participant.user_id,
 657                                RemoteParticipant {
 658                                    user: user.clone(),
 659                                    peer_id,
 660                                    projects: participant.projects,
 661                                    location,
 662                                    muted: true,
 663                                    speaking: false,
 664                                    video_tracks: Default::default(),
 665                                    audio_tracks: Default::default(),
 666                                },
 667                            );
 668
 669                            Audio::play_sound(Sound::Joined, cx);
 670
 671                            if let Some(live_kit) = this.live_kit.as_ref() {
 672                                let video_tracks =
 673                                    live_kit.room.remote_video_tracks(&user.id.to_string());
 674                                let audio_tracks =
 675                                    live_kit.room.remote_audio_tracks(&user.id.to_string());
 676                                let publications = live_kit
 677                                    .room
 678                                    .remote_audio_track_publications(&user.id.to_string());
 679
 680                                for track in video_tracks {
 681                                    this.remote_video_track_updated(
 682                                        RemoteVideoTrackUpdate::Subscribed(track),
 683                                        cx,
 684                                    )
 685                                    .log_err();
 686                                }
 687
 688                                for (track, publication) in
 689                                    audio_tracks.iter().zip(publications.iter())
 690                                {
 691                                    this.remote_audio_track_updated(
 692                                        RemoteAudioTrackUpdate::Subscribed(
 693                                            track.clone(),
 694                                            publication.clone(),
 695                                        ),
 696                                        cx,
 697                                    )
 698                                    .log_err();
 699                                }
 700                            }
 701                        }
 702                    }
 703
 704                    this.remote_participants.retain(|user_id, participant| {
 705                        if this.participant_user_ids.contains(user_id) {
 706                            true
 707                        } else {
 708                            for project in &participant.projects {
 709                                cx.emit(Event::RemoteProjectUnshared {
 710                                    project_id: project.id,
 711                                });
 712                            }
 713                            false
 714                        }
 715                    });
 716                }
 717
 718                if let Some(pending_participants) = pending_participants.log_err() {
 719                    this.pending_participants = pending_participants;
 720                    for participant in &this.pending_participants {
 721                        this.participant_user_ids.insert(participant.id);
 722                    }
 723                }
 724
 725                this.follows_by_leader_id_project_id.clear();
 726                for follower in room.followers {
 727                    let project_id = follower.project_id;
 728                    let (leader, follower) = match (follower.leader_id, follower.follower_id) {
 729                        (Some(leader), Some(follower)) => (leader, follower),
 730
 731                        _ => {
 732                            log::error!("Follower message {follower:?} missing some state");
 733                            continue;
 734                        }
 735                    };
 736
 737                    let list = this
 738                        .follows_by_leader_id_project_id
 739                        .entry((leader, project_id))
 740                        .or_insert(Vec::new());
 741                    if !list.contains(&follower) {
 742                        list.push(follower);
 743                    }
 744                }
 745
 746                this.pending_room_update.take();
 747                if this.should_leave() {
 748                    log::info!("room is empty, leaving");
 749                    let _ = this.leave(cx);
 750                }
 751
 752                this.check_invariants();
 753                cx.notify();
 754            });
 755        }));
 756
 757        cx.notify();
 758        Ok(())
 759    }
 760
 761    fn remote_video_track_updated(
 762        &mut self,
 763        change: RemoteVideoTrackUpdate,
 764        cx: &mut ModelContext<Self>,
 765    ) -> Result<()> {
 766        match change {
 767            RemoteVideoTrackUpdate::Subscribed(track) => {
 768                let user_id = track.publisher_id().parse()?;
 769                let track_id = track.sid().to_string();
 770                let participant = self
 771                    .remote_participants
 772                    .get_mut(&user_id)
 773                    .ok_or_else(|| anyhow!("subscribed to track by unknown participant"))?;
 774                participant.video_tracks.insert(
 775                    track_id.clone(),
 776                    Arc::new(RemoteVideoTrack {
 777                        live_kit_track: track,
 778                    }),
 779                );
 780                cx.emit(Event::RemoteVideoTracksChanged {
 781                    participant_id: participant.peer_id,
 782                });
 783            }
 784            RemoteVideoTrackUpdate::Unsubscribed {
 785                publisher_id,
 786                track_id,
 787            } => {
 788                let user_id = publisher_id.parse()?;
 789                let participant = self
 790                    .remote_participants
 791                    .get_mut(&user_id)
 792                    .ok_or_else(|| anyhow!("unsubscribed from track by unknown participant"))?;
 793                participant.video_tracks.remove(&track_id);
 794                cx.emit(Event::RemoteVideoTracksChanged {
 795                    participant_id: participant.peer_id,
 796                });
 797            }
 798        }
 799
 800        cx.notify();
 801        Ok(())
 802    }
 803
 804    fn remote_audio_track_updated(
 805        &mut self,
 806        change: RemoteAudioTrackUpdate,
 807        cx: &mut ModelContext<Self>,
 808    ) -> Result<()> {
 809        match change {
 810            RemoteAudioTrackUpdate::ActiveSpeakersChanged { speakers } => {
 811                let mut speaker_ids = speakers
 812                    .into_iter()
 813                    .filter_map(|speaker_sid| speaker_sid.parse().ok())
 814                    .collect::<Vec<u64>>();
 815                speaker_ids.sort_unstable();
 816                for (sid, participant) in &mut self.remote_participants {
 817                    if let Ok(_) = speaker_ids.binary_search(sid) {
 818                        participant.speaking = true;
 819                    } else {
 820                        participant.speaking = false;
 821                    }
 822                }
 823                if let Some(id) = self.client.user_id() {
 824                    if let Some(room) = &mut self.live_kit {
 825                        if let Ok(_) = speaker_ids.binary_search(&id) {
 826                            room.speaking = true;
 827                        } else {
 828                            room.speaking = false;
 829                        }
 830                    }
 831                }
 832                cx.notify();
 833            }
 834            RemoteAudioTrackUpdate::MuteChanged { track_id, muted } => {
 835                let mut found = false;
 836                for participant in &mut self.remote_participants.values_mut() {
 837                    for track in participant.audio_tracks.values() {
 838                        if track.sid() == track_id {
 839                            found = true;
 840                            break;
 841                        }
 842                    }
 843                    if found {
 844                        participant.muted = muted;
 845                        break;
 846                    }
 847                }
 848
 849                cx.notify();
 850            }
 851            RemoteAudioTrackUpdate::Subscribed(track, publication) => {
 852                let user_id = track.publisher_id().parse()?;
 853                let track_id = track.sid().to_string();
 854                let participant = self
 855                    .remote_participants
 856                    .get_mut(&user_id)
 857                    .ok_or_else(|| anyhow!("subscribed to track by unknown participant"))?;
 858
 859                participant.audio_tracks.insert(track_id.clone(), track);
 860                participant.muted = publication.is_muted();
 861
 862                cx.emit(Event::RemoteAudioTracksChanged {
 863                    participant_id: participant.peer_id,
 864                });
 865            }
 866            RemoteAudioTrackUpdate::Unsubscribed {
 867                publisher_id,
 868                track_id,
 869            } => {
 870                let user_id = publisher_id.parse()?;
 871                let participant = self
 872                    .remote_participants
 873                    .get_mut(&user_id)
 874                    .ok_or_else(|| anyhow!("unsubscribed from track by unknown participant"))?;
 875                participant.audio_tracks.remove(&track_id);
 876                cx.emit(Event::RemoteAudioTracksChanged {
 877                    participant_id: participant.peer_id,
 878                });
 879            }
 880        }
 881
 882        cx.notify();
 883        Ok(())
 884    }
 885
 886    fn check_invariants(&self) {
 887        #[cfg(any(test, feature = "test-support"))]
 888        {
 889            for participant in self.remote_participants.values() {
 890                assert!(self.participant_user_ids.contains(&participant.user.id));
 891                assert_ne!(participant.user.id, self.client.user_id().unwrap());
 892            }
 893
 894            for participant in &self.pending_participants {
 895                assert!(self.participant_user_ids.contains(&participant.id));
 896                assert_ne!(participant.id, self.client.user_id().unwrap());
 897            }
 898
 899            assert_eq!(
 900                self.participant_user_ids.len(),
 901                self.remote_participants.len() + self.pending_participants.len()
 902            );
 903        }
 904    }
 905
 906    pub(crate) fn call(
 907        &mut self,
 908        called_user_id: u64,
 909        initial_project_id: Option<u64>,
 910        cx: &mut ModelContext<Self>,
 911    ) -> Task<Result<()>> {
 912        if self.status.is_offline() {
 913            return Task::ready(Err(anyhow!("room is offline")));
 914        }
 915
 916        cx.notify();
 917        let client = self.client.clone();
 918        let room_id = self.id;
 919        self.pending_call_count += 1;
 920        cx.spawn(|this, mut cx| async move {
 921            let result = client
 922                .request(proto::Call {
 923                    room_id,
 924                    called_user_id,
 925                    initial_project_id,
 926                })
 927                .await;
 928            this.update(&mut cx, |this, cx| {
 929                this.pending_call_count -= 1;
 930                if this.should_leave() {
 931                    this.leave(cx).detach_and_log_err(cx);
 932                }
 933            });
 934            result?;
 935            Ok(())
 936        })
 937    }
 938
 939    pub fn join_project(
 940        &mut self,
 941        id: u64,
 942        language_registry: Arc<LanguageRegistry>,
 943        fs: Arc<dyn Fs>,
 944        cx: &mut ModelContext<Self>,
 945    ) -> Task<Result<ModelHandle<Project>>> {
 946        let client = self.client.clone();
 947        let user_store = self.user_store.clone();
 948        cx.spawn(|this, mut cx| async move {
 949            let project =
 950                Project::remote(id, client, user_store, language_registry, fs, cx.clone()).await?;
 951
 952            this.update(&mut cx, |this, cx| {
 953                this.joined_projects.retain(|project| {
 954                    if let Some(project) = project.upgrade(cx) {
 955                        !project.read(cx).is_read_only()
 956                    } else {
 957                        false
 958                    }
 959                });
 960                this.joined_projects.insert(project.downgrade());
 961            });
 962            Ok(project)
 963        })
 964    }
 965
 966    pub(crate) fn share_project(
 967        &mut self,
 968        project: ModelHandle<Project>,
 969        cx: &mut ModelContext<Self>,
 970    ) -> Task<Result<u64>> {
 971        if let Some(project_id) = project.read(cx).remote_id() {
 972            return Task::ready(Ok(project_id));
 973        }
 974
 975        let request = self.client.request(proto::ShareProject {
 976            room_id: self.id(),
 977            worktrees: project.read(cx).worktree_metadata_protos(cx),
 978        });
 979        cx.spawn(|this, mut cx| async move {
 980            let response = request.await?;
 981
 982            project.update(&mut cx, |project, cx| {
 983                project.shared(response.project_id, cx)
 984            })?;
 985
 986            // If the user's location is in this project, it changes from UnsharedProject to SharedProject.
 987            this.update(&mut cx, |this, cx| {
 988                this.shared_projects.insert(project.downgrade());
 989                let active_project = this.local_participant.active_project.as_ref();
 990                if active_project.map_or(false, |location| *location == project) {
 991                    this.set_location(Some(&project), cx)
 992                } else {
 993                    Task::ready(Ok(()))
 994                }
 995            })
 996            .await?;
 997
 998            Ok(response.project_id)
 999        })
1000    }
1001
1002    pub(crate) fn unshare_project(
1003        &mut self,
1004        project: ModelHandle<Project>,
1005        cx: &mut ModelContext<Self>,
1006    ) -> Result<()> {
1007        let project_id = match project.read(cx).remote_id() {
1008            Some(project_id) => project_id,
1009            None => return Ok(()),
1010        };
1011
1012        self.client.send(proto::UnshareProject { project_id })?;
1013        project.update(cx, |this, cx| this.unshare(cx))
1014    }
1015
1016    pub(crate) fn set_location(
1017        &mut self,
1018        project: Option<&ModelHandle<Project>>,
1019        cx: &mut ModelContext<Self>,
1020    ) -> Task<Result<()>> {
1021        if self.status.is_offline() {
1022            return Task::ready(Err(anyhow!("room is offline")));
1023        }
1024
1025        let client = self.client.clone();
1026        let room_id = self.id;
1027        let location = if let Some(project) = project {
1028            self.local_participant.active_project = Some(project.downgrade());
1029            if let Some(project_id) = project.read(cx).remote_id() {
1030                proto::participant_location::Variant::SharedProject(
1031                    proto::participant_location::SharedProject { id: project_id },
1032                )
1033            } else {
1034                proto::participant_location::Variant::UnsharedProject(
1035                    proto::participant_location::UnsharedProject {},
1036                )
1037            }
1038        } else {
1039            self.local_participant.active_project = None;
1040            proto::participant_location::Variant::External(proto::participant_location::External {})
1041        };
1042
1043        cx.notify();
1044        cx.foreground().spawn(async move {
1045            client
1046                .request(proto::UpdateParticipantLocation {
1047                    room_id,
1048                    location: Some(proto::ParticipantLocation {
1049                        variant: Some(location),
1050                    }),
1051                })
1052                .await?;
1053            Ok(())
1054        })
1055    }
1056
1057    pub fn is_screen_sharing(&self) -> bool {
1058        self.live_kit.as_ref().map_or(false, |live_kit| {
1059            !matches!(live_kit.screen_track, LocalTrack::None)
1060        })
1061    }
1062
1063    pub fn is_sharing_mic(&self) -> bool {
1064        self.live_kit.as_ref().map_or(false, |live_kit| {
1065            !matches!(live_kit.microphone_track, LocalTrack::None)
1066        })
1067    }
1068
1069    pub fn is_muted(&self) -> bool {
1070        self.live_kit
1071            .as_ref()
1072            .and_then(|live_kit| match &live_kit.microphone_track {
1073                LocalTrack::None => Some(true),
1074                LocalTrack::Pending { muted, .. } => Some(*muted),
1075                LocalTrack::Published { muted, .. } => Some(*muted),
1076            })
1077            .unwrap_or(false)
1078    }
1079
1080    pub fn is_speaking(&self) -> bool {
1081        self.live_kit
1082            .as_ref()
1083            .map_or(false, |live_kit| live_kit.speaking)
1084    }
1085
1086    pub fn is_deafened(&self) -> Option<bool> {
1087        self.live_kit.as_ref().map(|live_kit| live_kit.deafened)
1088    }
1089
1090    #[track_caller]
1091    pub fn share_microphone(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
1092        if self.status.is_offline() {
1093            return Task::ready(Err(anyhow!("room is offline")));
1094        } else if self.is_sharing_mic() {
1095            return Task::ready(Err(anyhow!("microphone was already shared")));
1096        }
1097
1098        let publish_id = if let Some(live_kit) = self.live_kit.as_mut() {
1099            let publish_id = post_inc(&mut live_kit.next_publish_id);
1100            live_kit.microphone_track = LocalTrack::Pending {
1101                publish_id,
1102                muted: false,
1103            };
1104            cx.notify();
1105            publish_id
1106        } else {
1107            return Task::ready(Err(anyhow!("live-kit was not initialized")));
1108        };
1109
1110        cx.spawn_weak(|this, mut cx| async move {
1111            let publish_track = async {
1112                let track = LocalAudioTrack::create();
1113                this.upgrade(&cx)
1114                    .ok_or_else(|| anyhow!("room was dropped"))?
1115                    .read_with(&cx, |this, _| {
1116                        this.live_kit
1117                            .as_ref()
1118                            .map(|live_kit| live_kit.room.publish_audio_track(&track))
1119                    })
1120                    .ok_or_else(|| anyhow!("live-kit was not initialized"))?
1121                    .await
1122            };
1123
1124            let publication = publish_track.await;
1125            this.upgrade(&cx)
1126                .ok_or_else(|| anyhow!("room was dropped"))?
1127                .update(&mut cx, |this, cx| {
1128                    let live_kit = this
1129                        .live_kit
1130                        .as_mut()
1131                        .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
1132
1133                    let (canceled, muted) = if let LocalTrack::Pending {
1134                        publish_id: cur_publish_id,
1135                        muted,
1136                    } = &live_kit.microphone_track
1137                    {
1138                        (*cur_publish_id != publish_id, *muted)
1139                    } else {
1140                        (true, false)
1141                    };
1142
1143                    match publication {
1144                        Ok(publication) => {
1145                            if canceled {
1146                                live_kit.room.unpublish_track(publication);
1147                            } else {
1148                                if muted {
1149                                    cx.background().spawn(publication.set_mute(muted)).detach();
1150                                }
1151                                live_kit.microphone_track = LocalTrack::Published {
1152                                    track_publication: publication,
1153                                    muted,
1154                                };
1155                                cx.notify();
1156                            }
1157                            Ok(())
1158                        }
1159                        Err(error) => {
1160                            if canceled {
1161                                Ok(())
1162                            } else {
1163                                live_kit.microphone_track = LocalTrack::None;
1164                                cx.notify();
1165                                Err(error)
1166                            }
1167                        }
1168                    }
1169                })
1170        })
1171    }
1172
1173    pub fn share_screen(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
1174        if self.status.is_offline() {
1175            return Task::ready(Err(anyhow!("room is offline")));
1176        } else if self.is_screen_sharing() {
1177            return Task::ready(Err(anyhow!("screen was already shared")));
1178        }
1179
1180        let (displays, publish_id) = if let Some(live_kit) = self.live_kit.as_mut() {
1181            let publish_id = post_inc(&mut live_kit.next_publish_id);
1182            live_kit.screen_track = LocalTrack::Pending {
1183                publish_id,
1184                muted: false,
1185            };
1186            cx.notify();
1187            (live_kit.room.display_sources(), publish_id)
1188        } else {
1189            return Task::ready(Err(anyhow!("live-kit was not initialized")));
1190        };
1191
1192        cx.spawn_weak(|this, mut cx| async move {
1193            let publish_track = async {
1194                let displays = displays.await?;
1195                let display = displays
1196                    .first()
1197                    .ok_or_else(|| anyhow!("no display found"))?;
1198                let track = LocalVideoTrack::screen_share_for_display(&display);
1199                this.upgrade(&cx)
1200                    .ok_or_else(|| anyhow!("room was dropped"))?
1201                    .read_with(&cx, |this, _| {
1202                        this.live_kit
1203                            .as_ref()
1204                            .map(|live_kit| live_kit.room.publish_video_track(&track))
1205                    })
1206                    .ok_or_else(|| anyhow!("live-kit was not initialized"))?
1207                    .await
1208            };
1209
1210            let publication = publish_track.await;
1211            this.upgrade(&cx)
1212                .ok_or_else(|| anyhow!("room was dropped"))?
1213                .update(&mut cx, |this, cx| {
1214                    let live_kit = this
1215                        .live_kit
1216                        .as_mut()
1217                        .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
1218
1219                    let (canceled, muted) = if let LocalTrack::Pending {
1220                        publish_id: cur_publish_id,
1221                        muted,
1222                    } = &live_kit.screen_track
1223                    {
1224                        (*cur_publish_id != publish_id, *muted)
1225                    } else {
1226                        (true, false)
1227                    };
1228
1229                    match publication {
1230                        Ok(publication) => {
1231                            if canceled {
1232                                live_kit.room.unpublish_track(publication);
1233                            } else {
1234                                if muted {
1235                                    cx.background().spawn(publication.set_mute(muted)).detach();
1236                                }
1237                                live_kit.screen_track = LocalTrack::Published {
1238                                    track_publication: publication,
1239                                    muted,
1240                                };
1241                                cx.notify();
1242                            }
1243
1244                            Audio::play_sound(Sound::StartScreenshare, cx);
1245
1246                            Ok(())
1247                        }
1248                        Err(error) => {
1249                            if canceled {
1250                                Ok(())
1251                            } else {
1252                                live_kit.screen_track = LocalTrack::None;
1253                                cx.notify();
1254                                Err(error)
1255                            }
1256                        }
1257                    }
1258                })
1259        })
1260    }
1261
1262    pub fn toggle_mute(&mut self, cx: &mut ModelContext<Self>) -> Result<Task<Result<()>>> {
1263        let should_mute = !self.is_muted();
1264        if let Some(live_kit) = self.live_kit.as_mut() {
1265            if matches!(live_kit.microphone_track, LocalTrack::None) {
1266                return Ok(self.share_microphone(cx));
1267            }
1268
1269            let (ret_task, old_muted) = live_kit.set_mute(should_mute, cx)?;
1270            live_kit.muted_by_user = should_mute;
1271
1272            if old_muted == true && live_kit.deafened == true {
1273                if let Some(task) = self.toggle_deafen(cx).ok() {
1274                    task.detach();
1275                }
1276            }
1277
1278            Ok(ret_task)
1279        } else {
1280            Err(anyhow!("LiveKit not started"))
1281        }
1282    }
1283
1284    pub fn toggle_deafen(&mut self, cx: &mut ModelContext<Self>) -> Result<Task<Result<()>>> {
1285        if let Some(live_kit) = self.live_kit.as_mut() {
1286            (*live_kit).deafened = !live_kit.deafened;
1287
1288            let mut tasks = Vec::with_capacity(self.remote_participants.len());
1289            // Context notification is sent within set_mute itself.
1290            let mut mute_task = None;
1291            // When deafening, mute user's mic as well.
1292            // When undeafening, unmute user's mic unless it was manually muted prior to deafening.
1293            if live_kit.deafened || !live_kit.muted_by_user {
1294                mute_task = Some(live_kit.set_mute(live_kit.deafened, cx)?.0);
1295            };
1296            for participant in self.remote_participants.values() {
1297                for track in live_kit
1298                    .room
1299                    .remote_audio_track_publications(&participant.user.id.to_string())
1300                {
1301                    tasks.push(cx.foreground().spawn(track.set_enabled(!live_kit.deafened)));
1302                }
1303            }
1304
1305            Ok(cx.foreground().spawn(async move {
1306                if let Some(mute_task) = mute_task {
1307                    mute_task.await?;
1308                }
1309                for task in tasks {
1310                    task.await?;
1311                }
1312                Ok(())
1313            }))
1314        } else {
1315            Err(anyhow!("LiveKit not started"))
1316        }
1317    }
1318
1319    pub fn unshare_screen(&mut self, cx: &mut ModelContext<Self>) -> Result<()> {
1320        if self.status.is_offline() {
1321            return Err(anyhow!("room is offline"));
1322        }
1323
1324        let live_kit = self
1325            .live_kit
1326            .as_mut()
1327            .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
1328        match mem::take(&mut live_kit.screen_track) {
1329            LocalTrack::None => Err(anyhow!("screen was not shared")),
1330            LocalTrack::Pending { .. } => {
1331                cx.notify();
1332                Ok(())
1333            }
1334            LocalTrack::Published {
1335                track_publication, ..
1336            } => {
1337                live_kit.room.unpublish_track(track_publication);
1338                cx.notify();
1339
1340                Audio::play_sound(Sound::StopScreenshare, cx);
1341                Ok(())
1342            }
1343        }
1344    }
1345
1346    #[cfg(any(test, feature = "test-support"))]
1347    pub fn set_display_sources(&self, sources: Vec<live_kit_client::MacOSDisplay>) {
1348        self.live_kit
1349            .as_ref()
1350            .unwrap()
1351            .room
1352            .set_display_sources(sources);
1353    }
1354}
1355
1356struct LiveKitRoom {
1357    room: Arc<live_kit_client::Room>,
1358    screen_track: LocalTrack,
1359    microphone_track: LocalTrack,
1360    /// Tracks whether we're currently in a muted state due to auto-mute from deafening or manual mute performed by user.
1361    muted_by_user: bool,
1362    deafened: bool,
1363    speaking: bool,
1364    next_publish_id: usize,
1365    _maintain_room: Task<()>,
1366    _maintain_tracks: [Task<()>; 2],
1367}
1368
1369impl LiveKitRoom {
1370    fn set_mute(
1371        self: &mut LiveKitRoom,
1372        should_mute: bool,
1373        cx: &mut ModelContext<Room>,
1374    ) -> Result<(Task<Result<()>>, bool)> {
1375        if !should_mute {
1376            // clear user muting state.
1377            self.muted_by_user = false;
1378        }
1379
1380        let (result, old_muted) = match &mut self.microphone_track {
1381            LocalTrack::None => Err(anyhow!("microphone was not shared")),
1382            LocalTrack::Pending { muted, .. } => {
1383                let old_muted = *muted;
1384                *muted = should_mute;
1385                cx.notify();
1386                Ok((Task::Ready(Some(Ok(()))), old_muted))
1387            }
1388            LocalTrack::Published {
1389                track_publication,
1390                muted,
1391            } => {
1392                let old_muted = *muted;
1393                *muted = should_mute;
1394                cx.notify();
1395                Ok((
1396                    cx.background().spawn(track_publication.set_mute(*muted)),
1397                    old_muted,
1398                ))
1399            }
1400        }?;
1401
1402        if old_muted != should_mute {
1403            if should_mute {
1404                Audio::play_sound(Sound::Mute, cx);
1405            } else {
1406                Audio::play_sound(Sound::Unmute, cx);
1407            }
1408        }
1409
1410        Ok((result, old_muted))
1411    }
1412}
1413
1414enum LocalTrack {
1415    None,
1416    Pending {
1417        publish_id: usize,
1418        muted: bool,
1419    },
1420    Published {
1421        track_publication: LocalTrackPublication,
1422        muted: bool,
1423    },
1424}
1425
1426impl Default for LocalTrack {
1427    fn default() -> Self {
1428        Self::None
1429    }
1430}
1431
1432#[derive(Copy, Clone, PartialEq, Eq)]
1433pub enum RoomStatus {
1434    Online,
1435    Rejoining,
1436    Offline,
1437}
1438
1439impl RoomStatus {
1440    pub fn is_offline(&self) -> bool {
1441        matches!(self, RoomStatus::Offline)
1442    }
1443
1444    pub fn is_online(&self) -> bool {
1445        matches!(self, RoomStatus::Online)
1446    }
1447}