room.rs

   1use crate::{
   2    participant::{LocalParticipant, ParticipantLocation, RemoteParticipant, RemoteVideoTrack},
   3    IncomingCall,
   4};
   5use anyhow::{anyhow, Result};
   6use client::{
   7    proto::{self, PeerId},
   8    Client, TypedEnvelope, User, UserStore,
   9};
  10use collections::{BTreeMap, HashMap, HashSet};
  11use fs::Fs;
  12use futures::{FutureExt, StreamExt};
  13use gpui::{
  14    AsyncAppContext, Entity, ModelContext, ModelHandle, MutableAppContext, Task, WeakModelHandle,
  15};
  16use language::LanguageRegistry;
  17use live_kit_client::{LocalTrackPublication, LocalVideoTrack, RemoteVideoTrackUpdate};
  18use postage::stream::Stream;
  19use project::Project;
  20use std::{future::Future, mem, pin::Pin, sync::Arc, time::Duration};
  21use util::{post_inc, ResultExt, TryFutureExt};
  22
    /// Upper bound on how long `Room::maintain_connection` waits for the client to
    /// reconnect and rejoin the room before giving up.
  23pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(30);
  24
  25#[derive(Clone, Debug, PartialEq, Eq)]
  26pub enum Event {
  27    ParticipantLocationChanged {
  28        participant_id: proto::PeerId,
  29    },
  30    RemoteVideoTracksChanged {
  31        participant_id: proto::PeerId,
  32    },
  33    RemoteProjectShared {
  34        owner: Arc<User>,
  35        project_id: u64,
  36        worktree_root_names: Vec<String>,
  37    },
  38    RemoteProjectUnshared {
  39        project_id: u64,
  40    },
  41    Left,
  42}
  43
    /// Local model of a call room: its participants, the projects shared in it, and the
    /// background tasks that keep it in sync with the server.
  44pub struct Room {
        /// Room identifier; sent in `Call` and `RejoinRoom` requests.
  45    id: u64,
        /// Present only when connection info was supplied to `new`; cleared on leave.
  46    live_kit: Option<LiveKitRoom>,
        /// Current connection status (`Online`, `Rejoining` or `Offline` are assigned in this file).
  47    status: RoomStatus,
        /// Projects shared into the room; unshared on leave and re-announced on rejoin.
  48    shared_projects: HashSet<WeakModelHandle<Project>>,
        /// Remote projects joined through the room; disconnected from their host on leave.
  49    joined_projects: HashSet<WeakModelHandle<Project>>,
  50    local_participant: LocalParticipant,
        /// Remote participants keyed by user id.
  51    remote_participants: BTreeMap<u64, RemoteParticipant>,
        /// Users listed as pending in the most recently applied room update.
  52    pending_participants: Vec<Arc<User>>,
        /// User ids of the remote and pending participants (see `check_invariants`).
  53    participant_user_ids: HashSet<u64>,
        /// Number of `Call` requests that have been started but not yet completed.
  54    pending_call_count: usize,
        /// When false, `should_leave` never returns true.
  55    leave_when_empty: bool,
  56    client: Arc<Client>,
  57    user_store: ModelHandle<UserStore>,
        /// Followers grouped by `(leader peer id, project id)`; rebuilt on every room update.
  58    follows_by_leader_id_project_id: HashMap<(PeerId, u64), Vec<PeerId>>,
        /// Holds the `RoomUpdated` message-handler subscription registered in `new`.
  59    subscriptions: Vec<client::Subscription>,
        /// Task that finishes applying the latest room update; `None` once it has run.
  60    pending_room_update: Option<Task<()>>,
        /// Task running `maintain_connection`; taken on leave.
  61    maintain_connection: Option<Task<Option<()>>>,
  62}
  63
  64impl Entity for Room {
  65    type Event = Event;
  66
  67    fn release(&mut self, cx: &mut MutableAppContext) {
  68        if self.status.is_online() {
  69            self.leave_internal(cx).detach_and_log_err(cx);
  70        }
  71    }
  72
  73    fn app_will_quit(
  74        &mut self,
  75        cx: &mut MutableAppContext,
  76    ) -> Option<Pin<Box<dyn Future<Output = ()>>>> {
  77        if self.status.is_online() {
  78            let leave = self.leave_internal(cx);
  79            Some(
  80                cx.background()
  81                    .spawn(async move {
  82                        leave.await.log_err();
  83                    })
  84                    .boxed(),
  85            )
  86        } else {
  87            None
  88        }
  89    }
  90}
  91
  92impl Room {
  93    fn new(
  94        id: u64,
  95        live_kit_connection_info: Option<proto::LiveKitConnectionInfo>,
  96        client: Arc<Client>,
  97        user_store: ModelHandle<UserStore>,
  98        cx: &mut ModelContext<Self>,
  99    ) -> Self {
 100        let live_kit_room = if let Some(connection_info) = live_kit_connection_info {
 101            let room = live_kit_client::Room::new();
 102            let mut status = room.status();
 103            // Consume the initial status of the room.
 104            let _ = status.try_recv();
 105            let _maintain_room = cx.spawn_weak(|this, mut cx| async move {
 106                while let Some(status) = status.next().await {
 107                    let this = if let Some(this) = this.upgrade(&cx) {
 108                        this
 109                    } else {
 110                        break;
 111                    };
 112
 113                    if status == live_kit_client::ConnectionState::Disconnected {
 114                        this.update(&mut cx, |this, cx| this.leave(cx).log_err());
 115                        break;
 116                    }
 117                }
 118            });
 119
 120            let mut track_changes = room.remote_video_track_updates();
 121            let _maintain_tracks = cx.spawn_weak(|this, mut cx| async move {
 122                while let Some(track_change) = track_changes.next().await {
 123                    let this = if let Some(this) = this.upgrade(&cx) {
 124                        this
 125                    } else {
 126                        break;
 127                    };
 128
 129                    this.update(&mut cx, |this, cx| {
 130                        this.remote_video_track_updated(track_change, cx).log_err()
 131                    });
 132                }
 133            });
 134
 135            cx.foreground()
 136                .spawn(room.connect(&connection_info.server_url, &connection_info.token))
 137                .detach_and_log_err(cx);
 138
 139            Some(LiveKitRoom {
 140                room,
 141                screen_track: ScreenTrack::None,
 142                next_publish_id: 0,
 143                _maintain_room,
 144                _maintain_tracks,
 145            })
 146        } else {
 147            None
 148        };
 149
 150        let maintain_connection =
 151            cx.spawn_weak(|this, cx| Self::maintain_connection(this, client.clone(), cx).log_err());
 152
 153        Self {
 154            id,
 155            live_kit: live_kit_room,
 156            status: RoomStatus::Online,
 157            shared_projects: Default::default(),
 158            joined_projects: Default::default(),
 159            participant_user_ids: Default::default(),
 160            local_participant: Default::default(),
 161            remote_participants: Default::default(),
 162            pending_participants: Default::default(),
 163            pending_call_count: 0,
 164            subscriptions: vec![client.add_message_handler(cx.handle(), Self::handle_room_updated)],
 165            leave_when_empty: false,
 166            pending_room_update: None,
 167            client,
 168            user_store,
 169            follows_by_leader_id_project_id: Default::default(),
 170            maintain_connection: Some(maintain_connection),
 171        }
 172    }
 173
 174    pub(crate) fn create(
 175        called_user_id: u64,
 176        initial_project: Option<ModelHandle<Project>>,
 177        client: Arc<Client>,
 178        user_store: ModelHandle<UserStore>,
 179        cx: &mut MutableAppContext,
 180    ) -> Task<Result<ModelHandle<Self>>> {
 181        cx.spawn(|mut cx| async move {
 182            let response = client.request(proto::CreateRoom {}).await?;
 183            let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
 184            let room = cx.add_model(|cx| {
 185                Self::new(
 186                    room_proto.id,
 187                    response.live_kit_connection_info,
 188                    client,
 189                    user_store,
 190                    cx,
 191                )
 192            });
 193
 194            let initial_project_id = if let Some(initial_project) = initial_project {
 195                let initial_project_id = room
 196                    .update(&mut cx, |room, cx| {
 197                        room.share_project(initial_project.clone(), cx)
 198                    })
 199                    .await?;
 200                Some(initial_project_id)
 201            } else {
 202                None
 203            };
 204
 205            match room
 206                .update(&mut cx, |room, cx| {
 207                    room.leave_when_empty = true;
 208                    room.call(called_user_id, initial_project_id, cx)
 209                })
 210                .await
 211            {
 212                Ok(()) => Ok(room),
 213                Err(error) => Err(anyhow!("room creation failed: {:?}", error)),
 214            }
 215        })
 216    }
 217
 218    pub(crate) fn join(
 219        call: &IncomingCall,
 220        client: Arc<Client>,
 221        user_store: ModelHandle<UserStore>,
 222        cx: &mut MutableAppContext,
 223    ) -> Task<Result<ModelHandle<Self>>> {
 224        let room_id = call.room_id;
 225        cx.spawn(|mut cx| async move {
 226            let response = client.request(proto::JoinRoom { id: room_id }).await?;
 227            let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
 228            let room = cx.add_model(|cx| {
 229                Self::new(
 230                    room_id,
 231                    response.live_kit_connection_info,
 232                    client,
 233                    user_store,
 234                    cx,
 235                )
 236            });
 237            room.update(&mut cx, |room, cx| {
 238                room.leave_when_empty = true;
 239                room.apply_room_update(room_proto, cx)?;
 240                anyhow::Ok(())
 241            })?;
 242            Ok(room)
 243        })
 244    }
 245
 246    fn should_leave(&self) -> bool {
 247        self.leave_when_empty
 248            && self.pending_room_update.is_none()
 249            && self.pending_participants.is_empty()
 250            && self.remote_participants.is_empty()
 251            && self.pending_call_count == 0
 252    }
 253
        /// Notifies observers, emits `Event::Left`, and then tears the room down through
        /// `leave_internal`, returning its task.
        ///
        /// The notification and the event are issued before `leave_internal` runs, so they
        /// happen even when it returns the "room is offline" error.
 254    pub(crate) fn leave(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
 255        cx.notify();
 256        cx.emit(Event::Left);
 257        self.leave_internal(cx)
 258    }
 259
 260    fn leave_internal(&mut self, cx: &mut MutableAppContext) -> Task<Result<()>> {
 261        if self.status.is_offline() {
 262            return Task::ready(Err(anyhow!("room is offline")));
 263        }
 264
 265        log::info!("leaving room");
 266
 267        for project in self.shared_projects.drain() {
 268            if let Some(project) = project.upgrade(cx) {
 269                project.update(cx, |project, cx| {
 270                    project.unshare(cx).log_err();
 271                });
 272            }
 273        }
 274        for project in self.joined_projects.drain() {
 275            if let Some(project) = project.upgrade(cx) {
 276                project.update(cx, |project, cx| {
 277                    project.disconnected_from_host(cx);
 278                });
 279            }
 280        }
 281
 282        self.status = RoomStatus::Offline;
 283        self.remote_participants.clear();
 284        self.pending_participants.clear();
 285        self.participant_user_ids.clear();
 286        self.subscriptions.clear();
 287        self.live_kit.take();
 288        self.pending_room_update.take();
 289        self.maintain_connection.take();
 290
 291        let leave_room = self.client.request(proto::LeaveRoom {});
 292        cx.background().spawn(async move {
 293            leave_room.await?;
 294            anyhow::Ok(())
 295        })
 296    }
 297
        /// Watches the client's connection status and tries to rejoin the room after a
        /// disconnection.
        ///
        /// Whenever the client is found disconnected, or its status changes at all, the room
        /// is marked `Rejoining` and `rejoin` is attempted each time the client reports being
        /// connected. The attempt loop ends after three failed rejoins, when the client is
        /// signed out, or when `RECONNECT_TIMEOUT` elapses. A successful rejoin goes back to
        /// watching the status.
        ///
        /// This function never returns `Ok`: it ends with "room was dropped" if the model is
        /// gone when a disconnection is detected, and otherwise leaves the room and returns
        /// the "can't reconnect to room" error.
 298    async fn maintain_connection(
 299        this: WeakModelHandle<Self>,
 300        client: Arc<Client>,
 301        mut cx: AsyncAppContext,
 302    ) -> Result<()> {
 303        let mut client_status = client.status();
 304        loop {
                // Discard a status value that is already available.
                // NOTE(review): presumably so that `next()` below only resolves on a later
                // change — confirm against the status stream's semantics.
 305            let _ = client_status.try_recv();
 306            let is_connected = client_status.borrow().is_connected();
 307            // Even if we're initially connected, any future change of the status means we momentarily disconnected.
 308            if !is_connected || client_status.next().await.is_some() {
 309                log::info!("detected client disconnection");
 310
 311                this.upgrade(&cx)
 312                    .ok_or_else(|| anyhow!("room was dropped"))?
 313                    .update(&mut cx, |this, cx| {
 314                        this.status = RoomStatus::Rejoining;
 315                        cx.notify();
 316                    });
 317
 318                // Wait for client to re-establish a connection to the server.
 319                {
 320                    let mut reconnection_timeout = cx.background().timer(RECONNECT_TIMEOUT).fuse();
                        // Resolves to `true` once `rejoin` succeeds and to `false` when giving up.
 321                    let client_reconnection = async {
                            // Only failed `rejoin` calls consume an attempt.
 322                        let mut remaining_attempts = 3;
 323                        while remaining_attempts > 0 {
 324                            if client_status.borrow().is_connected() {
 325                                log::info!("client reconnected, attempting to rejoin room");
 326
 327                                let Some(this) = this.upgrade(&cx) else { break };
 328                                if this
 329                                    .update(&mut cx, |this, cx| this.rejoin(cx))
 330                                    .await
 331                                    .log_err()
 332                                    .is_some()
 333                                {
 334                                    return true;
 335                                } else {
 336                                    remaining_attempts -= 1;
 337                                }
 338                            } else if client_status.borrow().is_signed_out() {
                                    // A signed-out client ends the attempt loop immediately.
 339                                return false;
 340                            }
 341
 342                            log::info!(
 343                                "waiting for client status change, remaining attempts {}",
 344                                remaining_attempts
 345                            );
 346                            client_status.next().await;
 347                        }
 348                        false
 349                    }
 350                    .fuse();
 351                    futures::pin_mut!(client_reconnection);
 352
                        // `select_biased!` polls its branches in the order written, so the
                        // reconnection future is checked before the timeout.
 353                    futures::select_biased! {
 354                        reconnected = client_reconnection => {
 355                            if reconnected {
 356                                log::info!("successfully reconnected to room");
 357                                // If we successfully joined the room, go back around the loop
 358                                // waiting for future connection status changes.
 359                                continue;
 360                            }
 361                        }
 362                        _ = reconnection_timeout => {
 363                            log::info!("room reconnection timeout expired");
 364                        }
 365                    }
 366                }
 367
                    // Reached only when reconnection failed or timed out.
 368                break;
 369            }
 370        }
 371
 372        // The client failed to re-establish a connection to the server
 373        // or an error occurred while trying to re-join the room. Either way
 374        // we leave the room and return an error.
 375        if let Some(this) = this.upgrade(&cx) {
 376            log::info!("reconnection failed, leaving room");
                // The task returned by `leave` is discarded here.
 377            let _ = this.update(&mut cx, |this, cx| this.leave(cx));
 378        }
 379        Err(anyhow!(
 380            "can't reconnect to room: client failed to re-establish connection"
 381        ))
 382    }
 383
 384    fn rejoin(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
 385        let mut projects = HashMap::default();
 386        let mut reshared_projects = Vec::new();
 387        let mut rejoined_projects = Vec::new();
 388        self.shared_projects.retain(|project| {
 389            if let Some(handle) = project.upgrade(cx) {
 390                let project = handle.read(cx);
 391                if let Some(project_id) = project.remote_id() {
 392                    projects.insert(project_id, handle.clone());
 393                    reshared_projects.push(proto::UpdateProject {
 394                        project_id,
 395                        worktrees: project.worktree_metadata_protos(cx),
 396                    });
 397                    return true;
 398                }
 399            }
 400            false
 401        });
 402        self.joined_projects.retain(|project| {
 403            if let Some(handle) = project.upgrade(cx) {
 404                let project = handle.read(cx);
 405                if let Some(project_id) = project.remote_id() {
 406                    projects.insert(project_id, handle.clone());
 407                    rejoined_projects.push(proto::RejoinProject {
 408                        id: project_id,
 409                        worktrees: project
 410                            .worktrees(cx)
 411                            .map(|worktree| {
 412                                let worktree = worktree.read(cx);
 413                                proto::RejoinWorktree {
 414                                    id: worktree.id().to_proto(),
 415                                    scan_id: worktree.completed_scan_id() as u64,
 416                                }
 417                            })
 418                            .collect(),
 419                    });
 420                }
 421                return true;
 422            }
 423            false
 424        });
 425
 426        let response = self.client.request(proto::RejoinRoom {
 427            id: self.id,
 428            reshared_projects,
 429            rejoined_projects,
 430        });
 431
 432        cx.spawn(|this, mut cx| async move {
 433            let response = response.await?;
 434            let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
 435            this.update(&mut cx, |this, cx| {
 436                this.status = RoomStatus::Online;
 437                this.apply_room_update(room_proto, cx)?;
 438
 439                for reshared_project in response.reshared_projects {
 440                    if let Some(project) = projects.get(&reshared_project.id) {
 441                        project.update(cx, |project, cx| {
 442                            project.reshared(reshared_project, cx).log_err();
 443                        });
 444                    }
 445                }
 446
 447                for rejoined_project in response.rejoined_projects {
 448                    if let Some(project) = projects.get(&rejoined_project.id) {
 449                        project.update(cx, |project, cx| {
 450                            project.rejoined(rejoined_project, cx).log_err();
 451                        });
 452                    }
 453                }
 454
 455                anyhow::Ok(())
 456            })
 457        })
 458    }
 459
        /// Returns the identifier of this room.
 460    pub fn id(&self) -> u64 {
 461        self.id
 462    }
 463
        /// Returns a copy of the room's current connection status.
 464    pub fn status(&self) -> RoomStatus {
 465        self.status
 466    }
 467
        /// Returns the state tracked for the local user in this room.
 468    pub fn local_participant(&self) -> &LocalParticipant {
 469        &self.local_participant
 470    }
 471
        /// Returns the remote participants, keyed by user id.
 472    pub fn remote_participants(&self) -> &BTreeMap<u64, RemoteParticipant> {
 473        &self.remote_participants
 474    }
 475
 476    pub fn remote_participant_for_peer_id(&self, peer_id: PeerId) -> Option<&RemoteParticipant> {
 477        self.remote_participants
 478            .values()
 479            .find(|p| p.peer_id == peer_id)
 480    }
 481
        /// Returns the users recorded as pending participants of this room.
 482    pub fn pending_participants(&self) -> &[Arc<User>] {
 483        &self.pending_participants
 484    }
 485
        /// Reports whether `user_id` is in `participant_user_ids`, the set of remote and
        /// pending participant ids.
 486    pub fn contains_participant(&self, user_id: u64) -> bool {
 487        self.participant_user_ids.contains(&user_id)
 488    }
 489
 490    pub fn followers_for(&self, leader_id: PeerId, project_id: u64) -> &[PeerId] {
 491        self.follows_by_leader_id_project_id
 492            .get(&(leader_id, project_id))
 493            .map_or(&[], |v| v.as_slice())
 494    }
 495
 496    async fn handle_room_updated(
 497        this: ModelHandle<Self>,
 498        envelope: TypedEnvelope<proto::RoomUpdated>,
 499        _: Arc<Client>,
 500        mut cx: AsyncAppContext,
 501    ) -> Result<()> {
 502        let room = envelope
 503            .payload
 504            .room
 505            .ok_or_else(|| anyhow!("invalid room"))?;
 506        this.update(&mut cx, |this, cx| this.apply_room_update(room, cx))
 507    }
 508
        /// Reconciles the local room state with `room`, the latest state sent by the server.
        ///
        /// The local user is removed from the participant list, user records for the
        /// remaining and the pending participants are requested from the user store, and a
        /// task is stored in `pending_room_update`. Once the users are loaded that task
        /// updates the participant maps, emits project, location and video-track events,
        /// rebuilds the follower map, and leaves the room if `should_leave` holds.
        ///
        /// The function itself always returns `Ok(())`; failures to load users are logged
        /// inside the task.
 509    fn apply_room_update(
 510        &mut self,
 511        mut room: proto::Room,
 512        cx: &mut ModelContext<Self>,
 513    ) -> Result<()> {
 514        // Filter ourselves out from the room's participants.
 515        let local_participant_ix = room
 516            .participants
 517            .iter()
 518            .position(|participant| Some(participant.user_id) == self.client.user_id());
 519        let local_participant = local_participant_ix.map(|ix| room.participants.swap_remove(ix));
 520
 521        let pending_participant_user_ids = room
 522            .pending_participants
 523            .iter()
 524            .map(|p| p.user_id)
 525            .collect::<Vec<_>>();
 526
 527        let remote_participant_user_ids = room
 528            .participants
 529            .iter()
 530            .map(|p| p.user_id)
 531            .collect::<Vec<_>>();
 532
            // Request the user records for both id lists; they are awaited in the task below.
 533        let (remote_participants, pending_participants) =
 534            self.user_store.update(cx, move |user_store, cx| {
 535                (
 536                    user_store.get_users(remote_participant_user_ids, cx),
 537                    user_store.get_users(pending_participant_user_ids, cx),
 538                )
 539            });
 540
            // Storing the new task replaces whatever update task was stored before.
 541        self.pending_room_update = Some(cx.spawn(|this, mut cx| async move {
 542            let (remote_participants, pending_participants) =
 543                futures::join!(remote_participants, pending_participants);
 544
 545            this.update(&mut cx, |this, cx| {
 546                this.participant_user_ids.clear();
 547
 548                if let Some(participant) = local_participant {
 549                    this.local_participant.projects = participant.projects;
 550                } else {
 551                    this.local_participant.projects.clear();
 552                }
 553
 554                if let Some(participants) = remote_participants.log_err() {
                        // NOTE(review): participants and users are paired positionally; this
                        // assumes `get_users` yields users in the order of the requested ids —
                        // confirm.
 555                    for (participant, user) in room.participants.into_iter().zip(participants) {
                            // Participants without a peer id are skipped entirely.
 556                        let Some(peer_id) = participant.peer_id else { continue };
 557                        this.participant_user_ids.insert(participant.user_id);
 558
 559                        let old_projects = this
 560                            .remote_participants
 561                            .get(&participant.user_id)
 562                            .into_iter()
 563                            .flat_map(|existing| &existing.projects)
 564                            .map(|project| project.id)
 565                            .collect::<HashSet<_>>();
 566                        let new_projects = participant
 567                            .projects
 568                            .iter()
 569                            .map(|project| project.id)
 570                            .collect::<HashSet<_>>();
 571
                            // Announce projects that were not recorded for this participant before.
 572                        for project in &participant.projects {
 573                            if !old_projects.contains(&project.id) {
 574                                cx.emit(Event::RemoteProjectShared {
 575                                    owner: user.clone(),
 576                                    project_id: project.id,
 577                                    worktree_root_names: project.worktree_root_names.clone(),
 578                                });
 579                            }
 580                        }
 581
                            // For each project that is no longer shared, disconnect and forget the
                            // matching joined project, then announce the unshare.
 582                        for unshared_project_id in old_projects.difference(&new_projects) {
 583                            this.joined_projects.retain(|project| {
 584                                if let Some(project) = project.upgrade(cx) {
 585                                    project.update(cx, |project, cx| {
 586                                        if project.remote_id() == Some(*unshared_project_id) {
 587                                            project.disconnected_from_host(cx);
 588                                            false
 589                                        } else {
 590                                            true
 591                                        }
 592                                    })
 593                                } else {
 594                                    false
 595                                }
 596                            });
 597                            cx.emit(Event::RemoteProjectUnshared {
 598                                project_id: *unshared_project_id,
 599                            });
 600                        }
 601
 602                        let location = ParticipantLocation::from_proto(participant.location)
 603                            .unwrap_or(ParticipantLocation::External);
 604                        if let Some(remote_participant) =
 605                            this.remote_participants.get_mut(&participant.user_id)
 606                        {
 607                            remote_participant.projects = participant.projects;
 608                            remote_participant.peer_id = peer_id;
 609                            if location != remote_participant.location {
 610                                remote_participant.location = location;
 611                                cx.emit(Event::ParticipantLocationChanged {
 612                                    participant_id: peer_id,
 613                                });
 614                            }
 615                        } else {
                                // First time this participant is seen: record them, then pick up
                                // the video tracks the LiveKit room already reports for them.
 616                            this.remote_participants.insert(
 617                                participant.user_id,
 618                                RemoteParticipant {
 619                                    user: user.clone(),
 620                                    peer_id,
 621                                    projects: participant.projects,
 622                                    location,
 623                                    tracks: Default::default(),
 624                                },
 625                            );
 626
 627                            if let Some(live_kit) = this.live_kit.as_ref() {
 628                                let tracks =
 629                                    live_kit.room.remote_video_tracks(&user.id.to_string());
 630                                for track in tracks {
 631                                    this.remote_video_track_updated(
 632                                        RemoteVideoTrackUpdate::Subscribed(track),
 633                                        cx,
 634                                    )
 635                                    .log_err();
 636                                }
 637                            }
 638                        }
 639                    }
 640
                        // Drop participants that were not inserted above, announcing each of
                        // their projects as unshared.
 641                    this.remote_participants.retain(|user_id, participant| {
 642                        if this.participant_user_ids.contains(user_id) {
 643                            true
 644                        } else {
 645                            for project in &participant.projects {
 646                                cx.emit(Event::RemoteProjectUnshared {
 647                                    project_id: project.id,
 648                                });
 649                            }
 650                            false
 651                        }
 652                    });
 653                }
 654
 655                if let Some(pending_participants) = pending_participants.log_err() {
 656                    this.pending_participants = pending_participants;
 657                    for participant in &this.pending_participants {
 658                        this.participant_user_ids.insert(participant.id);
 659                    }
 660                }
 661
                    // The follower map is rebuilt from scratch on every update.
 662                this.follows_by_leader_id_project_id.clear();
 663                for follower in room.followers {
 664                    let project_id = follower.project_id;
 665                    let (leader, follower) = match (follower.leader_id, follower.follower_id) {
 666                        (Some(leader), Some(follower)) => (leader, follower),
 667
 668                        _ => {
 669                            log::error!("Follower message {follower:?} missing some state");
 670                            continue;
 671                        }
 672                    };
 673
 674                    let list = this
 675                        .follows_by_leader_id_project_id
 676                        .entry((leader, project_id))
 677                        .or_insert(Vec::new());
 678                    if !list.contains(&follower) {
 679                        list.push(follower);
 680                    }
 681                }
 682
                    // Clear the stored task first: `should_leave` requires
                    // `pending_room_update` to be `None`.
 683                this.pending_room_update.take();
 684                if this.should_leave() {
 685                    log::info!("room is empty, leaving");
 686                    let _ = this.leave(cx);
 687                }
 688
 689                this.check_invariants();
 690                cx.notify();
 691            });
 692        }));
 693
 694        cx.notify();
 695        Ok(())
 696    }
 697
 698    fn remote_video_track_updated(
 699        &mut self,
 700        change: RemoteVideoTrackUpdate,
 701        cx: &mut ModelContext<Self>,
 702    ) -> Result<()> {
 703        match change {
 704            RemoteVideoTrackUpdate::Subscribed(track) => {
 705                let user_id = track.publisher_id().parse()?;
 706                let track_id = track.sid().to_string();
 707                let participant = self
 708                    .remote_participants
 709                    .get_mut(&user_id)
 710                    .ok_or_else(|| anyhow!("subscribed to track by unknown participant"))?;
 711                participant.tracks.insert(
 712                    track_id.clone(),
 713                    Arc::new(RemoteVideoTrack {
 714                        live_kit_track: track,
 715                    }),
 716                );
 717                cx.emit(Event::RemoteVideoTracksChanged {
 718                    participant_id: participant.peer_id,
 719                });
 720            }
 721            RemoteVideoTrackUpdate::Unsubscribed {
 722                publisher_id,
 723                track_id,
 724            } => {
 725                let user_id = publisher_id.parse()?;
 726                let participant = self
 727                    .remote_participants
 728                    .get_mut(&user_id)
 729                    .ok_or_else(|| anyhow!("unsubscribed from track by unknown participant"))?;
 730                participant.tracks.remove(&track_id);
 731                cx.emit(Event::RemoteVideoTracksChanged {
 732                    participant_id: participant.peer_id,
 733                });
 734            }
 735        }
 736
 737        cx.notify();
 738        Ok(())
 739    }
 740
        /// Asserts the bookkeeping invariants of the participant collections. The body is
        /// compiled only for tests or with the `test-support` feature; otherwise this is a
        /// no-op.
        ///
        /// Checked: every remote and pending participant is in `participant_user_ids`, none
        /// of them is the local user, and the id set has exactly as many entries as both
        /// collections combined.
 741    fn check_invariants(&self) {
 742        #[cfg(any(test, feature = "test-support"))]
 743        {
 744            for participant in self.remote_participants.values() {
 745                assert!(self.participant_user_ids.contains(&participant.user.id));
                    // The `unwrap` panics if the client has no user id.
 746                assert_ne!(participant.user.id, self.client.user_id().unwrap());
 747            }
 748
 749            for participant in &self.pending_participants {
 750                assert!(self.participant_user_ids.contains(&participant.id));
 751                assert_ne!(participant.id, self.client.user_id().unwrap());
 752            }
 753
 754            assert_eq!(
 755                self.participant_user_ids.len(),
 756                self.remote_participants.len() + self.pending_participants.len()
 757            );
 758        }
 759    }
 760
 761    pub(crate) fn call(
 762        &mut self,
 763        called_user_id: u64,
 764        initial_project_id: Option<u64>,
 765        cx: &mut ModelContext<Self>,
 766    ) -> Task<Result<()>> {
 767        if self.status.is_offline() {
 768            return Task::ready(Err(anyhow!("room is offline")));
 769        }
 770
 771        cx.notify();
 772        let client = self.client.clone();
 773        let room_id = self.id;
 774        self.pending_call_count += 1;
 775        cx.spawn(|this, mut cx| async move {
 776            let result = client
 777                .request(proto::Call {
 778                    room_id,
 779                    called_user_id,
 780                    initial_project_id,
 781                })
 782                .await;
 783            this.update(&mut cx, |this, cx| {
 784                this.pending_call_count -= 1;
 785                if this.should_leave() {
 786                    this.leave(cx).detach_and_log_err(cx);
 787                }
 788            });
 789            result?;
 790            Ok(())
 791        })
 792    }
 793
 794    pub fn join_project(
 795        &mut self,
 796        id: u64,
 797        language_registry: Arc<LanguageRegistry>,
 798        fs: Arc<dyn Fs>,
 799        cx: &mut ModelContext<Self>,
 800    ) -> Task<Result<ModelHandle<Project>>> {
 801        let client = self.client.clone();
 802        let user_store = self.user_store.clone();
 803        cx.spawn(|this, mut cx| async move {
 804            let project =
 805                Project::remote(id, client, user_store, language_registry, fs, cx.clone()).await?;
 806            this.update(&mut cx, |this, cx| {
 807                this.joined_projects.retain(|project| {
 808                    if let Some(project) = project.upgrade(cx) {
 809                        !project.read(cx).is_read_only()
 810                    } else {
 811                        false
 812                    }
 813                });
 814                this.joined_projects.insert(project.downgrade());
 815            });
 816            Ok(project)
 817        })
 818    }
 819
 820    pub(crate) fn share_project(
 821        &mut self,
 822        project: ModelHandle<Project>,
 823        cx: &mut ModelContext<Self>,
 824    ) -> Task<Result<u64>> {
 825        if let Some(project_id) = project.read(cx).remote_id() {
 826            return Task::ready(Ok(project_id));
 827        }
 828
 829        let request = self.client.request(proto::ShareProject {
 830            room_id: self.id(),
 831            worktrees: project.read(cx).worktree_metadata_protos(cx),
 832        });
 833        cx.spawn(|this, mut cx| async move {
 834            let response = request.await?;
 835
 836            project.update(&mut cx, |project, cx| {
 837                project.shared(response.project_id, cx)
 838            })?;
 839
 840            // If the user's location is in this project, it changes from UnsharedProject to SharedProject.
 841            this.update(&mut cx, |this, cx| {
 842                this.shared_projects.insert(project.downgrade());
 843                let active_project = this.local_participant.active_project.as_ref();
 844                if active_project.map_or(false, |location| *location == project) {
 845                    this.set_location(Some(&project), cx)
 846                } else {
 847                    Task::ready(Ok(()))
 848                }
 849            })
 850            .await?;
 851
 852            Ok(response.project_id)
 853        })
 854    }
 855
 856    pub(crate) fn unshare_project(
 857        &mut self,
 858        project: ModelHandle<Project>,
 859        cx: &mut ModelContext<Self>,
 860    ) -> Result<()> {
 861        let project_id = match project.read(cx).remote_id() {
 862            Some(project_id) => project_id,
 863            None => return Ok(()),
 864        };
 865
 866        self.client.send(proto::UnshareProject { project_id })?;
 867        project.update(cx, |this, cx| this.unshare(cx))
 868    }
 869
 870    pub(crate) fn set_location(
 871        &mut self,
 872        project: Option<&ModelHandle<Project>>,
 873        cx: &mut ModelContext<Self>,
 874    ) -> Task<Result<()>> {
 875        if self.status.is_offline() {
 876            return Task::ready(Err(anyhow!("room is offline")));
 877        }
 878
 879        let client = self.client.clone();
 880        let room_id = self.id;
 881        let location = if let Some(project) = project {
 882            self.local_participant.active_project = Some(project.downgrade());
 883            if let Some(project_id) = project.read(cx).remote_id() {
 884                proto::participant_location::Variant::SharedProject(
 885                    proto::participant_location::SharedProject { id: project_id },
 886                )
 887            } else {
 888                proto::participant_location::Variant::UnsharedProject(
 889                    proto::participant_location::UnsharedProject {},
 890                )
 891            }
 892        } else {
 893            self.local_participant.active_project = None;
 894            proto::participant_location::Variant::External(proto::participant_location::External {})
 895        };
 896
 897        cx.notify();
 898        cx.foreground().spawn(async move {
 899            client
 900                .request(proto::UpdateParticipantLocation {
 901                    room_id,
 902                    location: Some(proto::ParticipantLocation {
 903                        variant: Some(location),
 904                    }),
 905                })
 906                .await?;
 907            Ok(())
 908        })
 909    }
 910
 911    pub fn is_screen_sharing(&self) -> bool {
 912        self.live_kit.as_ref().map_or(false, |live_kit| {
 913            !matches!(live_kit.screen_track, ScreenTrack::None)
 914        })
 915    }
 916
 917    pub fn share_screen(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
 918        if self.status.is_offline() {
 919            return Task::ready(Err(anyhow!("room is offline")));
 920        } else if self.is_screen_sharing() {
 921            return Task::ready(Err(anyhow!("screen was already shared")));
 922        }
 923
 924        let (displays, publish_id) = if let Some(live_kit) = self.live_kit.as_mut() {
 925            let publish_id = post_inc(&mut live_kit.next_publish_id);
 926            live_kit.screen_track = ScreenTrack::Pending { publish_id };
 927            cx.notify();
 928            (live_kit.room.display_sources(), publish_id)
 929        } else {
 930            return Task::ready(Err(anyhow!("live-kit was not initialized")));
 931        };
 932
 933        cx.spawn_weak(|this, mut cx| async move {
 934            let publish_track = async {
 935                let displays = displays.await?;
 936                let display = displays
 937                    .first()
 938                    .ok_or_else(|| anyhow!("no display found"))?;
 939                let track = LocalVideoTrack::screen_share_for_display(&display);
 940                this.upgrade(&cx)
 941                    .ok_or_else(|| anyhow!("room was dropped"))?
 942                    .read_with(&cx, |this, _| {
 943                        this.live_kit
 944                            .as_ref()
 945                            .map(|live_kit| live_kit.room.publish_video_track(&track))
 946                    })
 947                    .ok_or_else(|| anyhow!("live-kit was not initialized"))?
 948                    .await
 949            };
 950
 951            let publication = publish_track.await;
 952            this.upgrade(&cx)
 953                .ok_or_else(|| anyhow!("room was dropped"))?
 954                .update(&mut cx, |this, cx| {
 955                    let live_kit = this
 956                        .live_kit
 957                        .as_mut()
 958                        .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
 959
 960                    let canceled = if let ScreenTrack::Pending {
 961                        publish_id: cur_publish_id,
 962                    } = &live_kit.screen_track
 963                    {
 964                        *cur_publish_id != publish_id
 965                    } else {
 966                        true
 967                    };
 968
 969                    match publication {
 970                        Ok(publication) => {
 971                            if canceled {
 972                                live_kit.room.unpublish_track(publication);
 973                            } else {
 974                                live_kit.screen_track = ScreenTrack::Published(publication);
 975                                cx.notify();
 976                            }
 977                            Ok(())
 978                        }
 979                        Err(error) => {
 980                            if canceled {
 981                                Ok(())
 982                            } else {
 983                                live_kit.screen_track = ScreenTrack::None;
 984                                cx.notify();
 985                                Err(error)
 986                            }
 987                        }
 988                    }
 989                })
 990        })
 991    }
 992
 993    pub fn unshare_screen(&mut self, cx: &mut ModelContext<Self>) -> Result<()> {
 994        if self.status.is_offline() {
 995            return Err(anyhow!("room is offline"));
 996        }
 997
 998        let live_kit = self
 999            .live_kit
1000            .as_mut()
1001            .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
1002        match mem::take(&mut live_kit.screen_track) {
1003            ScreenTrack::None => Err(anyhow!("screen was not shared")),
1004            ScreenTrack::Pending { .. } => {
1005                cx.notify();
1006                Ok(())
1007            }
1008            ScreenTrack::Published(track) => {
1009                live_kit.room.unpublish_track(track);
1010                cx.notify();
1011                Ok(())
1012            }
1013        }
1014    }
1015
1016    #[cfg(any(test, feature = "test-support"))]
1017    pub fn set_display_sources(&self, sources: Vec<live_kit_client::MacOSDisplay>) {
1018        self.live_kit
1019            .as_ref()
1020            .unwrap()
1021            .room
1022            .set_display_sources(sources);
1023    }
1024}
1025
1026struct LiveKitRoom {
1027    room: Arc<live_kit_client::Room>,
1028    screen_track: ScreenTrack,
1029    next_publish_id: usize,
1030    _maintain_room: Task<()>,
1031    _maintain_tracks: Task<()>,
1032}
1033
1034enum ScreenTrack {
1035    None,
1036    Pending { publish_id: usize },
1037    Published(LocalTrackPublication),
1038}
1039
1040impl Default for ScreenTrack {
1041    fn default() -> Self {
1042        Self::None
1043    }
1044}
1045
/// Connection state of a room.
///
/// `Debug` is derived so the status can appear in `assert_eq!` failures, `{:?}` log
/// output and `#[derive(Debug)]` containers.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum RoomStatus {
    Online,
    Rejoining,
    Offline,
}

impl RoomStatus {
    /// Returns `true` only for [`RoomStatus::Offline`].
    pub fn is_offline(&self) -> bool {
        matches!(self, RoomStatus::Offline)
    }

    /// Returns `true` only for [`RoomStatus::Online`]; `Rejoining` is neither online nor
    /// offline.
    pub fn is_online(&self) -> bool {
        matches!(self, RoomStatus::Online)
    }
}