room.rs

   1use crate::{
   2    participant::{LocalParticipant, ParticipantLocation, RemoteParticipant, RemoteVideoTrack},
   3    IncomingCall,
   4};
   5use anyhow::{anyhow, Result};
   6use client::{
   7    proto::{self, PeerId},
   8    Client, TypedEnvelope, User, UserStore,
   9};
  10use collections::{BTreeMap, HashMap, HashSet};
  11use fs::Fs;
  12use futures::{FutureExt, StreamExt};
  13use gpui::{AppContext, AsyncAppContext, Entity, ModelContext, ModelHandle, Task, WeakModelHandle};
  14use language::LanguageRegistry;
  15use live_kit_client::{LocalTrackPublication, LocalVideoTrack, RemoteVideoTrackUpdate};
  16use postage::stream::Stream;
  17use project::Project;
  18use std::{future::Future, mem, pin::Pin, sync::Arc, time::Duration};
  19use util::{post_inc, ResultExt, TryFutureExt};
  20
/// Upper bound on how long `Room::maintain_connection` waits for the client to
/// reconnect and rejoin the room before it gives up and leaves the room.
pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(30);
  22
/// Events emitted by a [`Room`] model.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum Event {
    /// A known remote participant reported a location different from the stored one.
    ParticipantLocationChanged {
        participant_id: proto::PeerId,
    },
    /// A video track was added to or removed from a remote participant.
    RemoteVideoTracksChanged {
        participant_id: proto::PeerId,
    },
    /// A room update listed a project for a remote participant that was not in
    /// that participant's previously stored project list.
    RemoteProjectShared {
        owner: Arc<User>,
        project_id: u64,
        worktree_root_names: Vec<String>,
    },
    /// A project previously stored for a remote participant is no longer listed,
    /// or the participant itself is no longer part of the room.
    RemoteProjectUnshared {
        project_id: u64,
    },
    /// Emitted by `Room::leave`.
    Left,
}
  41
