room.rs

   1use crate::{
   2    participant::{LocalParticipant, ParticipantLocation, RemoteParticipant, RemoteVideoTrack},
   3    IncomingCall,
   4};
   5use anyhow::{anyhow, Result};
   6use client::{
   7    proto::{self, PeerId},
   8    Client, TypedEnvelope, User, UserStore,
   9};
  10use collections::{BTreeMap, HashMap, HashSet};
  11use fs::Fs;
  12use futures::{FutureExt, StreamExt};
  13use gpui::{
  14    AsyncAppContext, Entity, ModelContext, ModelHandle, MutableAppContext, Task, WeakModelHandle,
  15};
  16use language::LanguageRegistry;
  17use live_kit_client::{LocalTrackPublication, LocalVideoTrack, RemoteVideoTrackUpdate};
  18use postage::stream::Stream;
  19use project::Project;
  20use std::{future::Future, mem, pin::Pin, sync::Arc, time::Duration};
  21use util::{post_inc, ResultExt, TryFutureExt};
  22
  23pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(30);
  24
  25#[derive(Clone, Debug, PartialEq, Eq)]
  26pub enum Event {
  27    ParticipantLocationChanged {
  28        participant_id: proto::PeerId,
  29    },
  30    RemoteVideoTracksChanged {
  31        participant_id: proto::PeerId,
  32    },
  33    RemoteProjectShared {
  34        owner: Arc<User>,
  35        project_id: u64,
  36        worktree_root_names: Vec<String>,
  37    },
  38    RemoteProjectUnshared {
  39        project_id: u64,
  40    },
  41    Left,
  42}
  43
  44pub struct Room {
  45    id: u64,
  46    live_kit: Option<LiveKitRoom>,
  47    status: RoomStatus,
  48    shared_projects: HashSet<WeakModelHandle<Project>>,
  49    joined_projects: HashSet<WeakModelHandle<Project>>,
  50    local_participant: LocalParticipant,
  51    remote_participants: BTreeMap<u64, RemoteParticipant>,
  52    pending_participants: Vec<Arc<User>>,
  53    participant_user_ids: HashSet<u64>,
  54    pending_call_count: usize,
  55    leave_when_empty: bool,
  56    client: Arc<Client>,
  57    user_store: ModelHandle<UserStore>,
  58    follows_by_leader_id_project_id: HashMap<(PeerId, u64), Vec<PeerId>>,
  59    subscriptions: Vec<client::Subscription>,
  60    pending_room_update: Option<Task<()>>,
  61    maintain_connection: Option<Task<Option<()>>>,
  62}
  63
  64impl Entity for Room {
  65    type Event = Event;
  66
  67    fn release(&mut self, cx: &mut MutableAppContext) {
  68        if self.status.is_online() {
  69            self.leave_internal(cx).detach_and_log_err(cx);
  70        }
  71    }
  72
  73    fn app_will_quit(
  74        &mut self,
  75        cx: &mut MutableAppContext,
  76    ) -> Option<Pin<Box<dyn Future<Output = ()>>>> {
  77        if self.status.is_online() {
  78            let leave = self.leave_internal(cx);
  79            Some(
  80                cx.background()
  81                    .spawn(async move {
  82                        leave.await.log_err();
  83                    })
  84                    .boxed(),
  85            )
  86        } else {
  87            None
  88        }
  89    }
  90}
  91
  92impl Room {
  93    fn new(
  94        id: u64,
  95        live_kit_connection_info: Option<proto::LiveKitConnectionInfo>,
  96        client: Arc<Client>,
  97        user_store: ModelHandle<UserStore>,
  98        cx: &mut ModelContext<Self>,
  99    ) -> Self {
 100        let live_kit_room = if let Some(connection_info) = live_kit_connection_info {
 101            let room = live_kit_client::Room::new();
 102            let mut status = room.status();
 103            // Consume the initial status of the room.
 104            let _ = status.try_recv();
 105            let _maintain_room = cx.spawn_weak(|this, mut cx| async move {
 106                while let Some(status) = status.next().await {
 107                    let this = if let Some(this) = this.upgrade(&cx) {
 108                        this
 109                    } else {
 110                        break;
 111                    };
 112
 113                    if status == live_kit_client::ConnectionState::Disconnected {
 114                        this.update(&mut cx, |this, cx| this.leave(cx).log_err());
 115                        break;
 116                    }
 117                }
 118            });
 119
 120            let mut track_changes = room.remote_video_track_updates();
 121            let _maintain_tracks = cx.spawn_weak(|this, mut cx| async move {
 122                while let Some(track_change) = track_changes.next().await {
 123                    let this = if let Some(this) = this.upgrade(&cx) {
 124                        this
 125                    } else {
 126                        break;
 127                    };
 128
 129                    this.update(&mut cx, |this, cx| {
 130                        this.remote_video_track_updated(track_change, cx).log_err()
 131                    });
 132                }
 133            });
 134
 135            cx.foreground()
 136                .spawn(room.connect(&connection_info.server_url, &connection_info.token))
 137                .detach_and_log_err(cx);
 138
 139            Some(LiveKitRoom {
 140                room,
 141                screen_track: ScreenTrack::None,
 142                next_publish_id: 0,
 143                _maintain_room,
 144                _maintain_tracks,
 145            })
 146        } else {
 147            None
 148        };
 149
 150        let maintain_connection =
 151            cx.spawn_weak(|this, cx| Self::maintain_connection(this, client.clone(), cx).log_err());
 152
 153        Self {
 154            id,
 155            live_kit: live_kit_room,
 156            status: RoomStatus::Online,
 157            shared_projects: Default::default(),
 158            joined_projects: Default::default(),
 159            participant_user_ids: Default::default(),
 160            local_participant: Default::default(),
 161            remote_participants: Default::default(),
 162            pending_participants: Default::default(),
 163            pending_call_count: 0,
 164            subscriptions: vec![client.add_message_handler(cx.handle(), Self::handle_room_updated)],
 165            leave_when_empty: false,
 166            pending_room_update: None,
 167            client,
 168            user_store,
 169            follows_by_leader_id_project_id: Default::default(),
 170            maintain_connection: Some(maintain_connection),
 171        }
 172    }
 173
 174    pub(crate) fn create(
 175        called_user_id: u64,
 176        initial_project: Option<ModelHandle<Project>>,
 177        client: Arc<Client>,
 178        user_store: ModelHandle<UserStore>,
 179        cx: &mut MutableAppContext,
 180    ) -> Task<Result<ModelHandle<Self>>> {
 181        cx.spawn(|mut cx| async move {
 182            let response = client.request(proto::CreateRoom {}).await?;
 183            let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
 184            let room = cx.add_model(|cx| {
 185                Self::new(
 186                    room_proto.id,
 187                    response.live_kit_connection_info,
 188                    client,
 189                    user_store,
 190                    cx,
 191                )
 192            });
 193
 194            let initial_project_id = if let Some(initial_project) = initial_project {
 195                let initial_project_id = room
 196                    .update(&mut cx, |room, cx| {
 197                        room.share_project(initial_project.clone(), cx)
 198                    })
 199                    .await?;
 200                Some(initial_project_id)
 201            } else {
 202                None
 203            };
 204
 205            match room
 206                .update(&mut cx, |room, cx| {
 207                    room.leave_when_empty = true;
 208                    room.call(called_user_id, initial_project_id, cx)
 209                })
 210                .await
 211            {
 212                Ok(()) => Ok(room),
 213                Err(error) => Err(anyhow!("room creation failed: {:?}", error)),
 214            }
 215        })
 216    }
 217
 218    pub(crate) fn join(
 219        call: &IncomingCall,
 220        client: Arc<Client>,
 221        user_store: ModelHandle<UserStore>,
 222        cx: &mut MutableAppContext,
 223    ) -> Task<Result<ModelHandle<Self>>> {
 224        let room_id = call.room_id;
 225        cx.spawn(|mut cx| async move {
 226            let response = client.request(proto::JoinRoom { id: room_id }).await?;
 227            let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
 228            let room = cx.add_model(|cx| {
 229                Self::new(
 230                    room_id,
 231                    response.live_kit_connection_info,
 232                    client,
 233                    user_store,
 234                    cx,
 235                )
 236            });
 237            room.update(&mut cx, |room, cx| {
 238                room.leave_when_empty = true;
 239                room.apply_room_update(room_proto, cx)?;
 240                anyhow::Ok(())
 241            })?;
 242            Ok(room)
 243        })
 244    }
 245
 246    fn should_leave(&self) -> bool {
 247        self.leave_when_empty
 248            && self.pending_room_update.is_none()
 249            && self.pending_participants.is_empty()
 250            && self.remote_participants.is_empty()
 251            && self.pending_call_count == 0
 252    }
 253
 254    pub(crate) fn leave(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
 255        cx.notify();
 256        cx.emit(Event::Left);
 257        self.leave_internal(cx)
 258    }
 259
 260    fn leave_internal(&mut self, cx: &mut MutableAppContext) -> Task<Result<()>> {
 261        if self.status.is_offline() {
 262            return Task::ready(Err(anyhow!("room is offline")));
 263        }
 264
 265        log::info!("leaving room");
 266
 267        for project in self.shared_projects.drain() {
 268            if let Some(project) = project.upgrade(cx) {
 269                project.update(cx, |project, cx| {
 270                    project.unshare(cx).log_err();
 271                });
 272            }
 273        }
 274        for project in self.joined_projects.drain() {
 275            if let Some(project) = project.upgrade(cx) {
 276                project.update(cx, |project, cx| {
 277                    project.disconnected_from_host(cx);
 278                    project.close(cx);
 279                });
 280            }
 281        }
 282
 283        self.status = RoomStatus::Offline;
 284        self.remote_participants.clear();
 285        self.pending_participants.clear();
 286        self.participant_user_ids.clear();
 287        self.subscriptions.clear();
 288        self.live_kit.take();
 289        self.pending_room_update.take();
 290        self.maintain_connection.take();
 291
 292        let leave_room = self.client.request(proto::LeaveRoom {});
 293        cx.background().spawn(async move {
 294            leave_room.await?;
 295            anyhow::Ok(())
 296        })
 297    }
 298
 299    async fn maintain_connection(
 300        this: WeakModelHandle<Self>,
 301        client: Arc<Client>,
 302        mut cx: AsyncAppContext,
 303    ) -> Result<()> {
 304        let mut client_status = client.status();
 305        loop {
 306            let _ = client_status.try_recv();
 307            let is_connected = client_status.borrow().is_connected();
 308            // Even if we're initially connected, any future change of the status means we momentarily disconnected.
 309            if !is_connected || client_status.next().await.is_some() {
 310                log::info!("detected client disconnection");
 311
 312                this.upgrade(&cx)
 313                    .ok_or_else(|| anyhow!("room was dropped"))?
 314                    .update(&mut cx, |this, cx| {
 315                        this.status = RoomStatus::Rejoining;
 316                        cx.notify();
 317                    });
 318
 319                // Wait for client to re-establish a connection to the server.
 320                {
 321                    let mut reconnection_timeout = cx.background().timer(RECONNECT_TIMEOUT).fuse();
 322                    let client_reconnection = async {
 323                        let mut remaining_attempts = 3;
 324                        while remaining_attempts > 0 {
 325                            if client_status.borrow().is_connected() {
 326                                log::info!("client reconnected, attempting to rejoin room");
 327
 328                                let Some(this) = this.upgrade(&cx) else { break };
 329                                if this
 330                                    .update(&mut cx, |this, cx| this.rejoin(cx))
 331                                    .await
 332                                    .log_err()
 333                                    .is_some()
 334                                {
 335                                    return true;
 336                                } else {
 337                                    remaining_attempts -= 1;
 338                                }
 339                            } else if client_status.borrow().is_signed_out() {
 340                                return false;
 341                            }
 342
 343                            log::info!(
 344                                "waiting for client status change, remaining attempts {}",
 345                                remaining_attempts
 346                            );
 347                            client_status.next().await;
 348                        }
 349                        false
 350                    }
 351                    .fuse();
 352                    futures::pin_mut!(client_reconnection);
 353
 354                    futures::select_biased! {
 355                        reconnected = client_reconnection => {
 356                            if reconnected {
 357                                log::info!("successfully reconnected to room");
 358                                // If we successfully joined the room, go back around the loop
 359                                // waiting for future connection status changes.
 360                                continue;
 361                            }
 362                        }
 363                        _ = reconnection_timeout => {
 364                            log::info!("room reconnection timeout expired");
 365                        }
 366                    }
 367                }
 368
 369                break;
 370            }
 371        }
 372
 373        // The client failed to re-establish a connection to the server
 374        // or an error occurred while trying to re-join the room. Either way
 375        // we leave the room and return an error.
 376        if let Some(this) = this.upgrade(&cx) {
 377            log::info!("reconnection failed, leaving room");
 378            let _ = this.update(&mut cx, |this, cx| this.leave(cx));
 379        }
 380        Err(anyhow!(
 381            "can't reconnect to room: client failed to re-establish connection"
 382        ))
 383    }
 384
 385    fn rejoin(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
 386        let mut projects = HashMap::default();
 387        let mut reshared_projects = Vec::new();
 388        let mut rejoined_projects = Vec::new();
 389        self.shared_projects.retain(|project| {
 390            if let Some(handle) = project.upgrade(cx) {
 391                let project = handle.read(cx);
 392                if let Some(project_id) = project.remote_id() {
 393                    projects.insert(project_id, handle.clone());
 394                    reshared_projects.push(proto::UpdateProject {
 395                        project_id,
 396                        worktrees: project.worktree_metadata_protos(cx),
 397                    });
 398                    return true;
 399                }
 400            }
 401            false
 402        });
 403        self.joined_projects.retain(|project| {
 404            if let Some(handle) = project.upgrade(cx) {
 405                let project = handle.read(cx);
 406                if let Some(project_id) = project.remote_id() {
 407                    projects.insert(project_id, handle.clone());
 408                    rejoined_projects.push(proto::RejoinProject {
 409                        id: project_id,
 410                        worktrees: project
 411                            .worktrees(cx)
 412                            .map(|worktree| {
 413                                let worktree = worktree.read(cx);
 414                                proto::RejoinWorktree {
 415                                    id: worktree.id().to_proto(),
 416                                    scan_id: worktree.completed_scan_id() as u64,
 417                                }
 418                            })
 419                            .collect(),
 420                    });
 421                }
 422                return true;
 423            }
 424            false
 425        });
 426
 427        let response = self.client.request(proto::RejoinRoom {
 428            id: self.id,
 429            reshared_projects,
 430            rejoined_projects,
 431        });
 432
 433        cx.spawn(|this, mut cx| async move {
 434            let response = response.await?;
 435            let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
 436            this.update(&mut cx, |this, cx| {
 437                this.status = RoomStatus::Online;
 438                this.apply_room_update(room_proto, cx)?;
 439
 440                for reshared_project in response.reshared_projects {
 441                    if let Some(project) = projects.get(&reshared_project.id) {
 442                        project.update(cx, |project, cx| {
 443                            project.reshared(reshared_project, cx).log_err();
 444                        });
 445                    }
 446                }
 447
 448                for rejoined_project in response.rejoined_projects {
 449                    if let Some(project) = projects.get(&rejoined_project.id) {
 450                        project.update(cx, |project, cx| {
 451                            project.rejoined(rejoined_project, cx).log_err();
 452                        });
 453                    }
 454                }
 455
 456                anyhow::Ok(())
 457            })
 458        })
 459    }
 460
 461    pub fn id(&self) -> u64 {
 462        self.id
 463    }
 464
 465    pub fn status(&self) -> RoomStatus {
 466        self.status
 467    }
 468
 469    pub fn local_participant(&self) -> &LocalParticipant {
 470        &self.local_participant
 471    }
 472
 473    pub fn remote_participants(&self) -> &BTreeMap<u64, RemoteParticipant> {
 474        &self.remote_participants
 475    }
 476
 477    pub fn remote_participant_for_peer_id(&self, peer_id: PeerId) -> Option<&RemoteParticipant> {
 478        self.remote_participants
 479            .values()
 480            .find(|p| p.peer_id == peer_id)
 481    }
 482
 483    pub fn pending_participants(&self) -> &[Arc<User>] {
 484        &self.pending_participants
 485    }
 486
 487    pub fn contains_participant(&self, user_id: u64) -> bool {
 488        self.participant_user_ids.contains(&user_id)
 489    }
 490
 491    pub fn followers_for(&self, leader_id: PeerId, project_id: u64) -> &[PeerId] {
 492        self.follows_by_leader_id_project_id
 493            .get(&(leader_id, project_id))
 494            .map_or(&[], |v| v.as_slice())
 495    }
 496
 497    async fn handle_room_updated(
 498        this: ModelHandle<Self>,
 499        envelope: TypedEnvelope<proto::RoomUpdated>,
 500        _: Arc<Client>,
 501        mut cx: AsyncAppContext,
 502    ) -> Result<()> {
 503        let room = envelope
 504            .payload
 505            .room
 506            .ok_or_else(|| anyhow!("invalid room"))?;
 507        this.update(&mut cx, |this, cx| this.apply_room_update(room, cx))
 508    }
 509
 510    fn apply_room_update(
 511        &mut self,
 512        mut room: proto::Room,
 513        cx: &mut ModelContext<Self>,
 514    ) -> Result<()> {
 515        // Filter ourselves out from the room's participants.
 516        let local_participant_ix = room
 517            .participants
 518            .iter()
 519            .position(|participant| Some(participant.user_id) == self.client.user_id());
 520        let local_participant = local_participant_ix.map(|ix| room.participants.swap_remove(ix));
 521
 522        let pending_participant_user_ids = room
 523            .pending_participants
 524            .iter()
 525            .map(|p| p.user_id)
 526            .collect::<Vec<_>>();
 527
 528        let remote_participant_user_ids = room
 529            .participants
 530            .iter()
 531            .map(|p| p.user_id)
 532            .collect::<Vec<_>>();
 533
 534        let (remote_participants, pending_participants) =
 535            self.user_store.update(cx, move |user_store, cx| {
 536                (
 537                    user_store.get_users(remote_participant_user_ids, cx),
 538                    user_store.get_users(pending_participant_user_ids, cx),
 539                )
 540            });
 541
 542        self.pending_room_update = Some(cx.spawn(|this, mut cx| async move {
 543            let (remote_participants, pending_participants) =
 544                futures::join!(remote_participants, pending_participants);
 545
 546            this.update(&mut cx, |this, cx| {
 547                this.participant_user_ids.clear();
 548
 549                if let Some(participant) = local_participant {
 550                    this.local_participant.projects = participant.projects;
 551                } else {
 552                    this.local_participant.projects.clear();
 553                }
 554
 555                if let Some(participants) = remote_participants.log_err() {
 556                    for (participant, user) in room.participants.into_iter().zip(participants) {
 557                        let Some(peer_id) = participant.peer_id else { continue };
 558                        this.participant_user_ids.insert(participant.user_id);
 559
 560                        let old_projects = this
 561                            .remote_participants
 562                            .get(&participant.user_id)
 563                            .into_iter()
 564                            .flat_map(|existing| &existing.projects)
 565                            .map(|project| project.id)
 566                            .collect::<HashSet<_>>();
 567                        let new_projects = participant
 568                            .projects
 569                            .iter()
 570                            .map(|project| project.id)
 571                            .collect::<HashSet<_>>();
 572
 573                        for project in &participant.projects {
 574                            if !old_projects.contains(&project.id) {
 575                                cx.emit(Event::RemoteProjectShared {
 576                                    owner: user.clone(),
 577                                    project_id: project.id,
 578                                    worktree_root_names: project.worktree_root_names.clone(),
 579                                });
 580                            }
 581                        }
 582
 583                        for unshared_project_id in old_projects.difference(&new_projects) {
 584                            this.joined_projects.retain(|project| {
 585                                if let Some(project) = project.upgrade(cx) {
 586                                    project.update(cx, |project, cx| {
 587                                        if project.remote_id() == Some(*unshared_project_id) {
 588                                            project.disconnected_from_host(cx);
 589                                            false
 590                                        } else {
 591                                            true
 592                                        }
 593                                    })
 594                                } else {
 595                                    false
 596                                }
 597                            });
 598                            cx.emit(Event::RemoteProjectUnshared {
 599                                project_id: *unshared_project_id,
 600                            });
 601                        }
 602
 603                        let location = ParticipantLocation::from_proto(participant.location)
 604                            .unwrap_or(ParticipantLocation::External);
 605                        if let Some(remote_participant) =
 606                            this.remote_participants.get_mut(&participant.user_id)
 607                        {
 608                            remote_participant.projects = participant.projects;
 609                            remote_participant.peer_id = peer_id;
 610                            if location != remote_participant.location {
 611                                remote_participant.location = location;
 612                                cx.emit(Event::ParticipantLocationChanged {
 613                                    participant_id: peer_id,
 614                                });
 615                            }
 616                        } else {
 617                            this.remote_participants.insert(
 618                                participant.user_id,
 619                                RemoteParticipant {
 620                                    user: user.clone(),
 621                                    peer_id,
 622                                    projects: participant.projects,
 623                                    location,
 624                                    tracks: Default::default(),
 625                                },
 626                            );
 627
 628                            if let Some(live_kit) = this.live_kit.as_ref() {
 629                                let tracks =
 630                                    live_kit.room.remote_video_tracks(&user.id.to_string());
 631                                for track in tracks {
 632                                    this.remote_video_track_updated(
 633                                        RemoteVideoTrackUpdate::Subscribed(track),
 634                                        cx,
 635                                    )
 636                                    .log_err();
 637                                }
 638                            }
 639                        }
 640                    }
 641
 642                    this.remote_participants.retain(|user_id, participant| {
 643                        if this.participant_user_ids.contains(user_id) {
 644                            true
 645                        } else {
 646                            for project in &participant.projects {
 647                                cx.emit(Event::RemoteProjectUnshared {
 648                                    project_id: project.id,
 649                                });
 650                            }
 651                            false
 652                        }
 653                    });
 654                }
 655
 656                if let Some(pending_participants) = pending_participants.log_err() {
 657                    this.pending_participants = pending_participants;
 658                    for participant in &this.pending_participants {
 659                        this.participant_user_ids.insert(participant.id);
 660                    }
 661                }
 662
 663                this.follows_by_leader_id_project_id.clear();
 664                for follower in room.followers {
 665                    let project_id = follower.project_id;
 666                    let (leader, follower) = match (follower.leader_id, follower.follower_id) {
 667                        (Some(leader), Some(follower)) => (leader, follower),
 668
 669                        _ => {
 670                            log::error!("Follower message {follower:?} missing some state");
 671                            continue;
 672                        }
 673                    };
 674
 675                    let list = this
 676                        .follows_by_leader_id_project_id
 677                        .entry((leader, project_id))
 678                        .or_insert(Vec::new());
 679                    if !list.contains(&follower) {
 680                        list.push(follower);
 681                    }
 682                }
 683
 684                this.pending_room_update.take();
 685                if this.should_leave() {
 686                    log::info!("room is empty, leaving");
 687                    let _ = this.leave(cx);
 688                }
 689
 690                this.check_invariants();
 691                cx.notify();
 692            });
 693        }));
 694
 695        cx.notify();
 696        Ok(())
 697    }
 698
 699    fn remote_video_track_updated(
 700        &mut self,
 701        change: RemoteVideoTrackUpdate,
 702        cx: &mut ModelContext<Self>,
 703    ) -> Result<()> {
 704        match change {
 705            RemoteVideoTrackUpdate::Subscribed(track) => {
 706                let user_id = track.publisher_id().parse()?;
 707                let track_id = track.sid().to_string();
 708                let participant = self
 709                    .remote_participants
 710                    .get_mut(&user_id)
 711                    .ok_or_else(|| anyhow!("subscribed to track by unknown participant"))?;
 712                participant.tracks.insert(
 713                    track_id.clone(),
 714                    Arc::new(RemoteVideoTrack {
 715                        live_kit_track: track,
 716                    }),
 717                );
 718                cx.emit(Event::RemoteVideoTracksChanged {
 719                    participant_id: participant.peer_id,
 720                });
 721            }
 722            RemoteVideoTrackUpdate::Unsubscribed {
 723                publisher_id,
 724                track_id,
 725            } => {
 726                let user_id = publisher_id.parse()?;
 727                let participant = self
 728                    .remote_participants
 729                    .get_mut(&user_id)
 730                    .ok_or_else(|| anyhow!("unsubscribed from track by unknown participant"))?;
 731                participant.tracks.remove(&track_id);
 732                cx.emit(Event::RemoteVideoTracksChanged {
 733                    participant_id: participant.peer_id,
 734                });
 735            }
 736        }
 737
 738        cx.notify();
 739        Ok(())
 740    }
 741
 742    fn check_invariants(&self) {
 743        #[cfg(any(test, feature = "test-support"))]
 744        {
 745            for participant in self.remote_participants.values() {
 746                assert!(self.participant_user_ids.contains(&participant.user.id));
 747                assert_ne!(participant.user.id, self.client.user_id().unwrap());
 748            }
 749
 750            for participant in &self.pending_participants {
 751                assert!(self.participant_user_ids.contains(&participant.id));
 752                assert_ne!(participant.id, self.client.user_id().unwrap());
 753            }
 754
 755            assert_eq!(
 756                self.participant_user_ids.len(),
 757                self.remote_participants.len() + self.pending_participants.len()
 758            );
 759        }
 760    }
 761
 762    pub(crate) fn call(
 763        &mut self,
 764        called_user_id: u64,
 765        initial_project_id: Option<u64>,
 766        cx: &mut ModelContext<Self>,
 767    ) -> Task<Result<()>> {
 768        if self.status.is_offline() {
 769            return Task::ready(Err(anyhow!("room is offline")));
 770        }
 771
 772        cx.notify();
 773        let client = self.client.clone();
 774        let room_id = self.id;
 775        self.pending_call_count += 1;
 776        cx.spawn(|this, mut cx| async move {
 777            let result = client
 778                .request(proto::Call {
 779                    room_id,
 780                    called_user_id,
 781                    initial_project_id,
 782                })
 783                .await;
 784            this.update(&mut cx, |this, cx| {
 785                this.pending_call_count -= 1;
 786                if this.should_leave() {
 787                    this.leave(cx).detach_and_log_err(cx);
 788                }
 789            });
 790            result?;
 791            Ok(())
 792        })
 793    }
 794
 795    pub fn join_project(
 796        &mut self,
 797        id: u64,
 798        language_registry: Arc<LanguageRegistry>,
 799        fs: Arc<dyn Fs>,
 800        cx: &mut ModelContext<Self>,
 801    ) -> Task<Result<ModelHandle<Project>>> {
 802        let client = self.client.clone();
 803        let user_store = self.user_store.clone();
 804        cx.spawn(|this, mut cx| async move {
 805            let project =
 806                Project::remote(id, client, user_store, language_registry, fs, cx.clone()).await?;
 807            this.update(&mut cx, |this, cx| {
 808                this.joined_projects.retain(|project| {
 809                    if let Some(project) = project.upgrade(cx) {
 810                        !project.read(cx).is_read_only()
 811                    } else {
 812                        false
 813                    }
 814                });
 815                this.joined_projects.insert(project.downgrade());
 816            });
 817            Ok(project)
 818        })
 819    }
 820
 821    pub(crate) fn share_project(
 822        &mut self,
 823        project: ModelHandle<Project>,
 824        cx: &mut ModelContext<Self>,
 825    ) -> Task<Result<u64>> {
 826        if let Some(project_id) = project.read(cx).remote_id() {
 827            return Task::ready(Ok(project_id));
 828        }
 829
 830        let request = self.client.request(proto::ShareProject {
 831            room_id: self.id(),
 832            worktrees: project.read(cx).worktree_metadata_protos(cx),
 833        });
 834        cx.spawn(|this, mut cx| async move {
 835            let response = request.await?;
 836
 837            project.update(&mut cx, |project, cx| {
 838                project.shared(response.project_id, cx)
 839            })?;
 840
 841            // If the user's location is in this project, it changes from UnsharedProject to SharedProject.
 842            this.update(&mut cx, |this, cx| {
 843                this.shared_projects.insert(project.downgrade());
 844                let active_project = this.local_participant.active_project.as_ref();
 845                if active_project.map_or(false, |location| *location == project) {
 846                    this.set_location(Some(&project), cx)
 847                } else {
 848                    Task::ready(Ok(()))
 849                }
 850            })
 851            .await?;
 852
 853            Ok(response.project_id)
 854        })
 855    }
 856
 857    pub(crate) fn unshare_project(
 858        &mut self,
 859        project: ModelHandle<Project>,
 860        cx: &mut ModelContext<Self>,
 861    ) -> Result<()> {
 862        let project_id = match project.read(cx).remote_id() {
 863            Some(project_id) => project_id,
 864            None => return Ok(()),
 865        };
 866
 867        self.client.send(proto::UnshareProject { project_id })?;
 868        project.update(cx, |this, cx| this.unshare(cx))
 869    }
 870
 871    pub(crate) fn set_location(
 872        &mut self,
 873        project: Option<&ModelHandle<Project>>,
 874        cx: &mut ModelContext<Self>,
 875    ) -> Task<Result<()>> {
 876        if self.status.is_offline() {
 877            return Task::ready(Err(anyhow!("room is offline")));
 878        }
 879
 880        let client = self.client.clone();
 881        let room_id = self.id;
 882        let location = if let Some(project) = project {
 883            self.local_participant.active_project = Some(project.downgrade());
 884            if let Some(project_id) = project.read(cx).remote_id() {
 885                proto::participant_location::Variant::SharedProject(
 886                    proto::participant_location::SharedProject { id: project_id },
 887                )
 888            } else {
 889                proto::participant_location::Variant::UnsharedProject(
 890                    proto::participant_location::UnsharedProject {},
 891                )
 892            }
 893        } else {
 894            self.local_participant.active_project = None;
 895            proto::participant_location::Variant::External(proto::participant_location::External {})
 896        };
 897
 898        cx.notify();
 899        cx.foreground().spawn(async move {
 900            client
 901                .request(proto::UpdateParticipantLocation {
 902                    room_id,
 903                    location: Some(proto::ParticipantLocation {
 904                        variant: Some(location),
 905                    }),
 906                })
 907                .await?;
 908            Ok(())
 909        })
 910    }
 911
 912    pub fn is_screen_sharing(&self) -> bool {
 913        self.live_kit.as_ref().map_or(false, |live_kit| {
 914            !matches!(live_kit.screen_track, ScreenTrack::None)
 915        })
 916    }
 917
 918    pub fn share_screen(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
 919        if self.status.is_offline() {
 920            return Task::ready(Err(anyhow!("room is offline")));
 921        } else if self.is_screen_sharing() {
 922            return Task::ready(Err(anyhow!("screen was already shared")));
 923        }
 924
 925        let (displays, publish_id) = if let Some(live_kit) = self.live_kit.as_mut() {
 926            let publish_id = post_inc(&mut live_kit.next_publish_id);
 927            live_kit.screen_track = ScreenTrack::Pending { publish_id };
 928            cx.notify();
 929            (live_kit.room.display_sources(), publish_id)
 930        } else {
 931            return Task::ready(Err(anyhow!("live-kit was not initialized")));
 932        };
 933
 934        cx.spawn_weak(|this, mut cx| async move {
 935            let publish_track = async {
 936                let displays = displays.await?;
 937                let display = displays
 938                    .first()
 939                    .ok_or_else(|| anyhow!("no display found"))?;
 940                let track = LocalVideoTrack::screen_share_for_display(&display);
 941                this.upgrade(&cx)
 942                    .ok_or_else(|| anyhow!("room was dropped"))?
 943                    .read_with(&cx, |this, _| {
 944                        this.live_kit
 945                            .as_ref()
 946                            .map(|live_kit| live_kit.room.publish_video_track(&track))
 947                    })
 948                    .ok_or_else(|| anyhow!("live-kit was not initialized"))?
 949                    .await
 950            };
 951
 952            let publication = publish_track.await;
 953            this.upgrade(&cx)
 954                .ok_or_else(|| anyhow!("room was dropped"))?
 955                .update(&mut cx, |this, cx| {
 956                    let live_kit = this
 957                        .live_kit
 958                        .as_mut()
 959                        .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
 960
 961                    let canceled = if let ScreenTrack::Pending {
 962                        publish_id: cur_publish_id,
 963                    } = &live_kit.screen_track
 964                    {
 965                        *cur_publish_id != publish_id
 966                    } else {
 967                        true
 968                    };
 969
 970                    match publication {
 971                        Ok(publication) => {
 972                            if canceled {
 973                                live_kit.room.unpublish_track(publication);
 974                            } else {
 975                                live_kit.screen_track = ScreenTrack::Published(publication);
 976                                cx.notify();
 977                            }
 978                            Ok(())
 979                        }
 980                        Err(error) => {
 981                            if canceled {
 982                                Ok(())
 983                            } else {
 984                                live_kit.screen_track = ScreenTrack::None;
 985                                cx.notify();
 986                                Err(error)
 987                            }
 988                        }
 989                    }
 990                })
 991        })
 992    }
 993
 994    pub fn unshare_screen(&mut self, cx: &mut ModelContext<Self>) -> Result<()> {
 995        if self.status.is_offline() {
 996            return Err(anyhow!("room is offline"));
 997        }
 998
 999        let live_kit = self
1000            .live_kit
1001            .as_mut()
1002            .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
1003        match mem::take(&mut live_kit.screen_track) {
1004            ScreenTrack::None => Err(anyhow!("screen was not shared")),
1005            ScreenTrack::Pending { .. } => {
1006                cx.notify();
1007                Ok(())
1008            }
1009            ScreenTrack::Published(track) => {
1010                live_kit.room.unpublish_track(track);
1011                cx.notify();
1012                Ok(())
1013            }
1014        }
1015    }
1016
1017    #[cfg(any(test, feature = "test-support"))]
1018    pub fn set_display_sources(&self, sources: Vec<live_kit_client::MacOSDisplay>) {
1019        self.live_kit
1020            .as_ref()
1021            .unwrap()
1022            .room
1023            .set_display_sources(sources);
1024    }
1025}
1026
/// Live-kit state owned by a `Room`: the live-kit room handle, the local screen-share state
/// and two background tasks that are kept alive alongside it.
struct LiveKitRoom {
    /// Shared handle to the `live_kit_client` room used for display discovery and for
    /// publishing/unpublishing tracks.
    room: Arc<live_kit_client::Room>,
    /// Current state of the local screen share.
    screen_track: ScreenTrack,
    /// Id assigned to the next screen-share attempt; `share_screen` post-increments it.
    next_publish_id: usize,
    // NOTE(review): the two tasks below are only stored, never read here; presumably they
    // run for as long as this struct lives and stop when it is dropped. Confirm where they
    // are spawned.
    _maintain_room: Task<()>,
    _maintain_tracks: Task<()>,
}
1034
/// State of the local participant's screen share.
enum ScreenTrack {
    /// No screen share is pending or published.
    None,
    /// A publish attempt is in flight; `publish_id` identifies that attempt so its
    /// completion can tell whether it is still the current one.
    Pending { publish_id: usize },
    /// The screen is being shared through this publication.
    Published(LocalTrackPublication),
}
1040
1041impl Default for ScreenTrack {
1042    fn default() -> Self {
1043        Self::None
1044    }
1045}
1046
/// Connection status of a room.
#[derive(Copy, Clone, PartialEq, Eq)]
pub enum RoomStatus {
    Online,
    Rejoining,
    Offline,
}

impl RoomStatus {
    /// Returns `true` only for [`RoomStatus::Offline`].
    pub fn is_offline(&self) -> bool {
        match self {
            RoomStatus::Offline => true,
            RoomStatus::Online | RoomStatus::Rejoining => false,
        }
    }

    /// Returns `true` only for [`RoomStatus::Online`]; a rejoining room is not online.
    pub fn is_online(&self) -> bool {
        match self {
            RoomStatus::Online => true,
            RoomStatus::Rejoining | RoomStatus::Offline => false,
        }
    }
}