room.rs

   1use crate::{
   2    participant::{LocalParticipant, ParticipantLocation, RemoteParticipant, RemoteVideoTrack},
   3    IncomingCall,
   4};
   5use anyhow::{anyhow, Result};
   6use audio::{Audio, Sound};
   7use client::{
   8    proto::{self, PeerId},
   9    Client, TypedEnvelope, User, UserStore,
  10};
  11use collections::{BTreeMap, HashMap, HashSet};
  12use fs::Fs;
  13use futures::{FutureExt, StreamExt};
  14use gpui::{AppContext, AsyncAppContext, Entity, ModelContext, ModelHandle, Task, WeakModelHandle};
  15use language::LanguageRegistry;
  16use live_kit_client::{
  17    LocalAudioTrack, LocalTrackPublication, LocalVideoTrack, RemoteAudioTrackUpdate,
  18    RemoteVideoTrackUpdate,
  19};
  20use postage::stream::Stream;
  21use project::Project;
  22use std::{future::Future, mem, pin::Pin, sync::Arc, time::Duration};
  23use util::{post_inc, ResultExt, TryFutureExt};
  24
  25pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(30);
  26
/// Events emitted by a [`Room`] to whoever observes the model.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum Event {
    /// A known remote participant reported a location different from the one
    /// previously recorded for them.
    ParticipantLocationChanged {
        participant_id: proto::PeerId,
    },
    /// A video track of the given participant was subscribed to or unsubscribed from.
    RemoteVideoTracksChanged {
        participant_id: proto::PeerId,
    },
    // NOTE(review): emitted outside the visible part of this file; presumably the
    // audio counterpart of `RemoteVideoTracksChanged` — confirm.
    RemoteAudioTracksChanged {
        participant_id: proto::PeerId,
    },
    /// A remote participant shares a project that was not in their previous project list.
    RemoteProjectShared {
        owner: Arc<User>,
        project_id: u64,
        worktree_root_names: Vec<String>,
    },
    /// A project disappeared from a remote participant's project list, or its
    /// owner is no longer in the room.
    RemoteProjectUnshared {
        project_id: u64,
    },
    /// The local user left the room (emitted by `Room::leave`).
    Left,
}
  48