/// Client-side state of a call room: who is in it, which projects are shared or
/// joined through it, and the tasks that keep it in sync with the server.
pub struct Room {
    id: u64,
    // `None` when no LiveKit connection info was supplied; also cleared on leave.
    live_kit: Option<LiveKitRoom>,
    status: RoomStatus,
    // Unshared when leaving the room; re-announced to the server in `rejoin`.
    shared_projects: HashSet<WeakModelHandle<Project>>,
    // Populated by `join_project`; disconnected and closed when leaving the room.
    joined_projects: HashSet<WeakModelHandle<Project>>,
    local_participant: LocalParticipant,
    // Keyed by user id.
    remote_participants: BTreeMap<u64, RemoteParticipant>,
    pending_participants: Vec<Arc<User>>,
    // User ids of both remote and pending participants (see `check_invariants`).
    participant_user_ids: HashSet<u64>,
    // Number of `call` requests that have not completed yet.
    pending_call_count: usize,
    // Set to `true` by `create` and `join`; one of the conditions of `should_leave`.
    leave_when_empty: bool,
    client: Arc<Client>,
    user_store: ModelHandle<UserStore>,
    // Rebuilt from scratch on every applied room update.
    follows_by_leader_id_project_id: HashMap<(PeerId, u64), Vec<PeerId>>,
    subscriptions: Vec<client::Subscription>,
    // Task applying the most recent room update; reset to `None` once it has run.
    pending_room_update: Option<Task<()>>,
    // Task running `Self::maintain_connection`; dropped when leaving the room.
    maintain_connection: Option<Task<Option<()>>>,
}
  61
  62impl Entity for Room {
  63    type Event = Event;
  64
  65    fn release(&mut self, cx: &mut AppContext) {
  66        if self.status.is_online() {
  67            self.leave_internal(cx).detach_and_log_err(cx);
  68        }
  69    }
  70
  71    fn app_will_quit(&mut self, cx: &mut AppContext) -> Option<Pin<Box<dyn Future<Output = ()>>>> {
  72        if self.status.is_online() {
  73            let leave = self.leave_internal(cx);
  74            Some(
  75                cx.background()
  76                    .spawn(async move {
  77                        leave.await.log_err();
  78                    })
  79                    .boxed(),
  80            )
  81        } else {
  82            None
  83        }
  84    }
  85}
  86
  87impl Room {
  88    fn new(
  89        id: u64,
  90        live_kit_connection_info: Option<proto::LiveKitConnectionInfo>,
  91        client: Arc<Client>,
  92        user_store: ModelHandle<UserStore>,
  93        cx: &mut ModelContext<Self>,
  94    ) -> Self {
  95        let live_kit_room = if let Some(connection_info) = live_kit_connection_info {
  96            let room = live_kit_client::Room::new();
  97            let mut status = room.status();
  98            // Consume the initial status of the room.
  99            let _ = status.try_recv();
 100            let _maintain_room = cx.spawn_weak(|this, mut cx| async move {
 101                while let Some(status) = status.next().await {
 102                    let this = if let Some(this) = this.upgrade(&cx) {
 103                        this
 104                    } else {
 105                        break;
 106                    };
 107
 108                    if status == live_kit_client::ConnectionState::Disconnected {
 109                        this.update(&mut cx, |this, cx| this.leave(cx).log_err());
 110                        break;
 111                    }
 112                }
 113            });
 114
 115            let mut track_changes = room.remote_video_track_updates();
 116            let _maintain_tracks = cx.spawn_weak(|this, mut cx| async move {
 117                while let Some(track_change) = track_changes.next().await {
 118                    let this = if let Some(this) = this.upgrade(&cx) {
 119                        this
 120                    } else {
 121                        break;
 122                    };
 123
 124                    this.update(&mut cx, |this, cx| {
 125                        this.remote_video_track_updated(track_change, cx).log_err()
 126                    });
 127                }
 128            });
 129
 130            cx.foreground()
 131                .spawn(room.connect(&connection_info.server_url, &connection_info.token))
 132                .detach_and_log_err(cx);
 133
 134            Some(LiveKitRoom {
 135                room,
 136                screen_track: ScreenTrack::None,
 137                next_publish_id: 0,
 138                _maintain_room,
 139                _maintain_tracks,
 140            })
 141        } else {
 142            None
 143        };
 144
 145        let maintain_connection =
 146            cx.spawn_weak(|this, cx| Self::maintain_connection(this, client.clone(), cx).log_err());
 147
 148        Self {
 149            id,
 150            live_kit: live_kit_room,
 151            status: RoomStatus::Online,
 152            shared_projects: Default::default(),
 153            joined_projects: Default::default(),
 154            participant_user_ids: Default::default(),
 155            local_participant: Default::default(),
 156            remote_participants: Default::default(),
 157            pending_participants: Default::default(),
 158            pending_call_count: 0,
 159            subscriptions: vec![client.add_message_handler(cx.handle(), Self::handle_room_updated)],
 160            leave_when_empty: false,
 161            pending_room_update: None,
 162            client,
 163            user_store,
 164            follows_by_leader_id_project_id: Default::default(),
 165            maintain_connection: Some(maintain_connection),
 166        }
 167    }
 168
 169    pub(crate) fn create(
 170        called_user_id: u64,
 171        initial_project: Option<ModelHandle<Project>>,
 172        client: Arc<Client>,
 173        user_store: ModelHandle<UserStore>,
 174        cx: &mut AppContext,
 175    ) -> Task<Result<ModelHandle<Self>>> {
 176        cx.spawn(|mut cx| async move {
 177            let response = client.request(proto::CreateRoom {}).await?;
 178            let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
 179            let room = cx.add_model(|cx| {
 180                Self::new(
 181                    room_proto.id,
 182                    response.live_kit_connection_info,
 183                    client,
 184                    user_store,
 185                    cx,
 186                )
 187            });
 188
 189            let initial_project_id = if let Some(initial_project) = initial_project {
 190                let initial_project_id = room
 191                    .update(&mut cx, |room, cx| {
 192                        room.share_project(initial_project.clone(), cx)
 193                    })
 194                    .await?;
 195                Some(initial_project_id)
 196            } else {
 197                None
 198            };
 199
 200            match room
 201                .update(&mut cx, |room, cx| {
 202                    room.leave_when_empty = true;
 203                    room.call(called_user_id, initial_project_id, cx)
 204                })
 205                .await
 206            {
 207                Ok(()) => Ok(room),
 208                Err(error) => Err(anyhow!("room creation failed: {:?}", error)),
 209            }
 210        })
 211    }
 212
 213    pub(crate) fn join(
 214        call: &IncomingCall,
 215        client: Arc<Client>,
 216        user_store: ModelHandle<UserStore>,
 217        cx: &mut AppContext,
 218    ) -> Task<Result<ModelHandle<Self>>> {
 219        let room_id = call.room_id;
 220        cx.spawn(|mut cx| async move {
 221            let response = client.request(proto::JoinRoom { id: room_id }).await?;
 222            let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
 223            let room = cx.add_model(|cx| {
 224                Self::new(
 225                    room_id,
 226                    response.live_kit_connection_info,
 227                    client,
 228                    user_store,
 229                    cx,
 230                )
 231            });
 232            room.update(&mut cx, |room, cx| {
 233                room.leave_when_empty = true;
 234                room.apply_room_update(room_proto, cx)?;
 235                anyhow::Ok(())
 236            })?;
 237            Ok(room)
 238        })
 239    }
 240
 241    fn should_leave(&self) -> bool {
 242        self.leave_when_empty
 243            && self.pending_room_update.is_none()
 244            && self.pending_participants.is_empty()
 245            && self.remote_participants.is_empty()
 246            && self.pending_call_count == 0
 247    }
 248
    /// Notifies observers, emits [`Event::Left`], and then leaves the room via
    /// `leave_internal`, returning its task.
    ///
    /// The notification and event are sent before `leave_internal` runs, so they
    /// are emitted even when it returns the "room is offline" error.
    pub(crate) fn leave(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
        cx.notify();
        cx.emit(Event::Left);
        self.leave_internal(cx)
    }
 254
 255    fn leave_internal(&mut self, cx: &mut AppContext) -> Task<Result<()>> {
 256        if self.status.is_offline() {
 257            return Task::ready(Err(anyhow!("room is offline")));
 258        }
 259
 260        log::info!("leaving room");
 261
 262        for project in self.shared_projects.drain() {
 263            if let Some(project) = project.upgrade(cx) {
 264                project.update(cx, |project, cx| {
 265                    project.unshare(cx).log_err();
 266                });
 267            }
 268        }
 269        for project in self.joined_projects.drain() {
 270            if let Some(project) = project.upgrade(cx) {
 271                project.update(cx, |project, cx| {
 272                    project.disconnected_from_host(cx);
 273                    project.close(cx);
 274                });
 275            }
 276        }
 277
 278        self.status = RoomStatus::Offline;
 279        self.remote_participants.clear();
 280        self.pending_participants.clear();
 281        self.participant_user_ids.clear();
 282        self.subscriptions.clear();
 283        self.live_kit.take();
 284        self.pending_room_update.take();
 285        self.maintain_connection.take();
 286
 287        let leave_room = self.client.request(proto::LeaveRoom {});
 288        cx.background().spawn(async move {
 289            leave_room.await?;
 290            anyhow::Ok(())
 291        })
 292    }
 293
    /// Watches the client's connection status and tries to rejoin the room after
    /// a disconnection.
    ///
    /// Each detected disconnection marks the room as `Rejoining` and races a
    /// rejoin loop (at most three failed `rejoin` attempts, aborted when the
    /// client is signed out) against a `RECONNECT_TIMEOUT` timer. On success the
    /// outer loop resumes watching; otherwise the room is left (if it still
    /// exists) and an error is returned. An error is also returned if the room
    /// model is dropped at the moment a disconnection is detected.
    async fn maintain_connection(
        this: WeakModelHandle<Self>,
        client: Arc<Client>,
        mut cx: AsyncAppContext,
    ) -> Result<()> {
        let mut client_status = client.status();
        loop {
            let _ = client_status.try_recv();
            let is_connected = client_status.borrow().is_connected();
            // Even if we're initially connected, any future change of the status means we momentarily disconnected.
            if !is_connected || client_status.next().await.is_some() {
                log::info!("detected client disconnection");

                this.upgrade(&cx)
                    .ok_or_else(|| anyhow!("room was dropped"))?
                    .update(&mut cx, |this, cx| {
                        this.status = RoomStatus::Rejoining;
                        cx.notify();
                    });

                // Wait for client to re-establish a connection to the server.
                {
                    let mut reconnection_timeout = cx.background().timer(RECONNECT_TIMEOUT).fuse();
                    // Resolves to `true` once a rejoin succeeds, `false` when the
                    // attempts run out, the room is dropped, or the client signs out.
                    let client_reconnection = async {
                        let mut remaining_attempts = 3;
                        while remaining_attempts > 0 {
                            if client_status.borrow().is_connected() {
                                log::info!("client reconnected, attempting to rejoin room");

                                let Some(this) = this.upgrade(&cx) else { break };
                                if this
                                    .update(&mut cx, |this, cx| this.rejoin(cx))
                                    .await
                                    .log_err()
                                    .is_some()
                                {
                                    return true;
                                } else {
                                    remaining_attempts -= 1;
                                }
                            } else if client_status.borrow().is_signed_out() {
                                return false;
                            }

                            log::info!(
                                "waiting for client status change, remaining attempts {}",
                                remaining_attempts
                            );
                            client_status.next().await;
                        }
                        false
                    }
                    .fuse();
                    futures::pin_mut!(client_reconnection);

                    // `select_biased!` polls the reconnection future before the timeout.
                    futures::select_biased! {
                        reconnected = client_reconnection => {
                            if reconnected {
                                log::info!("successfully reconnected to room");
                                // If we successfully joined the room, go back around the loop
                                // waiting for future connection status changes.
                                continue;
                            }
                        }
                        _ = reconnection_timeout => {
                            log::info!("room reconnection timeout expired");
                        }
                    }
                }

                break;
            }
        }

        // The client failed to re-establish a connection to the server
        // or an error occurred while trying to re-join the room. Either way
        // we leave the room and return an error.
        if let Some(this) = this.upgrade(&cx) {
            log::info!("reconnection failed, leaving room");
            let _ = this.update(&mut cx, |this, cx| this.leave(cx));
        }
        Err(anyhow!(
            "can't reconnect to room: client failed to re-establish connection"
        ))
    }
 379
 380    fn rejoin(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
 381        let mut projects = HashMap::default();
 382        let mut reshared_projects = Vec::new();
 383        let mut rejoined_projects = Vec::new();
 384        self.shared_projects.retain(|project| {
 385            if let Some(handle) = project.upgrade(cx) {
 386                let project = handle.read(cx);
 387                if let Some(project_id) = project.remote_id() {
 388                    projects.insert(project_id, handle.clone());
 389                    reshared_projects.push(proto::UpdateProject {
 390                        project_id,
 391                        worktrees: project.worktree_metadata_protos(cx),
 392                    });
 393                    return true;
 394                }
 395            }
 396            false
 397        });
 398        self.joined_projects.retain(|project| {
 399            if let Some(handle) = project.upgrade(cx) {
 400                let project = handle.read(cx);
 401                if let Some(project_id) = project.remote_id() {
 402                    projects.insert(project_id, handle.clone());
 403                    rejoined_projects.push(proto::RejoinProject {
 404                        id: project_id,
 405                        worktrees: project
 406                            .worktrees(cx)
 407                            .map(|worktree| {
 408                                let worktree = worktree.read(cx);
 409                                proto::RejoinWorktree {
 410                                    id: worktree.id().to_proto(),
 411                                    scan_id: worktree.completed_scan_id() as u64,
 412                                }
 413                            })
 414                            .collect(),
 415                    });
 416                }
 417                return true;
 418            }
 419            false
 420        });
 421
 422        let response = self.client.request(proto::RejoinRoom {
 423            id: self.id,
 424            reshared_projects,
 425            rejoined_projects,
 426        });
 427
 428        cx.spawn(|this, mut cx| async move {
 429            let response = response.await?;
 430            let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
 431            this.update(&mut cx, |this, cx| {
 432                this.status = RoomStatus::Online;
 433                this.apply_room_update(room_proto, cx)?;
 434
 435                for reshared_project in response.reshared_projects {
 436                    if let Some(project) = projects.get(&reshared_project.id) {
 437                        project.update(cx, |project, cx| {
 438                            project.reshared(reshared_project, cx).log_err();
 439                        });
 440                    }
 441                }
 442
 443                for rejoined_project in response.rejoined_projects {
 444                    if let Some(project) = projects.get(&rejoined_project.id) {
 445                        project.update(cx, |project, cx| {
 446                            project.rejoined(rejoined_project, cx).log_err();
 447                        });
 448                    }
 449                }
 450
 451                anyhow::Ok(())
 452            })
 453        })
 454    }
 455
    /// Returns the id this room was constructed with.
    pub fn id(&self) -> u64 {
        self.id
    }
 459
    /// Returns the room's current status.
    pub fn status(&self) -> RoomStatus {
        self.status
    }
 463
    /// Returns the state tracked for the local participant.
    pub fn local_participant(&self) -> &LocalParticipant {
        &self.local_participant
    }
 467
    /// Returns the remote participants currently in the room, keyed by user id.
    pub fn remote_participants(&self) -> &BTreeMap<u64, RemoteParticipant> {
        &self.remote_participants
    }
 471
 472    pub fn remote_participant_for_peer_id(&self, peer_id: PeerId) -> Option<&RemoteParticipant> {
 473        self.remote_participants
 474            .values()
 475            .find(|p| p.peer_id == peer_id)
 476    }
 477
    /// Returns the users listed as pending participants in the last applied room update.
    pub fn pending_participants(&self) -> &[Arc<User>] {
        &self.pending_participants
    }
 481
    /// Returns whether `user_id` belongs to a remote or pending participant of this room.
    pub fn contains_participant(&self, user_id: u64) -> bool {
        self.participant_user_ids.contains(&user_id)
    }
 485
 486    pub fn followers_for(&self, leader_id: PeerId, project_id: u64) -> &[PeerId] {
 487        self.follows_by_leader_id_project_id
 488            .get(&(leader_id, project_id))
 489            .map_or(&[], |v| v.as_slice())
 490    }
 491
 492    async fn handle_room_updated(
 493        this: ModelHandle<Self>,
 494        envelope: TypedEnvelope<proto::RoomUpdated>,
 495        _: Arc<Client>,
 496        mut cx: AsyncAppContext,
 497    ) -> Result<()> {
 498        let room = envelope
 499            .payload
 500            .room
 501            .ok_or_else(|| anyhow!("invalid room"))?;
 502        this.update(&mut cx, |this, cx| this.apply_room_update(room, cx))
 503    }
 504
 505    fn apply_room_update(
 506        &mut self,
 507        mut room: proto::Room,
 508        cx: &mut ModelContext<Self>,
 509    ) -> Result<()> {
 510        // Filter ourselves out from the room's participants.
 511        let local_participant_ix = room
 512            .participants
 513            .iter()
 514            .position(|participant| Some(participant.user_id) == self.client.user_id());
 515        let local_participant = local_participant_ix.map(|ix| room.participants.swap_remove(ix));
 516
 517        let pending_participant_user_ids = room
 518            .pending_participants
 519            .iter()
 520            .map(|p| p.user_id)
 521            .collect::<Vec<_>>();
 522
 523        let remote_participant_user_ids = room
 524            .participants
 525            .iter()
 526            .map(|p| p.user_id)
 527            .collect::<Vec<_>>();
 528
 529        let (remote_participants, pending_participants) =
 530            self.user_store.update(cx, move |user_store, cx| {
 531                (
 532                    user_store.get_users(remote_participant_user_ids, cx),
 533                    user_store.get_users(pending_participant_user_ids, cx),
 534                )
 535            });
 536
 537        self.pending_room_update = Some(cx.spawn(|this, mut cx| async move {
 538            let (remote_participants, pending_participants) =
 539                futures::join!(remote_participants, pending_participants);
 540
 541            this.update(&mut cx, |this, cx| {
 542                this.participant_user_ids.clear();
 543
 544                if let Some(participant) = local_participant {
 545                    this.local_participant.projects = participant.projects;
 546                } else {
 547                    this.local_participant.projects.clear();
 548                }
 549
 550                if let Some(participants) = remote_participants.log_err() {
 551                    for (participant, user) in room.participants.into_iter().zip(participants) {
 552                        let Some(peer_id) = participant.peer_id else { continue };
 553                        this.participant_user_ids.insert(participant.user_id);
 554
 555                        let old_projects = this
 556                            .remote_participants
 557                            .get(&participant.user_id)
 558                            .into_iter()
 559                            .flat_map(|existing| &existing.projects)
 560                            .map(|project| project.id)
 561                            .collect::<HashSet<_>>();
 562                        let new_projects = participant
 563                            .projects
 564                            .iter()
 565                            .map(|project| project.id)
 566                            .collect::<HashSet<_>>();
 567
 568                        for project in &participant.projects {
 569                            if !old_projects.contains(&project.id) {
 570                                cx.emit(Event::RemoteProjectShared {
 571                                    owner: user.clone(),
 572                                    project_id: project.id,
 573                                    worktree_root_names: project.worktree_root_names.clone(),
 574                                });
 575                            }
 576                        }
 577
 578                        for unshared_project_id in old_projects.difference(&new_projects) {
 579                            this.joined_projects.retain(|project| {
 580                                if let Some(project) = project.upgrade(cx) {
 581                                    project.update(cx, |project, cx| {
 582                                        if project.remote_id() == Some(*unshared_project_id) {
 583                                            project.disconnected_from_host(cx);
 584                                            false
 585                                        } else {
 586                                            true
 587                                        }
 588                                    })
 589                                } else {
 590                                    false
 591                                }
 592                            });
 593                            cx.emit(Event::RemoteProjectUnshared {
 594                                project_id: *unshared_project_id,
 595                            });
 596                        }
 597
 598                        let location = ParticipantLocation::from_proto(participant.location)
 599                            .unwrap_or(ParticipantLocation::External);
 600                        if let Some(remote_participant) =
 601                            this.remote_participants.get_mut(&participant.user_id)
 602                        {
 603                            remote_participant.projects = participant.projects;
 604                            remote_participant.peer_id = peer_id;
 605                            if location != remote_participant.location {
 606                                remote_participant.location = location;
 607                                cx.emit(Event::ParticipantLocationChanged {
 608                                    participant_id: peer_id,
 609                                });
 610                            }
 611                        } else {
 612                            this.remote_participants.insert(
 613                                participant.user_id,
 614                                RemoteParticipant {
 615                                    user: user.clone(),
 616                                    peer_id,
 617                                    projects: participant.projects,
 618                                    location,
 619                                    tracks: Default::default(),
 620                                },
 621                            );
 622
 623                            if let Some(live_kit) = this.live_kit.as_ref() {
 624                                let tracks =
 625                                    live_kit.room.remote_video_tracks(&user.id.to_string());
 626                                for track in tracks {
 627                                    this.remote_video_track_updated(
 628                                        RemoteVideoTrackUpdate::Subscribed(track),
 629                                        cx,
 630                                    )
 631                                    .log_err();
 632                                }
 633                            }
 634                        }
 635                    }
 636
 637                    this.remote_participants.retain(|user_id, participant| {
 638                        if this.participant_user_ids.contains(user_id) {
 639                            true
 640                        } else {
 641                            for project in &participant.projects {
 642                                cx.emit(Event::RemoteProjectUnshared {
 643                                    project_id: project.id,
 644                                });
 645                            }
 646                            false
 647                        }
 648                    });
 649                }
 650
 651                if let Some(pending_participants) = pending_participants.log_err() {
 652                    this.pending_participants = pending_participants;
 653                    for participant in &this.pending_participants {
 654                        this.participant_user_ids.insert(participant.id);
 655                    }
 656                }
 657
 658                this.follows_by_leader_id_project_id.clear();
 659                for follower in room.followers {
 660                    let project_id = follower.project_id;
 661                    let (leader, follower) = match (follower.leader_id, follower.follower_id) {
 662                        (Some(leader), Some(follower)) => (leader, follower),
 663
 664                        _ => {
 665                            log::error!("Follower message {follower:?} missing some state");
 666                            continue;
 667                        }
 668                    };
 669
 670                    let list = this
 671                        .follows_by_leader_id_project_id
 672                        .entry((leader, project_id))
 673                        .or_insert(Vec::new());
 674                    if !list.contains(&follower) {
 675                        list.push(follower);
 676                    }
 677                }
 678
 679                this.pending_room_update.take();
 680                if this.should_leave() {
 681                    log::info!("room is empty, leaving");
 682                    let _ = this.leave(cx);
 683                }
 684
 685                this.check_invariants();
 686                cx.notify();
 687            });
 688        }));
 689
 690        cx.notify();
 691        Ok(())
 692    }
 693
 694    fn remote_video_track_updated(
 695        &mut self,
 696        change: RemoteVideoTrackUpdate,
 697        cx: &mut ModelContext<Self>,
 698    ) -> Result<()> {
 699        match change {
 700            RemoteVideoTrackUpdate::Subscribed(track) => {
 701                let user_id = track.publisher_id().parse()?;
 702                let track_id = track.sid().to_string();
 703                let participant = self
 704                    .remote_participants
 705                    .get_mut(&user_id)
 706                    .ok_or_else(|| anyhow!("subscribed to track by unknown participant"))?;
 707                participant.tracks.insert(
 708                    track_id.clone(),
 709                    Arc::new(RemoteVideoTrack {
 710                        live_kit_track: track,
 711                    }),
 712                );
 713                cx.emit(Event::RemoteVideoTracksChanged {
 714                    participant_id: participant.peer_id,
 715                });
 716            }
 717            RemoteVideoTrackUpdate::Unsubscribed {
 718                publisher_id,
 719                track_id,
 720            } => {
 721                let user_id = publisher_id.parse()?;
 722                let participant = self
 723                    .remote_participants
 724                    .get_mut(&user_id)
 725                    .ok_or_else(|| anyhow!("unsubscribed from track by unknown participant"))?;
 726                participant.tracks.remove(&track_id);
 727                cx.emit(Event::RemoteVideoTracksChanged {
 728                    participant_id: participant.peer_id,
 729                });
 730            }
 731        }
 732
 733        cx.notify();
 734        Ok(())
 735    }
 736
    /// Asserts internal consistency of the participant bookkeeping.
    ///
    /// The checks are compiled only for tests or the `test-support` feature;
    /// otherwise this is a no-op. They verify that every remote and pending
    /// participant is recorded in `participant_user_ids`, that none of them is
    /// the local user, and that `participant_user_ids` holds no extra ids.
    fn check_invariants(&self) {
        #[cfg(any(test, feature = "test-support"))]
        {
            for participant in self.remote_participants.values() {
                assert!(self.participant_user_ids.contains(&participant.user.id));
                assert_ne!(participant.user.id, self.client.user_id().unwrap());
            }

            for participant in &self.pending_participants {
                assert!(self.participant_user_ids.contains(&participant.id));
                assert_ne!(participant.id, self.client.user_id().unwrap());
            }

            // Together with the containment checks above, equal sizes imply the id
            // set has no entries beyond the remote and pending participants.
            assert_eq!(
                self.participant_user_ids.len(),
                self.remote_participants.len() + self.pending_participants.len()
            );
        }
    }
 756
 757    pub(crate) fn call(
 758        &mut self,
 759        called_user_id: u64,
 760        initial_project_id: Option<u64>,
 761        cx: &mut ModelContext<Self>,
 762    ) -> Task<Result<()>> {
 763        if self.status.is_offline() {
 764            return Task::ready(Err(anyhow!("room is offline")));
 765        }
 766
 767        cx.notify();
 768        let client = self.client.clone();
 769        let room_id = self.id;
 770        self.pending_call_count += 1;
 771        cx.spawn(|this, mut cx| async move {
 772            let result = client
 773                .request(proto::Call {
 774                    room_id,
 775                    called_user_id,
 776                    initial_project_id,
 777                })
 778                .await;
 779            this.update(&mut cx, |this, cx| {
 780                this.pending_call_count -= 1;
 781                if this.should_leave() {
 782                    this.leave(cx).detach_and_log_err(cx);
 783                }
 784            });
 785            result?;
 786            Ok(())
 787        })
 788    }
 789
 790    pub fn join_project(
 791        &mut self,
 792        id: u64,
 793        language_registry: Arc<LanguageRegistry>,
 794        fs: Arc<dyn Fs>,
 795        cx: &mut ModelContext<Self>,
 796    ) -> Task<Result<ModelHandle<Project>>> {
 797        let client = self.client.clone();
 798        let user_store = self.user_store.clone();
 799        cx.spawn(|this, mut cx| async move {
 800            let project =
 801                Project::remote(id, client, user_store, language_registry, fs, cx.clone()).await?;
 802            this.update(&mut cx, |this, cx| {
 803                this.joined_projects.retain(|project| {
 804                    if let Some(project) = project.upgrade(cx) {
 805                        !project.read(cx).is_read_only()
 806                    } else {
 807                        false
 808                    }
 809                });
 810                this.joined_projects.insert(project.downgrade());
 811            });
 812            Ok(project)
 813        })
 814    }
 815
 816    pub(crate) fn share_project(
 817        &mut self,
 818        project: ModelHandle<Project>,
 819        cx: &mut ModelContext<Self>,
 820    ) -> Task<Result<u64>> {
 821        if let Some(project_id) = project.read(cx).remote_id() {
 822            return Task::ready(Ok(project_id));
 823        }
 824
 825        let request = self.client.request(proto::ShareProject {
 826            room_id: self.id(),
 827            worktrees: project.read(cx).worktree_metadata_protos(cx),
 828        });
 829        cx.spawn(|this, mut cx| async move {
 830            let response = request.await?;
 831
 832            project.update(&mut cx, |project, cx| {
 833                project.shared(response.project_id, cx)
 834            })?;
 835
 836            // If the user's location is in this project, it changes from UnsharedProject to SharedProject.
 837            this.update(&mut cx, |this, cx| {
 838                this.shared_projects.insert(project.downgrade());
 839                let active_project = this.local_participant.active_project.as_ref();
 840                if active_project.map_or(false, |location| *location == project) {
 841                    this.set_location(Some(&project), cx)
 842                } else {
 843                    Task::ready(Ok(()))
 844                }
 845            })
 846            .await?;
 847
 848            Ok(response.project_id)
 849        })
 850    }
 851
 852    pub(crate) fn unshare_project(
 853        &mut self,
 854        project: ModelHandle<Project>,
 855        cx: &mut ModelContext<Self>,
 856    ) -> Result<()> {
 857        let project_id = match project.read(cx).remote_id() {
 858            Some(project_id) => project_id,
 859            None => return Ok(()),
 860        };
 861
 862        self.client.send(proto::UnshareProject { project_id })?;
 863        project.update(cx, |this, cx| this.unshare(cx))
 864    }
 865
 866    pub(crate) fn set_location(
 867        &mut self,
 868        project: Option<&ModelHandle<Project>>,
 869        cx: &mut ModelContext<Self>,
 870    ) -> Task<Result<()>> {
 871        if self.status.is_offline() {
 872            return Task::ready(Err(anyhow!("room is offline")));
 873        }
 874
 875        let client = self.client.clone();
 876        let room_id = self.id;
 877        let location = if let Some(project) = project {
 878            self.local_participant.active_project = Some(project.downgrade());
 879            if let Some(project_id) = project.read(cx).remote_id() {
 880                proto::participant_location::Variant::SharedProject(
 881                    proto::participant_location::SharedProject { id: project_id },
 882                )
 883            } else {
 884                proto::participant_location::Variant::UnsharedProject(
 885                    proto::participant_location::UnsharedProject {},
 886                )
 887            }
 888        } else {
 889            self.local_participant.active_project = None;
 890            proto::participant_location::Variant::External(proto::participant_location::External {})
 891        };
 892
 893        cx.notify();
 894        cx.foreground().spawn(async move {
 895            client
 896                .request(proto::UpdateParticipantLocation {
 897                    room_id,
 898                    location: Some(proto::ParticipantLocation {
 899                        variant: Some(location),
 900                    }),
 901                })
 902                .await?;
 903            Ok(())
 904        })
 905    }
 906
 907    pub fn is_screen_sharing(&self) -> bool {
 908        self.live_kit.as_ref().map_or(false, |live_kit| {
 909            !matches!(live_kit.screen_track, ScreenTrack::None)
 910        })
 911    }
 912
 913    pub fn share_screen(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
 914        if self.status.is_offline() {
 915            return Task::ready(Err(anyhow!("room is offline")));
 916        } else if self.is_screen_sharing() {
 917            return Task::ready(Err(anyhow!("screen was already shared")));
 918        }
 919
 920        let (displays, publish_id) = if let Some(live_kit) = self.live_kit.as_mut() {
 921            let publish_id = post_inc(&mut live_kit.next_publish_id);
 922            live_kit.screen_track = ScreenTrack::Pending { publish_id };
 923            cx.notify();
 924            (live_kit.room.display_sources(), publish_id)
 925        } else {
 926            return Task::ready(Err(anyhow!("live-kit was not initialized")));
 927        };
 928
 929        cx.spawn_weak(|this, mut cx| async move {
 930            let publish_track = async {
 931                let displays = displays.await?;
 932                let display = displays
 933                    .first()
 934                    .ok_or_else(|| anyhow!("no display found"))?;
 935                let track = LocalVideoTrack::screen_share_for_display(&display);
 936                this.upgrade(&cx)
 937                    .ok_or_else(|| anyhow!("room was dropped"))?
 938                    .read_with(&cx, |this, _| {
 939                        this.live_kit
 940                            .as_ref()
 941                            .map(|live_kit| live_kit.room.publish_video_track(&track))
 942                    })
 943                    .ok_or_else(|| anyhow!("live-kit was not initialized"))?
 944                    .await
 945            };
 946
 947            let publication = publish_track.await;
 948            this.upgrade(&cx)
 949                .ok_or_else(|| anyhow!("room was dropped"))?
 950                .update(&mut cx, |this, cx| {
 951                    let live_kit = this
 952                        .live_kit
 953                        .as_mut()
 954                        .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
 955
 956                    let canceled = if let ScreenTrack::Pending {
 957                        publish_id: cur_publish_id,
 958                    } = &live_kit.screen_track
 959                    {
 960                        *cur_publish_id != publish_id
 961                    } else {
 962                        true
 963                    };
 964
 965                    match publication {
 966                        Ok(publication) => {
 967                            if canceled {
 968                                live_kit.room.unpublish_track(publication);
 969                            } else {
 970                                live_kit.screen_track = ScreenTrack::Published(publication);
 971                                cx.notify();
 972                            }
 973                            Ok(())
 974                        }
 975                        Err(error) => {
 976                            if canceled {
 977                                Ok(())
 978                            } else {
 979                                live_kit.screen_track = ScreenTrack::None;
 980                                cx.notify();
 981                                Err(error)
 982                            }
 983                        }
 984                    }
 985                })
 986        })
 987    }
 988
 989    pub fn unshare_screen(&mut self, cx: &mut ModelContext<Self>) -> Result<()> {
 990        if self.status.is_offline() {
 991            return Err(anyhow!("room is offline"));
 992        }
 993
 994        let live_kit = self
 995            .live_kit
 996            .as_mut()
 997            .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
 998        match mem::take(&mut live_kit.screen_track) {
 999            ScreenTrack::None => Err(anyhow!("screen was not shared")),
1000            ScreenTrack::Pending { .. } => {
1001                cx.notify();
1002                Ok(())
1003            }
1004            ScreenTrack::Published(track) => {
1005                live_kit.room.unpublish_track(track);
1006                cx.notify();
1007                Ok(())
1008            }
1009        }
1010    }
1011
1012    #[cfg(any(test, feature = "test-support"))]
1013    pub fn set_display_sources(&self, sources: Vec<live_kit_client::MacOSDisplay>) {
1014        self.live_kit
1015            .as_ref()
1016            .unwrap()
1017            .room
1018            .set_display_sources(sources);
1019    }
1020}
1021
/// Live-kit state held by a `Room` in its optional `live_kit` field.
struct LiveKitRoom {
    /// Handle to the live-kit room; `Room` uses it to list display sources and to publish
    /// and unpublish the local screen track.
    room: Arc<live_kit_client::Room>,
    /// Current state of the local screen share.
    screen_track: ScreenTrack,
    /// Counter from which `share_screen` draws (via `post_inc`) the id tagging each publish
    /// attempt.
    next_publish_id: usize,
    // NOTE(review): the two tasks below are never read in the visible part of this file.
    // Presumably they are stored only so the background work they drive stays alive for as
    // long as this struct does; confirm at the place where `LiveKitRoom` is constructed.
    _maintain_room: Task<()>,
    _maintain_tracks: Task<()>,
}
1029
/// State of the local screen share tracked by `LiveKitRoom::screen_track`.
enum ScreenTrack {
    /// No screen share is pending or published.
    None,
    /// `share_screen` has started publishing a track under this `publish_id`, and the
    /// attempt has not completed yet.
    Pending { publish_id: usize },
    /// The screen track is published; the publication is kept so that it can be handed to
    /// `unpublish_track` later.
    Published(LocalTrackPublication),
}