/// Client-side state of a call room: its participants, the projects shared in
/// it, and the background tasks that keep this state in sync with the server.
pub struct Room {
    /// Server-assigned room id; sent back in `RejoinRoom` requests.
    id: u64,
    /// LiveKit state; `None` when no connection info was supplied at
    /// construction or after the room was left.
    live_kit: Option<LiveKitRoom>,
    /// Current connection status of the room.
    status: RoomStatus,
    /// Local projects shared into this room; unshared when leaving.
    shared_projects: HashSet<WeakModelHandle<Project>>,
    /// Remote projects joined through this room; disconnected and closed when leaving.
    joined_projects: HashSet<WeakModelHandle<Project>>,
    local_participant: LocalParticipant,
    /// Remote participants keyed by user id.
    remote_participants: BTreeMap<u64, RemoteParticipant>,
    /// Users listed by the server as pending participants.
    pending_participants: Vec<Arc<User>>,
    /// User ids of the remote and pending participants from the last applied room update.
    participant_user_ids: HashSet<u64>,
    // NOTE(review): only read in `should_leave` within the visible code;
    // presumably counts outgoing calls still in flight — confirm.
    pending_call_count: usize,
    /// When set, the room is left automatically once `should_leave` holds.
    leave_when_empty: bool,
    client: Arc<Client>,
    user_store: ModelHandle<UserStore>,
    /// Followers per `(leader peer id, project id)`, rebuilt on every room update.
    follows_by_leader_id_project_id: HashMap<(PeerId, u64), Vec<PeerId>>,
    /// Keeps the `RoomUpdated` message handler registered; cleared when leaving.
    subscriptions: Vec<client::Subscription>,
    /// Task applying the most recent room update, if one is still in flight.
    pending_room_update: Option<Task<()>>,
    /// Task running `Room::maintain_connection`; dropped when leaving.
    maintain_connection: Option<Task<Option<()>>>,
}
  68
  69impl Entity for Room {
  70    type Event = Event;
  71
  72    fn release(&mut self, cx: &mut AppContext) {
  73        if self.status.is_online() {
  74            self.leave_internal(cx).detach_and_log_err(cx);
  75        }
  76    }
  77
  78    fn app_will_quit(&mut self, cx: &mut AppContext) -> Option<Pin<Box<dyn Future<Output = ()>>>> {
  79        if self.status.is_online() {
  80            let leave = self.leave_internal(cx);
  81            Some(
  82                cx.background()
  83                    .spawn(async move {
  84                        leave.await.log_err();
  85                    })
  86                    .boxed(),
  87            )
  88        } else {
  89            None
  90        }
  91    }
  92}
  93
  94impl Room {
    /// Builds a room in the `Online` state and starts its background tasks.
    ///
    /// When `live_kit_connection_info` is `Some`, a LiveKit room is created,
    /// tasks are spawned to follow its connection status and remote track
    /// updates, and a connection attempt is started that shares the microphone
    /// once connected. Otherwise the room has no LiveKit state.
    ///
    /// Also registers `handle_room_updated` as a message handler on `client`,
    /// spawns `maintain_connection`, and plays the "joined" sound.
    fn new(
        id: u64,
        live_kit_connection_info: Option<proto::LiveKitConnectionInfo>,
        client: Arc<Client>,
        user_store: ModelHandle<UserStore>,
        cx: &mut ModelContext<Self>,
    ) -> Self {
        let live_kit_room = if let Some(connection_info) = live_kit_connection_info {
            let room = live_kit_client::Room::new();
            let mut status = room.status();
            // Consume the initial status of the room.
            let _ = status.try_recv();
            // Leave the room as soon as LiveKit reports a disconnection. The task
            // only holds a weak handle and stops once the room has been dropped.
            let _maintain_room = cx.spawn_weak(|this, mut cx| async move {
                while let Some(status) = status.next().await {
                    let this = if let Some(this) = this.upgrade(&cx) {
                        this
                    } else {
                        break;
                    };

                    if status == live_kit_client::ConnectionState::Disconnected {
                        this.update(&mut cx, |this, cx| this.leave(cx).log_err());
                        break;
                    }
                }
            });

            // Forward every remote video track update to the room for as long as it is alive.
            let mut track_video_changes = room.remote_video_track_updates();
            let _maintain_video_tracks = cx.spawn_weak(|this, mut cx| async move {
                while let Some(track_change) = track_video_changes.next().await {
                    let this = if let Some(this) = this.upgrade(&cx) {
                        this
                    } else {
                        break;
                    };

                    this.update(&mut cx, |this, cx| {
                        this.remote_video_track_updated(track_change, cx).log_err()
                    });
                }
            });

            // Same as above, for remote audio track updates.
            let mut track_audio_changes = room.remote_audio_track_updates();
            let _maintain_audio_tracks = cx.spawn_weak(|this, mut cx| async move {
                while let Some(track_change) = track_audio_changes.next().await {
                    let this = if let Some(this) = this.upgrade(&cx) {
                        this
                    } else {
                        break;
                    };

                    this.update(&mut cx, |this, cx| {
                        this.remote_audio_track_updated(track_change, cx).log_err()
                    });
                }
            });

            // Connect in the background and share the microphone once connected;
            // failures of either step are only logged.
            let connect = room.connect(&connection_info.server_url, &connection_info.token);
            cx.spawn(|this, mut cx| async move {
                connect.await?;

                this.update(&mut cx, |this, cx| this.share_microphone(cx))
                    .await?;

                anyhow::Ok(())
            })
            .detach_and_log_err(cx);

            Some(LiveKitRoom {
                room,
                screen_track: LocalTrack::None,
                microphone_track: LocalTrack::None,
                next_publish_id: 0,
                muted_by_user: false,
                deafened: false,
                speaking: false,
                _maintain_room,
                _maintain_tracks: [_maintain_video_tracks, _maintain_audio_tracks],
            })
        } else {
            None
        };

        // Watch the client's connection status; errors from the task are logged.
        let maintain_connection =
            cx.spawn_weak(|this, cx| Self::maintain_connection(this, client.clone(), cx).log_err());

        Audio::play_sound(Sound::Joined, cx);

        Self {
            id,
            live_kit: live_kit_room,
            status: RoomStatus::Online,
            shared_projects: Default::default(),
            joined_projects: Default::default(),
            participant_user_ids: Default::default(),
            local_participant: Default::default(),
            remote_participants: Default::default(),
            pending_participants: Default::default(),
            pending_call_count: 0,
            // Keep the subscription alive so `RoomUpdated` messages reach this room.
            subscriptions: vec![client.add_message_handler(cx.handle(), Self::handle_room_updated)],
            leave_when_empty: false,
            pending_room_update: None,
            client,
            user_store,
            follows_by_leader_id_project_id: Default::default(),
            maintain_connection: Some(maintain_connection),
        }
    }
 203
 204    pub(crate) fn create(
 205        called_user_id: u64,
 206        initial_project: Option<ModelHandle<Project>>,
 207        client: Arc<Client>,
 208        user_store: ModelHandle<UserStore>,
 209        cx: &mut AppContext,
 210    ) -> Task<Result<ModelHandle<Self>>> {
 211        cx.spawn(|mut cx| async move {
 212            let response = client.request(proto::CreateRoom {}).await?;
 213            let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
 214            let room = cx.add_model(|cx| {
 215                Self::new(
 216                    room_proto.id,
 217                    response.live_kit_connection_info,
 218                    client,
 219                    user_store,
 220                    cx,
 221                )
 222            });
 223
 224            let initial_project_id = if let Some(initial_project) = initial_project {
 225                let initial_project_id = room
 226                    .update(&mut cx, |room, cx| {
 227                        room.share_project(initial_project.clone(), cx)
 228                    })
 229                    .await?;
 230                Some(initial_project_id)
 231            } else {
 232                None
 233            };
 234
 235            match room
 236                .update(&mut cx, |room, cx| {
 237                    room.leave_when_empty = true;
 238                    room.call(called_user_id, initial_project_id, cx)
 239                })
 240                .await
 241            {
 242                Ok(()) => Ok(room),
 243                Err(error) => Err(anyhow!("room creation failed: {:?}", error)),
 244            }
 245        })
 246    }
 247
 248    pub(crate) fn join(
 249        call: &IncomingCall,
 250        client: Arc<Client>,
 251        user_store: ModelHandle<UserStore>,
 252        cx: &mut AppContext,
 253    ) -> Task<Result<ModelHandle<Self>>> {
 254        let room_id = call.room_id;
 255        cx.spawn(|mut cx| async move {
 256            let response = client.request(proto::JoinRoom { id: room_id }).await?;
 257            let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
 258            let room = cx.add_model(|cx| {
 259                Self::new(
 260                    room_id,
 261                    response.live_kit_connection_info,
 262                    client,
 263                    user_store,
 264                    cx,
 265                )
 266            });
 267            room.update(&mut cx, |room, cx| {
 268                room.leave_when_empty = true;
 269                room.apply_room_update(room_proto, cx)?;
 270                anyhow::Ok(())
 271            })?;
 272
 273            Ok(room)
 274        })
 275    }
 276
 277    fn should_leave(&self) -> bool {
 278        self.leave_when_empty
 279            && self.pending_room_update.is_none()
 280            && self.pending_participants.is_empty()
 281            && self.remote_participants.is_empty()
 282            && self.pending_call_count == 0
 283    }
 284
    /// Leaves the room, notifying observers and emitting [`Event::Left`].
    ///
    /// The returned task resolves once the `LeaveRoom` request completes; it
    /// resolves to an error immediately if the room is already offline.
    pub(crate) fn leave(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
        cx.notify();
        cx.emit(Event::Left);
        self.leave_internal(cx)
    }
 290
 291    fn leave_internal(&mut self, cx: &mut AppContext) -> Task<Result<()>> {
 292        if self.status.is_offline() {
 293            return Task::ready(Err(anyhow!("room is offline")));
 294        }
 295
 296        log::info!("leaving room");
 297
 298        for project in self.shared_projects.drain() {
 299            if let Some(project) = project.upgrade(cx) {
 300                project.update(cx, |project, cx| {
 301                    project.unshare(cx).log_err();
 302                });
 303            }
 304        }
 305        for project in self.joined_projects.drain() {
 306            if let Some(project) = project.upgrade(cx) {
 307                project.update(cx, |project, cx| {
 308                    project.disconnected_from_host(cx);
 309                    project.close(cx);
 310                });
 311            }
 312        }
 313
 314        Audio::play_sound(Sound::Leave, cx);
 315
 316        self.status = RoomStatus::Offline;
 317        self.remote_participants.clear();
 318        self.pending_participants.clear();
 319        self.participant_user_ids.clear();
 320        self.subscriptions.clear();
 321        self.live_kit.take();
 322        self.pending_room_update.take();
 323        self.maintain_connection.take();
 324
 325        let leave_room = self.client.request(proto::LeaveRoom {});
 326        cx.background().spawn(async move {
 327            leave_room.await?;
 328            anyhow::Ok(())
 329        })
 330    }
 331
    /// Watches the client's connection status and tries to rejoin the room
    /// whenever the connection is lost.
    ///
    /// After a disconnection the room is marked `Rejoining`, and up to three
    /// rejoin attempts are made within [`RECONNECT_TIMEOUT`]. If the client
    /// signs out, the attempts are exhausted, or the timeout expires, the room
    /// is left.
    ///
    /// # Errors
    ///
    /// This function only ever returns `Err`: either because the room was
    /// dropped, or because reconnecting failed and the room was left.
    async fn maintain_connection(
        this: WeakModelHandle<Self>,
        client: Arc<Client>,
        mut cx: AsyncAppContext,
    ) -> Result<()> {
        let mut client_status = client.status();
        loop {
            let _ = client_status.try_recv();
            let is_connected = client_status.borrow().is_connected();
            // Even if we're initially connected, any future change of the status means we momentarily disconnected.
            if !is_connected || client_status.next().await.is_some() {
                log::info!("detected client disconnection");

                this.upgrade(&cx)
                    .ok_or_else(|| anyhow!("room was dropped"))?
                    .update(&mut cx, |this, cx| {
                        this.status = RoomStatus::Rejoining;
                        cx.notify();
                    });

                // Wait for client to re-establish a connection to the server.
                {
                    let mut reconnection_timeout = cx.background().timer(RECONNECT_TIMEOUT).fuse();
                    // Resolves to `true` once the room was rejoined, `false` if we gave up.
                    let client_reconnection = async {
                        let mut remaining_attempts = 3;
                        while remaining_attempts > 0 {
                            if client_status.borrow().is_connected() {
                                log::info!("client reconnected, attempting to rejoin room");

                                let Some(this) = this.upgrade(&cx) else { break };
                                if this
                                    .update(&mut cx, |this, cx| this.rejoin(cx))
                                    .await
                                    .log_err()
                                    .is_some()
                                {
                                    return true;
                                } else {
                                    remaining_attempts -= 1;
                                }
                            } else if client_status.borrow().is_signed_out() {
                                // A signed-out client will not reconnect on its own; give up.
                                return false;
                            }

                            log::info!(
                                "waiting for client status change, remaining attempts {}",
                                remaining_attempts
                            );
                            client_status.next().await;
                        }
                        false
                    }
                    .fuse();
                    futures::pin_mut!(client_reconnection);

                    // `select_biased!` polls the reconnection future before the timeout.
                    futures::select_biased! {
                        reconnected = client_reconnection => {
                            if reconnected {
                                log::info!("successfully reconnected to room");
                                // If we successfully joined the room, go back around the loop
                                // waiting for future connection status changes.
                                continue;
                            }
                        }
                        _ = reconnection_timeout => {
                            log::info!("room reconnection timeout expired");
                        }
                    }
                }

                break;
            }
        }

        // The client failed to re-establish a connection to the server
        // or an error occurred while trying to re-join the room. Either way
        // we leave the room and return an error.
        if let Some(this) = this.upgrade(&cx) {
            log::info!("reconnection failed, leaving room");
            let _ = this.update(&mut cx, |this, cx| this.leave(cx));
        }
        Err(anyhow!(
            "can't reconnect to room: client failed to re-establish connection"
        ))
    }
 417
 418    fn rejoin(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
 419        let mut projects = HashMap::default();
 420        let mut reshared_projects = Vec::new();
 421        let mut rejoined_projects = Vec::new();
 422        self.shared_projects.retain(|project| {
 423            if let Some(handle) = project.upgrade(cx) {
 424                let project = handle.read(cx);
 425                if let Some(project_id) = project.remote_id() {
 426                    projects.insert(project_id, handle.clone());
 427                    reshared_projects.push(proto::UpdateProject {
 428                        project_id,
 429                        worktrees: project.worktree_metadata_protos(cx),
 430                    });
 431                    return true;
 432                }
 433            }
 434            false
 435        });
 436        self.joined_projects.retain(|project| {
 437            if let Some(handle) = project.upgrade(cx) {
 438                let project = handle.read(cx);
 439                if let Some(project_id) = project.remote_id() {
 440                    projects.insert(project_id, handle.clone());
 441                    rejoined_projects.push(proto::RejoinProject {
 442                        id: project_id,
 443                        worktrees: project
 444                            .worktrees(cx)
 445                            .map(|worktree| {
 446                                let worktree = worktree.read(cx);
 447                                proto::RejoinWorktree {
 448                                    id: worktree.id().to_proto(),
 449                                    scan_id: worktree.completed_scan_id() as u64,
 450                                }
 451                            })
 452                            .collect(),
 453                    });
 454                }
 455                return true;
 456            }
 457            false
 458        });
 459
 460        let response = self.client.request_envelope(proto::RejoinRoom {
 461            id: self.id,
 462            reshared_projects,
 463            rejoined_projects,
 464        });
 465
 466        cx.spawn(|this, mut cx| async move {
 467            let response = response.await?;
 468            let message_id = response.message_id;
 469            let response = response.payload;
 470            let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
 471            this.update(&mut cx, |this, cx| {
 472                this.status = RoomStatus::Online;
 473                this.apply_room_update(room_proto, cx)?;
 474
 475                for reshared_project in response.reshared_projects {
 476                    if let Some(project) = projects.get(&reshared_project.id) {
 477                        project.update(cx, |project, cx| {
 478                            project.reshared(reshared_project, cx).log_err();
 479                        });
 480                    }
 481                }
 482
 483                for rejoined_project in response.rejoined_projects {
 484                    if let Some(project) = projects.get(&rejoined_project.id) {
 485                        project.update(cx, |project, cx| {
 486                            project.rejoined(rejoined_project, message_id, cx).log_err();
 487                        });
 488                    }
 489                }
 490
 491                anyhow::Ok(())
 492            })
 493        })
 494    }
 495
    /// Returns the id this room was constructed with.
    pub fn id(&self) -> u64 {
        self.id
    }
 499
    /// Returns the room's current connection status.
    pub fn status(&self) -> RoomStatus {
        self.status
    }
 503
    /// Returns the state tracked for the local participant.
    pub fn local_participant(&self) -> &LocalParticipant {
        &self.local_participant
    }
 507
    /// Returns the remote participants currently in the room, keyed by user id.
    pub fn remote_participants(&self) -> &BTreeMap<u64, RemoteParticipant> {
        &self.remote_participants
    }
 511
 512    pub fn remote_participant_for_peer_id(&self, peer_id: PeerId) -> Option<&RemoteParticipant> {
 513        self.remote_participants
 514            .values()
 515            .find(|p| p.peer_id == peer_id)
 516    }
 517
    /// Returns the users listed as pending participants in the last applied room update.
    pub fn pending_participants(&self) -> &[Arc<User>] {
        &self.pending_participants
    }
 521
    /// Returns whether `user_id` is among the remote or pending participants
    /// recorded by the last applied room update.
    pub fn contains_participant(&self, user_id: u64) -> bool {
        self.participant_user_ids.contains(&user_id)
    }
 525
 526    pub fn followers_for(&self, leader_id: PeerId, project_id: u64) -> &[PeerId] {
 527        self.follows_by_leader_id_project_id
 528            .get(&(leader_id, project_id))
 529            .map_or(&[], |v| v.as_slice())
 530    }
 531
 532    async fn handle_room_updated(
 533        this: ModelHandle<Self>,
 534        envelope: TypedEnvelope<proto::RoomUpdated>,
 535        _: Arc<Client>,
 536        mut cx: AsyncAppContext,
 537    ) -> Result<()> {
 538        let room = envelope
 539            .payload
 540            .room
 541            .ok_or_else(|| anyhow!("invalid room"))?;
 542        this.update(&mut cx, |this, cx| this.apply_room_update(room, cx))
 543    }
 544
    /// Reconciles local state with a room state received from the server.
    ///
    /// The user records of remote and pending participants are fetched through
    /// the user store; the actual state change happens in a task stored in
    /// `pending_room_update`, which replaces any previously pending update.
    /// That task updates participants and followers, emits the corresponding
    /// events, and leaves the room if `should_leave` holds afterwards.
    ///
    /// Observers are notified right away and again once the update is applied.
    /// As written, this function always returns `Ok(())`.
    fn apply_room_update(
        &mut self,
        mut room: proto::Room,
        cx: &mut ModelContext<Self>,
    ) -> Result<()> {
        // Filter ourselves out from the room's participants.
        let local_participant_ix = room
            .participants
            .iter()
            .position(|participant| Some(participant.user_id) == self.client.user_id());
        let local_participant = local_participant_ix.map(|ix| room.participants.swap_remove(ix));

        let pending_participant_user_ids = room
            .pending_participants
            .iter()
            .map(|p| p.user_id)
            .collect::<Vec<_>>();

        let remote_participant_user_ids = room
            .participants
            .iter()
            .map(|p| p.user_id)
            .collect::<Vec<_>>();

        // Start loading the user records for both groups of participants.
        let (remote_participants, pending_participants) =
            self.user_store.update(cx, move |user_store, cx| {
                (
                    user_store.get_users(remote_participant_user_ids, cx),
                    user_store.get_users(pending_participant_user_ids, cx),
                )
            });

        self.pending_room_update = Some(cx.spawn(|this, mut cx| async move {
            let (remote_participants, pending_participants) =
                futures::join!(remote_participants, pending_participants);

            this.update(&mut cx, |this, cx| {
                // Rebuilt below from the remote and pending participants.
                this.participant_user_ids.clear();

                if let Some(participant) = local_participant {
                    this.local_participant.projects = participant.projects;
                } else {
                    this.local_participant.projects.clear();
                }

                if let Some(participants) = remote_participants.log_err() {
                    // NOTE(review): the zip assumes `get_users` yields users in the same
                    // order as the ids passed to it — confirm.
                    for (participant, user) in room.participants.into_iter().zip(participants) {
                        let Some(peer_id) = participant.peer_id else { continue };
                        this.participant_user_ids.insert(participant.user_id);

                        // Diff the participant's previous and new project lists to
                        // detect newly shared and unshared projects.
                        let old_projects = this
                            .remote_participants
                            .get(&participant.user_id)
                            .into_iter()
                            .flat_map(|existing| &existing.projects)
                            .map(|project| project.id)
                            .collect::<HashSet<_>>();
                        let new_projects = participant
                            .projects
                            .iter()
                            .map(|project| project.id)
                            .collect::<HashSet<_>>();

                        for project in &participant.projects {
                            if !old_projects.contains(&project.id) {
                                cx.emit(Event::RemoteProjectShared {
                                    owner: user.clone(),
                                    project_id: project.id,
                                    worktree_root_names: project.worktree_root_names.clone(),
                                });
                            }
                        }

                        for unshared_project_id in old_projects.difference(&new_projects) {
                            // Disconnect and forget joined projects whose host unshared
                            // them; dropped project handles are pruned as well.
                            this.joined_projects.retain(|project| {
                                if let Some(project) = project.upgrade(cx) {
                                    project.update(cx, |project, cx| {
                                        if project.remote_id() == Some(*unshared_project_id) {
                                            project.disconnected_from_host(cx);
                                            false
                                        } else {
                                            true
                                        }
                                    })
                                } else {
                                    false
                                }
                            });
                            cx.emit(Event::RemoteProjectUnshared {
                                project_id: *unshared_project_id,
                            });
                        }

                        // Locations that fail to convert are treated as external.
                        let location = ParticipantLocation::from_proto(participant.location)
                            .unwrap_or(ParticipantLocation::External);
                        if let Some(remote_participant) =
                            this.remote_participants.get_mut(&participant.user_id)
                        {
                            // Known participant: refresh it in place.
                            remote_participant.projects = participant.projects;
                            remote_participant.peer_id = peer_id;
                            if location != remote_participant.location {
                                remote_participant.location = location;
                                cx.emit(Event::ParticipantLocationChanged {
                                    participant_id: peer_id,
                                });
                            }
                        } else {
                            // New participant: record it and play the "joined" sound.
                            this.remote_participants.insert(
                                participant.user_id,
                                RemoteParticipant {
                                    user: user.clone(),
                                    peer_id,
                                    projects: participant.projects,
                                    location,
                                    muted: false,
                                    speaking: false,
                                    video_tracks: Default::default(),
                                    audio_tracks: Default::default(),
                                },
                            );

                            Audio::play_sound(Sound::Joined, cx);

                            // Attach any tracks LiveKit already reports for this user now
                            // that the participant entry exists.
                            if let Some(live_kit) = this.live_kit.as_ref() {
                                let video_tracks =
                                    live_kit.room.remote_video_tracks(&user.id.to_string());
                                let audio_tracks =
                                    live_kit.room.remote_audio_tracks(&user.id.to_string());
                                for track in video_tracks {
                                    this.remote_video_track_updated(
                                        RemoteVideoTrackUpdate::Subscribed(track),
                                        cx,
                                    )
                                    .log_err();
                                }
                                for track in audio_tracks {
                                    this.remote_audio_track_updated(
                                        RemoteAudioTrackUpdate::Subscribed(track),
                                        cx,
                                    )
                                    .log_err();
                                }
                            }
                        }
                    }

                    // Drop participants that are no longer in the room and report
                    // their projects as unshared.
                    this.remote_participants.retain(|user_id, participant| {
                        if this.participant_user_ids.contains(user_id) {
                            true
                        } else {
                            for project in &participant.projects {
                                cx.emit(Event::RemoteProjectUnshared {
                                    project_id: project.id,
                                });
                            }
                            false
                        }
                    });
                }

                if let Some(pending_participants) = pending_participants.log_err() {
                    this.pending_participants = pending_participants;
                    for participant in &this.pending_participants {
                        this.participant_user_ids.insert(participant.id);
                    }
                }

                // Rebuild the follower lists from scratch, skipping incomplete
                // entries and duplicate followers.
                this.follows_by_leader_id_project_id.clear();
                for follower in room.followers {
                    let project_id = follower.project_id;
                    let (leader, follower) = match (follower.leader_id, follower.follower_id) {
                        (Some(leader), Some(follower)) => (leader, follower),

                        _ => {
                            log::error!("Follower message {follower:?} missing some state");
                            continue;
                        }
                    };

                    let list = this
                        .follows_by_leader_id_project_id
                        .entry((leader, project_id))
                        .or_insert(Vec::new());
                    if !list.contains(&follower) {
                        list.push(follower);
                    }
                }

                // Clear the pending update first: `should_leave` requires that no
                // update is pending.
                this.pending_room_update.take();
                if this.should_leave() {
                    log::info!("room is empty, leaving");
                    let _ = this.leave(cx);
                }

                this.check_invariants();
                cx.notify();
            });
        }));

        cx.notify();
        Ok(())
    }
 747
 748    fn remote_video_track_updated(
 749        &mut self,
 750        change: RemoteVideoTrackUpdate,
 751        cx: &mut ModelContext<Self>,
 752    ) -> Result<()> {
 753        match change {
 754            RemoteVideoTrackUpdate::Subscribed(track) => {
 755                let user_id = track.publisher_id().parse()?;
 756                let track_id = track.sid().to_string();
 757                let participant = self
 758                    .remote_participants
 759                    .get_mut(&user_id)
 760                    .ok_or_else(|| anyhow!("subscribed to track by unknown participant"))?;
 761                participant.video_tracks.insert(
 762                    track_id.clone(),
 763                    Arc::new(RemoteVideoTrack {
 764                        live_kit_track: track,
 765                    }),
 766                );
 767                cx.emit(Event::RemoteVideoTracksChanged {
 768                    participant_id: participant.peer_id,
 769                });
 770            }
 771            RemoteVideoTrackUpdate::Unsubscribed {
 772                publisher_id,
 773                track_id,
 774            } => {
 775                let user_id = publisher_id.parse()?;
 776                let participant = self
 777                    .remote_participants
 778                    .get_mut(&user_id)
 779                    .ok_or_else(|| anyhow!("unsubscribed from track by unknown participant"))?;
 780                participant.video_tracks.remove(&track_id);
 781                cx.emit(Event::RemoteVideoTracksChanged {
 782                    participant_id: participant.peer_id,
 783                });
 784            }
 785        }
 786
 787        cx.notify();
 788        Ok(())
 789    }
 790
 791    fn remote_audio_track_updated(
 792        &mut self,
 793        change: RemoteAudioTrackUpdate,
 794        cx: &mut ModelContext<Self>,
 795    ) -> Result<()> {
 796        match change {
 797            RemoteAudioTrackUpdate::ActiveSpeakersChanged { speakers } => {
 798                let mut speaker_ids = speakers
 799                    .into_iter()
 800                    .filter_map(|speaker_sid| speaker_sid.parse().ok())
 801                    .collect::<Vec<u64>>();
 802                speaker_ids.sort_unstable();
 803                for (sid, participant) in &mut self.remote_participants {
 804                    if let Ok(_) = speaker_ids.binary_search(sid) {
 805                        participant.speaking = true;
 806                    } else {
 807                        participant.speaking = false;
 808                    }
 809                }
 810                if let Some(id) = self.client.user_id() {
 811                    if let Some(room) = &mut self.live_kit {
 812                        if let Ok(_) = speaker_ids.binary_search(&id) {
 813                            room.speaking = true;
 814                        } else {
 815                            room.speaking = false;
 816                        }
 817                    }
 818                }
 819                cx.notify();
 820            }
 821            RemoteAudioTrackUpdate::MuteChanged { track_id, muted } => {
 822                for participant in &mut self.remote_participants.values_mut() {
 823                    let mut found = false;
 824                    for track in participant.audio_tracks.values() {
 825                        if track.sid() == track_id {
 826                            found = true;
 827                            break;
 828                        }
 829                    }
 830                    if found {
 831                        participant.muted = muted;
 832                        break;
 833                    }
 834                }
 835                cx.notify();
 836            }
 837            RemoteAudioTrackUpdate::Subscribed(track) => {
 838                let user_id = track.publisher_id().parse()?;
 839                let track_id = track.sid().to_string();
 840                let participant = self
 841                    .remote_participants
 842                    .get_mut(&user_id)
 843                    .ok_or_else(|| anyhow!("subscribed to track by unknown participant"))?;
 844                participant.audio_tracks.insert(track_id.clone(), track);
 845                cx.emit(Event::RemoteAudioTracksChanged {
 846                    participant_id: participant.peer_id,
 847                });
 848            }
 849            RemoteAudioTrackUpdate::Unsubscribed {
 850                publisher_id,
 851                track_id,
 852            } => {
 853                let user_id = publisher_id.parse()?;
 854                let participant = self
 855                    .remote_participants
 856                    .get_mut(&user_id)
 857                    .ok_or_else(|| anyhow!("unsubscribed from track by unknown participant"))?;
 858                participant.audio_tracks.remove(&track_id);
 859                cx.emit(Event::RemoteAudioTracksChanged {
 860                    participant_id: participant.peer_id,
 861                });
 862            }
 863        }
 864
 865        cx.notify();
 866        Ok(())
 867    }
 868
 869    fn check_invariants(&self) {
 870        #[cfg(any(test, feature = "test-support"))]
 871        {
 872            for participant in self.remote_participants.values() {
 873                assert!(self.participant_user_ids.contains(&participant.user.id));
 874                assert_ne!(participant.user.id, self.client.user_id().unwrap());
 875            }
 876
 877            for participant in &self.pending_participants {
 878                assert!(self.participant_user_ids.contains(&participant.id));
 879                assert_ne!(participant.id, self.client.user_id().unwrap());
 880            }
 881
 882            assert_eq!(
 883                self.participant_user_ids.len(),
 884                self.remote_participants.len() + self.pending_participants.len()
 885            );
 886        }
 887    }
 888
 889    pub(crate) fn call(
 890        &mut self,
 891        called_user_id: u64,
 892        initial_project_id: Option<u64>,
 893        cx: &mut ModelContext<Self>,
 894    ) -> Task<Result<()>> {
 895        if self.status.is_offline() {
 896            return Task::ready(Err(anyhow!("room is offline")));
 897        }
 898
 899        cx.notify();
 900        let client = self.client.clone();
 901        let room_id = self.id;
 902        self.pending_call_count += 1;
 903        cx.spawn(|this, mut cx| async move {
 904            let result = client
 905                .request(proto::Call {
 906                    room_id,
 907                    called_user_id,
 908                    initial_project_id,
 909                })
 910                .await;
 911            this.update(&mut cx, |this, cx| {
 912                this.pending_call_count -= 1;
 913                if this.should_leave() {
 914                    this.leave(cx).detach_and_log_err(cx);
 915                }
 916            });
 917            result?;
 918            Ok(())
 919        })
 920    }
 921
 922    pub fn join_project(
 923        &mut self,
 924        id: u64,
 925        language_registry: Arc<LanguageRegistry>,
 926        fs: Arc<dyn Fs>,
 927        cx: &mut ModelContext<Self>,
 928    ) -> Task<Result<ModelHandle<Project>>> {
 929        let client = self.client.clone();
 930        let user_store = self.user_store.clone();
 931        cx.spawn(|this, mut cx| async move {
 932            let project =
 933                Project::remote(id, client, user_store, language_registry, fs, cx.clone()).await?;
 934
 935            this.update(&mut cx, |this, cx| {
 936                this.joined_projects.retain(|project| {
 937                    if let Some(project) = project.upgrade(cx) {
 938                        !project.read(cx).is_read_only()
 939                    } else {
 940                        false
 941                    }
 942                });
 943                this.joined_projects.insert(project.downgrade());
 944            });
 945            Ok(project)
 946        })
 947    }
 948
 949    pub(crate) fn share_project(
 950        &mut self,
 951        project: ModelHandle<Project>,
 952        cx: &mut ModelContext<Self>,
 953    ) -> Task<Result<u64>> {
 954        if let Some(project_id) = project.read(cx).remote_id() {
 955            return Task::ready(Ok(project_id));
 956        }
 957
 958        let request = self.client.request(proto::ShareProject {
 959            room_id: self.id(),
 960            worktrees: project.read(cx).worktree_metadata_protos(cx),
 961        });
 962        cx.spawn(|this, mut cx| async move {
 963            let response = request.await?;
 964
 965            project.update(&mut cx, |project, cx| {
 966                project.shared(response.project_id, cx)
 967            })?;
 968
 969            // If the user's location is in this project, it changes from UnsharedProject to SharedProject.
 970            this.update(&mut cx, |this, cx| {
 971                this.shared_projects.insert(project.downgrade());
 972                let active_project = this.local_participant.active_project.as_ref();
 973                if active_project.map_or(false, |location| *location == project) {
 974                    this.set_location(Some(&project), cx)
 975                } else {
 976                    Task::ready(Ok(()))
 977                }
 978            })
 979            .await?;
 980
 981            Ok(response.project_id)
 982        })
 983    }
 984
 985    pub(crate) fn unshare_project(
 986        &mut self,
 987        project: ModelHandle<Project>,
 988        cx: &mut ModelContext<Self>,
 989    ) -> Result<()> {
 990        let project_id = match project.read(cx).remote_id() {
 991            Some(project_id) => project_id,
 992            None => return Ok(()),
 993        };
 994
 995        self.client.send(proto::UnshareProject { project_id })?;
 996        project.update(cx, |this, cx| this.unshare(cx))
 997    }
 998
 999    pub(crate) fn set_location(
1000        &mut self,
1001        project: Option<&ModelHandle<Project>>,
1002        cx: &mut ModelContext<Self>,
1003    ) -> Task<Result<()>> {
1004        if self.status.is_offline() {
1005            return Task::ready(Err(anyhow!("room is offline")));
1006        }
1007
1008        let client = self.client.clone();
1009        let room_id = self.id;
1010        let location = if let Some(project) = project {
1011            self.local_participant.active_project = Some(project.downgrade());
1012            if let Some(project_id) = project.read(cx).remote_id() {
1013                proto::participant_location::Variant::SharedProject(
1014                    proto::participant_location::SharedProject { id: project_id },
1015                )
1016            } else {
1017                proto::participant_location::Variant::UnsharedProject(
1018                    proto::participant_location::UnsharedProject {},
1019                )
1020            }
1021        } else {
1022            self.local_participant.active_project = None;
1023            proto::participant_location::Variant::External(proto::participant_location::External {})
1024        };
1025
1026        cx.notify();
1027        cx.foreground().spawn(async move {
1028            client
1029                .request(proto::UpdateParticipantLocation {
1030                    room_id,
1031                    location: Some(proto::ParticipantLocation {
1032                        variant: Some(location),
1033                    }),
1034                })
1035                .await?;
1036            Ok(())
1037        })
1038    }
1039
1040    pub fn is_screen_sharing(&self) -> bool {
1041        self.live_kit.as_ref().map_or(false, |live_kit| {
1042            !matches!(live_kit.screen_track, LocalTrack::None)
1043        })
1044    }
1045
1046    pub fn is_sharing_mic(&self) -> bool {
1047        self.live_kit.as_ref().map_or(false, |live_kit| {
1048            !matches!(live_kit.microphone_track, LocalTrack::None)
1049        })
1050    }
1051
1052    pub fn is_muted(&self) -> bool {
1053        self.live_kit
1054            .as_ref()
1055            .and_then(|live_kit| match &live_kit.microphone_track {
1056                LocalTrack::None => None,
1057                LocalTrack::Pending { muted, .. } => Some(*muted),
1058                LocalTrack::Published { muted, .. } => Some(*muted),
1059            })
1060            .unwrap_or(false)
1061    }
1062
1063    pub fn is_speaking(&self) -> bool {
1064        self.live_kit
1065            .as_ref()
1066            .map_or(false, |live_kit| live_kit.speaking)
1067    }
1068
1069    pub fn is_deafened(&self) -> Option<bool> {
1070        self.live_kit.as_ref().map(|live_kit| live_kit.deafened)
1071    }
1072
1073    pub fn share_microphone(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
1074        if self.status.is_offline() {
1075            return Task::ready(Err(anyhow!("room is offline")));
1076        } else if self.is_sharing_mic() {
1077            return Task::ready(Err(anyhow!("microphone was already shared")));
1078        }
1079
1080        let publish_id = if let Some(live_kit) = self.live_kit.as_mut() {
1081            let publish_id = post_inc(&mut live_kit.next_publish_id);
1082            live_kit.microphone_track = LocalTrack::Pending {
1083                publish_id,
1084                muted: false,
1085            };
1086            cx.notify();
1087            publish_id
1088        } else {
1089            return Task::ready(Err(anyhow!("live-kit was not initialized")));
1090        };
1091
1092        cx.spawn_weak(|this, mut cx| async move {
1093            let publish_track = async {
1094                let track = LocalAudioTrack::create();
1095                this.upgrade(&cx)
1096                    .ok_or_else(|| anyhow!("room was dropped"))?
1097                    .read_with(&cx, |this, _| {
1098                        this.live_kit
1099                            .as_ref()
1100                            .map(|live_kit| live_kit.room.publish_audio_track(&track))
1101                    })
1102                    .ok_or_else(|| anyhow!("live-kit was not initialized"))?
1103                    .await
1104            };
1105
1106            let publication = publish_track.await;
1107            this.upgrade(&cx)
1108                .ok_or_else(|| anyhow!("room was dropped"))?
1109                .update(&mut cx, |this, cx| {
1110                    let live_kit = this
1111                        .live_kit
1112                        .as_mut()
1113                        .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
1114
1115                    let (canceled, muted) = if let LocalTrack::Pending {
1116                        publish_id: cur_publish_id,
1117                        muted,
1118                    } = &live_kit.microphone_track
1119                    {
1120                        (*cur_publish_id != publish_id, *muted)
1121                    } else {
1122                        (true, false)
1123                    };
1124
1125                    match publication {
1126                        Ok(publication) => {
1127                            if canceled {
1128                                live_kit.room.unpublish_track(publication);
1129                            } else {
1130                                if muted {
1131                                    cx.background().spawn(publication.set_mute(muted)).detach();
1132                                }
1133                                live_kit.microphone_track = LocalTrack::Published {
1134                                    track_publication: publication,
1135                                    muted,
1136                                };
1137                                cx.notify();
1138                            }
1139                            Ok(())
1140                        }
1141                        Err(error) => {
1142                            if canceled {
1143                                Ok(())
1144                            } else {
1145                                live_kit.microphone_track = LocalTrack::None;
1146                                cx.notify();
1147                                Err(error)
1148                            }
1149                        }
1150                    }
1151                })
1152        })
1153    }
1154
1155    pub fn share_screen(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
1156        if self.status.is_offline() {
1157            return Task::ready(Err(anyhow!("room is offline")));
1158        } else if self.is_screen_sharing() {
1159            return Task::ready(Err(anyhow!("screen was already shared")));
1160        }
1161
1162        let (displays, publish_id) = if let Some(live_kit) = self.live_kit.as_mut() {
1163            let publish_id = post_inc(&mut live_kit.next_publish_id);
1164            live_kit.screen_track = LocalTrack::Pending {
1165                publish_id,
1166                muted: false,
1167            };
1168            cx.notify();
1169            (live_kit.room.display_sources(), publish_id)
1170        } else {
1171            return Task::ready(Err(anyhow!("live-kit was not initialized")));
1172        };
1173
1174        cx.spawn_weak(|this, mut cx| async move {
1175            let publish_track = async {
1176                let displays = displays.await?;
1177                let display = displays
1178                    .first()
1179                    .ok_or_else(|| anyhow!("no display found"))?;
1180                let track = LocalVideoTrack::screen_share_for_display(&display);
1181                this.upgrade(&cx)
1182                    .ok_or_else(|| anyhow!("room was dropped"))?
1183                    .read_with(&cx, |this, _| {
1184                        this.live_kit
1185                            .as_ref()
1186                            .map(|live_kit| live_kit.room.publish_video_track(&track))
1187                    })
1188                    .ok_or_else(|| anyhow!("live-kit was not initialized"))?
1189                    .await
1190            };
1191
1192            let publication = publish_track.await;
1193            this.upgrade(&cx)
1194                .ok_or_else(|| anyhow!("room was dropped"))?
1195                .update(&mut cx, |this, cx| {
1196                    let live_kit = this
1197                        .live_kit
1198                        .as_mut()
1199                        .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
1200
1201                    let (canceled, muted) = if let LocalTrack::Pending {
1202                        publish_id: cur_publish_id,
1203                        muted,
1204                    } = &live_kit.screen_track
1205                    {
1206                        (*cur_publish_id != publish_id, *muted)
1207                    } else {
1208                        (true, false)
1209                    };
1210
1211                    match publication {
1212                        Ok(publication) => {
1213                            if canceled {
1214                                live_kit.room.unpublish_track(publication);
1215                            } else {
1216                                if muted {
1217                                    cx.background().spawn(publication.set_mute(muted)).detach();
1218                                }
1219                                live_kit.screen_track = LocalTrack::Published {
1220                                    track_publication: publication,
1221                                    muted,
1222                                };
1223                                cx.notify();
1224                            }
1225
1226                            Audio::play_sound(Sound::StartScreenshare, cx);
1227
1228                            Ok(())
1229                        }
1230                        Err(error) => {
1231                            if canceled {
1232                                Ok(())
1233                            } else {
1234                                live_kit.screen_track = LocalTrack::None;
1235                                cx.notify();
1236                                Err(error)
1237                            }
1238                        }
1239                    }
1240                })
1241        })
1242    }
1243
1244    pub fn toggle_mute(&mut self, cx: &mut ModelContext<Self>) -> Result<Task<Result<()>>> {
1245        let should_mute = !self.is_muted();
1246        if let Some(live_kit) = self.live_kit.as_mut() {
1247            let (ret_task, old_muted) = live_kit.set_mute(should_mute, cx)?;
1248            live_kit.muted_by_user = should_mute;
1249
1250            if old_muted == true && live_kit.deafened == true {
1251                if let Some(task) = self.toggle_deafen(cx).ok() {
1252                    task.detach();
1253                }
1254            }
1255
1256            Ok(ret_task)
1257        } else {
1258            Err(anyhow!("LiveKit not started"))
1259        }
1260    }
1261
1262    pub fn toggle_deafen(&mut self, cx: &mut ModelContext<Self>) -> Result<Task<Result<()>>> {
1263        if let Some(live_kit) = self.live_kit.as_mut() {
1264            (*live_kit).deafened = !live_kit.deafened;
1265
1266            let mut tasks = Vec::with_capacity(self.remote_participants.len());
1267            // Context notification is sent within set_mute itself.
1268            let mut mute_task = None;
1269            // When deafening, mute user's mic as well.
1270            // When undeafening, unmute user's mic unless it was manually muted prior to deafening.
1271            if live_kit.deafened || !live_kit.muted_by_user {
1272                mute_task = Some(live_kit.set_mute(live_kit.deafened, cx)?.0);
1273            };
1274            for participant in self.remote_participants.values() {
1275                for track in live_kit
1276                    .room
1277                    .remote_audio_track_publications(&participant.user.id.to_string())
1278                {
1279                    tasks.push(cx.foreground().spawn(track.set_enabled(!live_kit.deafened)));
1280                }
1281            }
1282
1283            Ok(cx.foreground().spawn(async move {
1284                if let Some(mute_task) = mute_task {
1285                    mute_task.await?;
1286                }
1287                for task in tasks {
1288                    task.await?;
1289                }
1290                Ok(())
1291            }))
1292        } else {
1293            Err(anyhow!("LiveKit not started"))
1294        }
1295    }
1296
1297    pub fn unshare_screen(&mut self, cx: &mut ModelContext<Self>) -> Result<()> {
1298        if self.status.is_offline() {
1299            return Err(anyhow!("room is offline"));
1300        }
1301
1302        let live_kit = self
1303            .live_kit
1304            .as_mut()
1305            .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
1306        match mem::take(&mut live_kit.screen_track) {
1307            LocalTrack::None => Err(anyhow!("screen was not shared")),
1308            LocalTrack::Pending { .. } => {
1309                cx.notify();
1310                Ok(())
1311            }
1312            LocalTrack::Published {
1313                track_publication, ..
1314            } => {
1315                live_kit.room.unpublish_track(track_publication);
1316                cx.notify();
1317
1318                Audio::play_sound(Sound::StopScreenshare, cx);
1319                Ok(())
1320            }
1321        }
1322    }
1323
1324    #[cfg(any(test, feature = "test-support"))]
1325    pub fn set_display_sources(&self, sources: Vec<live_kit_client::MacOSDisplay>) {
1326        self.live_kit
1327            .as_ref()
1328            .unwrap()
1329            .room
1330            .set_display_sources(sources);
1331    }
1332}
1333
1334struct LiveKitRoom {
1335    room: Arc<live_kit_client::Room>,
1336    screen_track: LocalTrack,
1337    microphone_track: LocalTrack,
1338    /// Tracks whether we're currently in a muted state due to auto-mute from deafening or manual mute performed by user.
1339    muted_by_user: bool,
1340    deafened: bool,
1341    speaking: bool,
1342    next_publish_id: usize,
1343    _maintain_room: Task<()>,
1344    _maintain_tracks: [Task<()>; 2],
1345}
1346
1347impl LiveKitRoom {
1348    fn set_mute(
1349        self: &mut LiveKitRoom,
1350        should_mute: bool,
1351        cx: &mut ModelContext<Room>,
1352    ) -> Result<(Task<Result<()>>, bool)> {
1353        if !should_mute {
1354            // clear user muting state.
1355            self.muted_by_user = false;
1356        }
1357
1358        let (result, old_muted) = match &mut self.microphone_track {
1359            LocalTrack::None => Err(anyhow!("microphone was not shared")),
1360            LocalTrack::Pending { muted, .. } => {
1361                let old_muted = *muted;
1362                *muted = should_mute;
1363                cx.notify();
1364                Ok((Task::Ready(Some(Ok(()))), old_muted))
1365            }
1366            LocalTrack::Published {
1367                track_publication,
1368                muted,
1369            } => {
1370                let old_muted = *muted;
1371                *muted = should_mute;
1372                cx.notify();
1373                Ok((
1374                    cx.background().spawn(track_publication.set_mute(*muted)),
1375                    old_muted,
1376                ))
1377            }
1378        }?;
1379
1380        if old_muted != should_mute {
1381            if should_mute {
1382                Audio::play_sound(Sound::Mute, cx);
1383            } else {
1384                Audio::play_sound(Sound::Unmute, cx);
1385            }
1386        }
1387
1388        Ok((result, old_muted))
1389    }
1390}
1391
1392enum LocalTrack {
1393    None,
1394    Pending {
1395        publish_id: usize,
1396        muted: bool,
1397    },
1398    Published {
1399        track_publication: LocalTrackPublication,
1400        muted: bool,
1401    },
1402}
1403
1404impl Default for LocalTrack {
1405    fn default() -> Self {
1406        Self::None
1407    }
1408}
1409
/// Connection status of a room.
#[derive(Copy, Clone, PartialEq, Eq)]
pub enum RoomStatus {
    Online,
    Rejoining,
    Offline,
}

impl RoomStatus {
    /// Returns `true` only for [`RoomStatus::Offline`].
    pub fn is_offline(&self) -> bool {
        match self {
            RoomStatus::Offline => true,
            RoomStatus::Online | RoomStatus::Rejoining => false,
        }
    }

    /// Returns `true` only for [`RoomStatus::Online`].
    pub fn is_online(&self) -> bool {
        match self {
            RoomStatus::Online => true,
            RoomStatus::Rejoining | RoomStatus::Offline => false,
        }
    }
}