impl Default for ScreenTrack {
    /// Returns [`ScreenTrack::None`]; this is the value `mem::take` leaves behind in
    /// `unshare_screen`.
    fn default() -> Self {
        Self::None
    }
}
1041
/// Connection state of a `Room`.
///
/// Derives `Debug`, like the public `Event` enum of this module, so that a status can be
/// logged and used in `assert_eq!`-style assertions.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum RoomStatus {
    /// The only state for which [`RoomStatus::is_online`] returns `true`.
    Online,
    /// Neither online nor offline: both predicates return `false` for this state.
    // NOTE(review): presumably the state while the client tries to rejoin after losing its
    // connection (see `RECONNECT_TIMEOUT`); confirm where the status is assigned.
    Rejoining,
    /// The only state for which [`RoomStatus::is_offline`] returns `true`. Operations such
    /// as `call`, `set_location`, `share_screen` and `unshare_screen` fail with
    /// "room is offline" in this state.
    Offline,
}

impl RoomStatus {
    /// Returns `true` only for [`RoomStatus::Offline`].
    pub fn is_offline(&self) -> bool {
        matches!(self, RoomStatus::Offline)
    }

    /// Returns `true` only for [`RoomStatus::Online`]; note that
    /// [`RoomStatus::Rejoining`] does not count as online.
    pub fn is_online(&self) -> bool {
        matches!(self, RoomStatus::Online)
    }
}