room.rs

   1use crate::{
   2    participant::{LocalParticipant, ParticipantLocation, RemoteParticipant, RemoteVideoTrack},
   3    IncomingCall,
   4};
   5use anyhow::{anyhow, Result};
   6use client::{
   7    proto::{self, PeerId},
   8    Client, TypedEnvelope, User, UserStore,
   9};
  10use collections::{BTreeMap, HashMap, HashSet};
  11use fs::Fs;
  12use futures::{FutureExt, StreamExt};
  13use gpui::{AppContext, AsyncAppContext, Entity, ModelContext, ModelHandle, Task, WeakModelHandle};
  14use language::LanguageRegistry;
  15use live_kit_client::{LocalTrackPublication, LocalVideoTrack, RemoteVideoTrackUpdate};
  16use postage::stream::Stream;
  17use project::Project;
  18use std::{future::Future, mem, pin::Pin, sync::Arc, time::Duration};
  19use util::{post_inc, ResultExt, TryFutureExt};
  20
  21pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(30);
  22
  23#[derive(Clone, Debug, PartialEq, Eq)]
  24pub enum Event {
  25    ParticipantLocationChanged {
  26        participant_id: proto::PeerId,
  27    },
  28    RemoteVideoTracksChanged {
  29        participant_id: proto::PeerId,
  30    },
  31    RemoteProjectShared {
  32        owner: Arc<User>,
  33        project_id: u64,
  34        worktree_root_names: Vec<String>,
  35    },
  36    RemoteProjectUnshared {
  37        project_id: u64,
  38    },
  39    Left,
  40}
  41
  42pub struct Room {
  43    id: u64,
  44    live_kit: Option<LiveKitRoom>,
  45    status: RoomStatus,
  46    shared_projects: HashSet<WeakModelHandle<Project>>,
  47    joined_projects: HashSet<WeakModelHandle<Project>>,
  48    local_participant: LocalParticipant,
  49    remote_participants: BTreeMap<u64, RemoteParticipant>,
  50    pending_participants: Vec<Arc<User>>,
  51    participant_user_ids: HashSet<u64>,
  52    pending_call_count: usize,
  53    leave_when_empty: bool,
  54    client: Arc<Client>,
  55    user_store: ModelHandle<UserStore>,
  56    follows_by_leader_id_project_id: HashMap<(PeerId, u64), Vec<PeerId>>,
  57    subscriptions: Vec<client::Subscription>,
  58    pending_room_update: Option<Task<()>>,
  59    maintain_connection: Option<Task<Option<()>>>,
  60}
  61
  62impl Entity for Room {
  63    type Event = Event;
  64
  65    fn release(&mut self, cx: &mut AppContext) {
  66        if self.status.is_online() {
  67            self.leave_internal(cx).detach_and_log_err(cx);
  68        }
  69    }
  70
  71    fn app_will_quit(&mut self, cx: &mut AppContext) -> Option<Pin<Box<dyn Future<Output = ()>>>> {
  72        if self.status.is_online() {
  73            let leave = self.leave_internal(cx);
  74            Some(
  75                cx.background()
  76                    .spawn(async move {
  77                        leave.await.log_err();
  78                    })
  79                    .boxed(),
  80            )
  81        } else {
  82            None
  83        }
  84    }
  85}
  86
  87impl Room {
  88    fn new(
  89        id: u64,
  90        live_kit_connection_info: Option<proto::LiveKitConnectionInfo>,
  91        client: Arc<Client>,
  92        user_store: ModelHandle<UserStore>,
  93        cx: &mut ModelContext<Self>,
  94    ) -> Self {
  95        let live_kit_room = if let Some(connection_info) = live_kit_connection_info {
  96            let room = live_kit_client::Room::new();
  97            let mut status = room.status();
  98            // Consume the initial status of the room.
  99            let _ = status.try_recv();
 100            let _maintain_room = cx.spawn_weak(|this, mut cx| async move {
 101                while let Some(status) = status.next().await {
 102                    let this = if let Some(this) = this.upgrade(&cx) {
 103                        this
 104                    } else {
 105                        break;
 106                    };
 107
 108                    if status == live_kit_client::ConnectionState::Disconnected {
 109                        this.update(&mut cx, |this, cx| this.leave(cx).log_err());
 110                        break;
 111                    }
 112                }
 113            });
 114
 115            let mut track_changes = room.remote_video_track_updates();
 116            let _maintain_tracks = cx.spawn_weak(|this, mut cx| async move {
 117                while let Some(track_change) = track_changes.next().await {
 118                    let this = if let Some(this) = this.upgrade(&cx) {
 119                        this
 120                    } else {
 121                        break;
 122                    };
 123
 124                    this.update(&mut cx, |this, cx| {
 125                        this.remote_video_track_updated(track_change, cx).log_err()
 126                    });
 127                }
 128            });
 129
 130            cx.foreground()
 131                .spawn(room.connect(&connection_info.server_url, &connection_info.token))
 132                .detach_and_log_err(cx);
 133
 134            Some(LiveKitRoom {
 135                room,
 136                screen_track: ScreenTrack::None,
 137                next_publish_id: 0,
 138                _maintain_room,
 139                _maintain_tracks,
 140            })
 141        } else {
 142            None
 143        };
 144
 145        let maintain_connection =
 146            cx.spawn_weak(|this, cx| Self::maintain_connection(this, client.clone(), cx).log_err());
 147
 148        Self {
 149            id,
 150            live_kit: live_kit_room,
 151            status: RoomStatus::Online,
 152            shared_projects: Default::default(),
 153            joined_projects: Default::default(),
 154            participant_user_ids: Default::default(),
 155            local_participant: Default::default(),
 156            remote_participants: Default::default(),
 157            pending_participants: Default::default(),
 158            pending_call_count: 0,
 159            subscriptions: vec![client.add_message_handler(cx.handle(), Self::handle_room_updated)],
 160            leave_when_empty: false,
 161            pending_room_update: None,
 162            client,
 163            user_store,
 164            follows_by_leader_id_project_id: Default::default(),
 165            maintain_connection: Some(maintain_connection),
 166        }
 167    }
 168
 169    pub(crate) fn create(
 170        called_user_id: u64,
 171        initial_project: Option<ModelHandle<Project>>,
 172        client: Arc<Client>,
 173        user_store: ModelHandle<UserStore>,
 174        cx: &mut AppContext,
 175    ) -> Task<Result<ModelHandle<Self>>> {
 176        cx.spawn(|mut cx| async move {
 177            let response = client.request(proto::CreateRoom {}).await?;
 178            let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
 179            let room = cx.add_model(|cx| {
 180                Self::new(
 181                    room_proto.id,
 182                    response.live_kit_connection_info,
 183                    client,
 184                    user_store,
 185                    cx,
 186                )
 187            });
 188
 189            let initial_project_id = if let Some(initial_project) = initial_project {
 190                let initial_project_id = room
 191                    .update(&mut cx, |room, cx| {
 192                        room.share_project(initial_project.clone(), cx)
 193                    })
 194                    .await?;
 195                Some(initial_project_id)
 196            } else {
 197                None
 198            };
 199
 200            match room
 201                .update(&mut cx, |room, cx| {
 202                    room.leave_when_empty = true;
 203                    room.call(called_user_id, initial_project_id, cx)
 204                })
 205                .await
 206            {
 207                Ok(()) => Ok(room),
 208                Err(error) => Err(anyhow!("room creation failed: {:?}", error)),
 209            }
 210        })
 211    }
 212
 213    pub(crate) fn join(
 214        call: &IncomingCall,
 215        client: Arc<Client>,
 216        user_store: ModelHandle<UserStore>,
 217        cx: &mut AppContext,
 218    ) -> Task<Result<ModelHandle<Self>>> {
 219        let room_id = call.room_id;
 220        cx.spawn(|mut cx| async move {
 221            let response = client.request(proto::JoinRoom { id: room_id }).await?;
 222            let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
 223            let room = cx.add_model(|cx| {
 224                Self::new(
 225                    room_id,
 226                    response.live_kit_connection_info,
 227                    client,
 228                    user_store,
 229                    cx,
 230                )
 231            });
 232            room.update(&mut cx, |room, cx| {
 233                room.leave_when_empty = true;
 234                room.apply_room_update(room_proto, cx)?;
 235                anyhow::Ok(())
 236            })?;
 237            Ok(room)
 238        })
 239    }
 240
 241    fn should_leave(&self) -> bool {
 242        self.leave_when_empty
 243            && self.pending_room_update.is_none()
 244            && self.pending_participants.is_empty()
 245            && self.remote_participants.is_empty()
 246            && self.pending_call_count == 0
 247    }
 248
 249    pub(crate) fn leave(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
 250        cx.notify();
 251        cx.emit(Event::Left);
 252        self.leave_internal(cx)
 253    }
 254
 255    fn leave_internal(&mut self, cx: &mut AppContext) -> Task<Result<()>> {
 256        if self.status.is_offline() {
 257            return Task::ready(Err(anyhow!("room is offline")));
 258        }
 259
 260        log::info!("leaving room");
 261
 262        for project in self.shared_projects.drain() {
 263            if let Some(project) = project.upgrade(cx) {
 264                project.update(cx, |project, cx| {
 265                    project.unshare(cx).log_err();
 266                });
 267            }
 268        }
 269        for project in self.joined_projects.drain() {
 270            if let Some(project) = project.upgrade(cx) {
 271                project.update(cx, |project, cx| {
 272                    project.disconnected_from_host(cx);
 273                    project.close(cx);
 274                });
 275            }
 276        }
 277
 278        self.status = RoomStatus::Offline;
 279        self.remote_participants.clear();
 280        self.pending_participants.clear();
 281        self.participant_user_ids.clear();
 282        self.subscriptions.clear();
 283        self.live_kit.take();
 284        self.pending_room_update.take();
 285        self.maintain_connection.take();
 286
 287        let leave_room = self.client.request(proto::LeaveRoom {});
 288        cx.background().spawn(async move {
 289            leave_room.await?;
 290            anyhow::Ok(())
 291        })
 292    }
 293
 294    async fn maintain_connection(
 295        this: WeakModelHandle<Self>,
 296        client: Arc<Client>,
 297        mut cx: AsyncAppContext,
 298    ) -> Result<()> {
 299        let mut client_status = client.status();
 300        loop {
 301            let _ = client_status.try_recv();
 302            let is_connected = client_status.borrow().is_connected();
 303            // Even if we're initially connected, any future change of the status means we momentarily disconnected.
 304            if !is_connected || client_status.next().await.is_some() {
 305                log::info!("detected client disconnection");
 306
 307                this.upgrade(&cx)
 308                    .ok_or_else(|| anyhow!("room was dropped"))?
 309                    .update(&mut cx, |this, cx| {
 310                        this.status = RoomStatus::Rejoining;
 311                        cx.notify();
 312                    });
 313
 314                // Wait for client to re-establish a connection to the server.
 315                {
 316                    let mut reconnection_timeout = cx.background().timer(RECONNECT_TIMEOUT).fuse();
 317                    let client_reconnection = async {
 318                        let mut remaining_attempts = 3;
 319                        while remaining_attempts > 0 {
 320                            if client_status.borrow().is_connected() {
 321                                log::info!("client reconnected, attempting to rejoin room");
 322
 323                                let Some(this) = this.upgrade(&cx) else { break };
 324                                if this
 325                                    .update(&mut cx, |this, cx| this.rejoin(cx))
 326                                    .await
 327                                    .log_err()
 328                                    .is_some()
 329                                {
 330                                    return true;
 331                                } else {
 332                                    remaining_attempts -= 1;
 333                                }
 334                            } else if client_status.borrow().is_signed_out() {
 335                                return false;
 336                            }
 337
 338                            log::info!(
 339                                "waiting for client status change, remaining attempts {}",
 340                                remaining_attempts
 341                            );
 342                            client_status.next().await;
 343                        }
 344                        false
 345                    }
 346                    .fuse();
 347                    futures::pin_mut!(client_reconnection);
 348
 349                    futures::select_biased! {
 350                        reconnected = client_reconnection => {
 351                            if reconnected {
 352                                log::info!("successfully reconnected to room");
 353                                // If we successfully joined the room, go back around the loop
 354                                // waiting for future connection status changes.
 355                                continue;
 356                            }
 357                        }
 358                        _ = reconnection_timeout => {
 359                            log::info!("room reconnection timeout expired");
 360                        }
 361                    }
 362                }
 363
 364                break;
 365            }
 366        }
 367
 368        // The client failed to re-establish a connection to the server
 369        // or an error occurred while trying to re-join the room. Either way
 370        // we leave the room and return an error.
 371        if let Some(this) = this.upgrade(&cx) {
 372            log::info!("reconnection failed, leaving room");
 373            let _ = this.update(&mut cx, |this, cx| this.leave(cx));
 374        }
 375        Err(anyhow!(
 376            "can't reconnect to room: client failed to re-establish connection"
 377        ))
 378    }
 379
 380    fn rejoin(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
 381        let mut projects = HashMap::default();
 382        let mut reshared_projects = Vec::new();
 383        let mut rejoined_projects = Vec::new();
 384        self.shared_projects.retain(|project| {
 385            if let Some(handle) = project.upgrade(cx) {
 386                let project = handle.read(cx);
 387                if let Some(project_id) = project.remote_id() {
 388                    projects.insert(project_id, handle.clone());
 389                    reshared_projects.push(proto::UpdateProject {
 390                        project_id,
 391                        worktrees: project.worktree_metadata_protos(cx),
 392                    });
 393                    return true;
 394                }
 395            }
 396            false
 397        });
 398        self.joined_projects.retain(|project| {
 399            if let Some(handle) = project.upgrade(cx) {
 400                let project = handle.read(cx);
 401                if let Some(project_id) = project.remote_id() {
 402                    projects.insert(project_id, handle.clone());
 403                    rejoined_projects.push(proto::RejoinProject {
 404                        id: project_id,
 405                        worktrees: project
 406                            .worktrees(cx)
 407                            .map(|worktree| {
 408                                let worktree = worktree.read(cx);
 409                                proto::RejoinWorktree {
 410                                    id: worktree.id().to_proto(),
 411                                    scan_id: worktree.completed_scan_id() as u64,
 412                                }
 413                            })
 414                            .collect(),
 415                    });
 416                }
 417                return true;
 418            }
 419            false
 420        });
 421
 422        let response = self.client.request_envelope(proto::RejoinRoom {
 423            id: self.id,
 424            reshared_projects,
 425            rejoined_projects,
 426        });
 427
 428        cx.spawn(|this, mut cx| async move {
 429            let response = response.await?;
 430            let message_id = response.message_id;
 431            let response = response.payload;
 432            let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
 433            this.update(&mut cx, |this, cx| {
 434                this.status = RoomStatus::Online;
 435                this.apply_room_update(room_proto, cx)?;
 436
 437                for reshared_project in response.reshared_projects {
 438                    if let Some(project) = projects.get(&reshared_project.id) {
 439                        project.update(cx, |project, cx| {
 440                            project.reshared(reshared_project, cx).log_err();
 441                        });
 442                    }
 443                }
 444
 445                for rejoined_project in response.rejoined_projects {
 446                    if let Some(project) = projects.get(&rejoined_project.id) {
 447                        project.update(cx, |project, cx| {
 448                            project.rejoined(rejoined_project, message_id, cx).log_err();
 449                        });
 450                    }
 451                }
 452
 453                anyhow::Ok(())
 454            })
 455        })
 456    }
 457
 458    pub fn id(&self) -> u64 {
 459        self.id
 460    }
 461
 462    pub fn status(&self) -> RoomStatus {
 463        self.status
 464    }
 465
 466    pub fn local_participant(&self) -> &LocalParticipant {
 467        &self.local_participant
 468    }
 469
 470    pub fn remote_participants(&self) -> &BTreeMap<u64, RemoteParticipant> {
 471        &self.remote_participants
 472    }
 473
 474    pub fn remote_participant_for_peer_id(&self, peer_id: PeerId) -> Option<&RemoteParticipant> {
 475        self.remote_participants
 476            .values()
 477            .find(|p| p.peer_id == peer_id)
 478    }
 479
 480    pub fn pending_participants(&self) -> &[Arc<User>] {
 481        &self.pending_participants
 482    }
 483
 484    pub fn contains_participant(&self, user_id: u64) -> bool {
 485        self.participant_user_ids.contains(&user_id)
 486    }
 487
 488    pub fn followers_for(&self, leader_id: PeerId, project_id: u64) -> &[PeerId] {
 489        self.follows_by_leader_id_project_id
 490            .get(&(leader_id, project_id))
 491            .map_or(&[], |v| v.as_slice())
 492    }
 493
 494    async fn handle_room_updated(
 495        this: ModelHandle<Self>,
 496        envelope: TypedEnvelope<proto::RoomUpdated>,
 497        _: Arc<Client>,
 498        mut cx: AsyncAppContext,
 499    ) -> Result<()> {
 500        let room = envelope
 501            .payload
 502            .room
 503            .ok_or_else(|| anyhow!("invalid room"))?;
 504        this.update(&mut cx, |this, cx| this.apply_room_update(room, cx))
 505    }
 506
 507    fn apply_room_update(
 508        &mut self,
 509        mut room: proto::Room,
 510        cx: &mut ModelContext<Self>,
 511    ) -> Result<()> {
 512        // Filter ourselves out from the room's participants.
 513        let local_participant_ix = room
 514            .participants
 515            .iter()
 516            .position(|participant| Some(participant.user_id) == self.client.user_id());
 517        let local_participant = local_participant_ix.map(|ix| room.participants.swap_remove(ix));
 518
 519        let pending_participant_user_ids = room
 520            .pending_participants
 521            .iter()
 522            .map(|p| p.user_id)
 523            .collect::<Vec<_>>();
 524
 525        let remote_participant_user_ids = room
 526            .participants
 527            .iter()
 528            .map(|p| p.user_id)
 529            .collect::<Vec<_>>();
 530
 531        let (remote_participants, pending_participants) =
 532            self.user_store.update(cx, move |user_store, cx| {
 533                (
 534                    user_store.get_users(remote_participant_user_ids, cx),
 535                    user_store.get_users(pending_participant_user_ids, cx),
 536                )
 537            });
 538
 539        self.pending_room_update = Some(cx.spawn(|this, mut cx| async move {
 540            let (remote_participants, pending_participants) =
 541                futures::join!(remote_participants, pending_participants);
 542
 543            this.update(&mut cx, |this, cx| {
 544                this.participant_user_ids.clear();
 545
 546                if let Some(participant) = local_participant {
 547                    this.local_participant.projects = participant.projects;
 548                } else {
 549                    this.local_participant.projects.clear();
 550                }
 551
 552                if let Some(participants) = remote_participants.log_err() {
 553                    for (participant, user) in room.participants.into_iter().zip(participants) {
 554                        let Some(peer_id) = participant.peer_id else { continue };
 555                        this.participant_user_ids.insert(participant.user_id);
 556
 557                        let old_projects = this
 558                            .remote_participants
 559                            .get(&participant.user_id)
 560                            .into_iter()
 561                            .flat_map(|existing| &existing.projects)
 562                            .map(|project| project.id)
 563                            .collect::<HashSet<_>>();
 564                        let new_projects = participant
 565                            .projects
 566                            .iter()
 567                            .map(|project| project.id)
 568                            .collect::<HashSet<_>>();
 569
 570                        for project in &participant.projects {
 571                            if !old_projects.contains(&project.id) {
 572                                cx.emit(Event::RemoteProjectShared {
 573                                    owner: user.clone(),
 574                                    project_id: project.id,
 575                                    worktree_root_names: project.worktree_root_names.clone(),
 576                                });
 577                            }
 578                        }
 579
 580                        for unshared_project_id in old_projects.difference(&new_projects) {
 581                            this.joined_projects.retain(|project| {
 582                                if let Some(project) = project.upgrade(cx) {
 583                                    project.update(cx, |project, cx| {
 584                                        if project.remote_id() == Some(*unshared_project_id) {
 585                                            project.disconnected_from_host(cx);
 586                                            false
 587                                        } else {
 588                                            true
 589                                        }
 590                                    })
 591                                } else {
 592                                    false
 593                                }
 594                            });
 595                            cx.emit(Event::RemoteProjectUnshared {
 596                                project_id: *unshared_project_id,
 597                            });
 598                        }
 599
 600                        let location = ParticipantLocation::from_proto(participant.location)
 601                            .unwrap_or(ParticipantLocation::External);
 602                        if let Some(remote_participant) =
 603                            this.remote_participants.get_mut(&participant.user_id)
 604                        {
 605                            remote_participant.projects = participant.projects;
 606                            remote_participant.peer_id = peer_id;
 607                            if location != remote_participant.location {
 608                                remote_participant.location = location;
 609                                cx.emit(Event::ParticipantLocationChanged {
 610                                    participant_id: peer_id,
 611                                });
 612                            }
 613                        } else {
 614                            this.remote_participants.insert(
 615                                participant.user_id,
 616                                RemoteParticipant {
 617                                    user: user.clone(),
 618                                    peer_id,
 619                                    projects: participant.projects,
 620                                    location,
 621                                    tracks: Default::default(),
 622                                },
 623                            );
 624
 625                            if let Some(live_kit) = this.live_kit.as_ref() {
 626                                let tracks =
 627                                    live_kit.room.remote_video_tracks(&user.id.to_string());
 628                                for track in tracks {
 629                                    this.remote_video_track_updated(
 630                                        RemoteVideoTrackUpdate::Subscribed(track),
 631                                        cx,
 632                                    )
 633                                    .log_err();
 634                                }
 635                            }
 636                        }
 637                    }
 638
 639                    this.remote_participants.retain(|user_id, participant| {
 640                        if this.participant_user_ids.contains(user_id) {
 641                            true
 642                        } else {
 643                            for project in &participant.projects {
 644                                cx.emit(Event::RemoteProjectUnshared {
 645                                    project_id: project.id,
 646                                });
 647                            }
 648                            false
 649                        }
 650                    });
 651                }
 652
 653                if let Some(pending_participants) = pending_participants.log_err() {
 654                    this.pending_participants = pending_participants;
 655                    for participant in &this.pending_participants {
 656                        this.participant_user_ids.insert(participant.id);
 657                    }
 658                }
 659
 660                this.follows_by_leader_id_project_id.clear();
 661                for follower in room.followers {
 662                    let project_id = follower.project_id;
 663                    let (leader, follower) = match (follower.leader_id, follower.follower_id) {
 664                        (Some(leader), Some(follower)) => (leader, follower),
 665
 666                        _ => {
 667                            log::error!("Follower message {follower:?} missing some state");
 668                            continue;
 669                        }
 670                    };
 671
 672                    let list = this
 673                        .follows_by_leader_id_project_id
 674                        .entry((leader, project_id))
 675                        .or_insert(Vec::new());
 676                    if !list.contains(&follower) {
 677                        list.push(follower);
 678                    }
 679                }
 680
 681                this.pending_room_update.take();
 682                if this.should_leave() {
 683                    log::info!("room is empty, leaving");
 684                    let _ = this.leave(cx);
 685                }
 686
 687                this.check_invariants();
 688                cx.notify();
 689            });
 690        }));
 691
 692        cx.notify();
 693        Ok(())
 694    }
 695
 696    fn remote_video_track_updated(
 697        &mut self,
 698        change: RemoteVideoTrackUpdate,
 699        cx: &mut ModelContext<Self>,
 700    ) -> Result<()> {
 701        match change {
 702            RemoteVideoTrackUpdate::Subscribed(track) => {
 703                let user_id = track.publisher_id().parse()?;
 704                let track_id = track.sid().to_string();
 705                let participant = self
 706                    .remote_participants
 707                    .get_mut(&user_id)
 708                    .ok_or_else(|| anyhow!("subscribed to track by unknown participant"))?;
 709                participant.tracks.insert(
 710                    track_id.clone(),
 711                    Arc::new(RemoteVideoTrack {
 712                        live_kit_track: track,
 713                    }),
 714                );
 715                cx.emit(Event::RemoteVideoTracksChanged {
 716                    participant_id: participant.peer_id,
 717                });
 718            }
 719            RemoteVideoTrackUpdate::Unsubscribed {
 720                publisher_id,
 721                track_id,
 722            } => {
 723                let user_id = publisher_id.parse()?;
 724                let participant = self
 725                    .remote_participants
 726                    .get_mut(&user_id)
 727                    .ok_or_else(|| anyhow!("unsubscribed from track by unknown participant"))?;
 728                participant.tracks.remove(&track_id);
 729                cx.emit(Event::RemoteVideoTracksChanged {
 730                    participant_id: participant.peer_id,
 731                });
 732            }
 733        }
 734
 735        cx.notify();
 736        Ok(())
 737    }
 738
 739    fn check_invariants(&self) {
 740        #[cfg(any(test, feature = "test-support"))]
 741        {
 742            for participant in self.remote_participants.values() {
 743                assert!(self.participant_user_ids.contains(&participant.user.id));
 744                assert_ne!(participant.user.id, self.client.user_id().unwrap());
 745            }
 746
 747            for participant in &self.pending_participants {
 748                assert!(self.participant_user_ids.contains(&participant.id));
 749                assert_ne!(participant.id, self.client.user_id().unwrap());
 750            }
 751
 752            assert_eq!(
 753                self.participant_user_ids.len(),
 754                self.remote_participants.len() + self.pending_participants.len()
 755            );
 756        }
 757    }
 758
 759    pub(crate) fn call(
 760        &mut self,
 761        called_user_id: u64,
 762        initial_project_id: Option<u64>,
 763        cx: &mut ModelContext<Self>,
 764    ) -> Task<Result<()>> {
 765        if self.status.is_offline() {
 766            return Task::ready(Err(anyhow!("room is offline")));
 767        }
 768
 769        cx.notify();
 770        let client = self.client.clone();
 771        let room_id = self.id;
 772        self.pending_call_count += 1;
 773        cx.spawn(|this, mut cx| async move {
 774            let result = client
 775                .request(proto::Call {
 776                    room_id,
 777                    called_user_id,
 778                    initial_project_id,
 779                })
 780                .await;
 781            this.update(&mut cx, |this, cx| {
 782                this.pending_call_count -= 1;
 783                if this.should_leave() {
 784                    this.leave(cx).detach_and_log_err(cx);
 785                }
 786            });
 787            result?;
 788            Ok(())
 789        })
 790    }
 791
 792    pub fn join_project(
 793        &mut self,
 794        id: u64,
 795        language_registry: Arc<LanguageRegistry>,
 796        fs: Arc<dyn Fs>,
 797        cx: &mut ModelContext<Self>,
 798    ) -> Task<Result<ModelHandle<Project>>> {
 799        let client = self.client.clone();
 800        let user_store = self.user_store.clone();
 801        cx.spawn(|this, mut cx| async move {
 802            let project =
 803                Project::remote(id, client, user_store, language_registry, fs, cx.clone()).await?;
 804            this.update(&mut cx, |this, cx| {
 805                this.joined_projects.retain(|project| {
 806                    if let Some(project) = project.upgrade(cx) {
 807                        !project.read(cx).is_read_only()
 808                    } else {
 809                        false
 810                    }
 811                });
 812                this.joined_projects.insert(project.downgrade());
 813            });
 814            Ok(project)
 815        })
 816    }
 817
 818    pub(crate) fn share_project(
 819        &mut self,
 820        project: ModelHandle<Project>,
 821        cx: &mut ModelContext<Self>,
 822    ) -> Task<Result<u64>> {
 823        if let Some(project_id) = project.read(cx).remote_id() {
 824            return Task::ready(Ok(project_id));
 825        }
 826
 827        let request = self.client.request(proto::ShareProject {
 828            room_id: self.id(),
 829            worktrees: project.read(cx).worktree_metadata_protos(cx),
 830        });
 831        cx.spawn(|this, mut cx| async move {
 832            let response = request.await?;
 833
 834            project.update(&mut cx, |project, cx| {
 835                project.shared(response.project_id, cx)
 836            })?;
 837
 838            // If the user's location is in this project, it changes from UnsharedProject to SharedProject.
 839            this.update(&mut cx, |this, cx| {
 840                this.shared_projects.insert(project.downgrade());
 841                let active_project = this.local_participant.active_project.as_ref();
 842                if active_project.map_or(false, |location| *location == project) {
 843                    this.set_location(Some(&project), cx)
 844                } else {
 845                    Task::ready(Ok(()))
 846                }
 847            })
 848            .await?;
 849
 850            Ok(response.project_id)
 851        })
 852    }
 853
 854    pub(crate) fn unshare_project(
 855        &mut self,
 856        project: ModelHandle<Project>,
 857        cx: &mut ModelContext<Self>,
 858    ) -> Result<()> {
 859        let project_id = match project.read(cx).remote_id() {
 860            Some(project_id) => project_id,
 861            None => return Ok(()),
 862        };
 863
 864        self.client.send(proto::UnshareProject { project_id })?;
 865        project.update(cx, |this, cx| this.unshare(cx))
 866    }
 867
 868    pub(crate) fn set_location(
 869        &mut self,
 870        project: Option<&ModelHandle<Project>>,
 871        cx: &mut ModelContext<Self>,
 872    ) -> Task<Result<()>> {
 873        if self.status.is_offline() {
 874            return Task::ready(Err(anyhow!("room is offline")));
 875        }
 876
 877        let client = self.client.clone();
 878        let room_id = self.id;
 879        let location = if let Some(project) = project {
 880            self.local_participant.active_project = Some(project.downgrade());
 881            if let Some(project_id) = project.read(cx).remote_id() {
 882                proto::participant_location::Variant::SharedProject(
 883                    proto::participant_location::SharedProject { id: project_id },
 884                )
 885            } else {
 886                proto::participant_location::Variant::UnsharedProject(
 887                    proto::participant_location::UnsharedProject {},
 888                )
 889            }
 890        } else {
 891            self.local_participant.active_project = None;
 892            proto::participant_location::Variant::External(proto::participant_location::External {})
 893        };
 894
 895        cx.notify();
 896        cx.foreground().spawn(async move {
 897            client
 898                .request(proto::UpdateParticipantLocation {
 899                    room_id,
 900                    location: Some(proto::ParticipantLocation {
 901                        variant: Some(location),
 902                    }),
 903                })
 904                .await?;
 905            Ok(())
 906        })
 907    }
 908
 909    pub fn is_screen_sharing(&self) -> bool {
 910        self.live_kit.as_ref().map_or(false, |live_kit| {
 911            !matches!(live_kit.screen_track, ScreenTrack::None)
 912        })
 913    }
 914
 915    pub fn share_screen(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
 916        if self.status.is_offline() {
 917            return Task::ready(Err(anyhow!("room is offline")));
 918        } else if self.is_screen_sharing() {
 919            return Task::ready(Err(anyhow!("screen was already shared")));
 920        }
 921
 922        let (displays, publish_id) = if let Some(live_kit) = self.live_kit.as_mut() {
 923            let publish_id = post_inc(&mut live_kit.next_publish_id);
 924            live_kit.screen_track = ScreenTrack::Pending { publish_id };
 925            cx.notify();
 926            (live_kit.room.display_sources(), publish_id)
 927        } else {
 928            return Task::ready(Err(anyhow!("live-kit was not initialized")));
 929        };
 930
 931        cx.spawn_weak(|this, mut cx| async move {
 932            let publish_track = async {
 933                let displays = displays.await?;
 934                let display = displays
 935                    .first()
 936                    .ok_or_else(|| anyhow!("no display found"))?;
 937                let track = LocalVideoTrack::screen_share_for_display(&display);
 938                this.upgrade(&cx)
 939                    .ok_or_else(|| anyhow!("room was dropped"))?
 940                    .read_with(&cx, |this, _| {
 941                        this.live_kit
 942                            .as_ref()
 943                            .map(|live_kit| live_kit.room.publish_video_track(&track))
 944                    })
 945                    .ok_or_else(|| anyhow!("live-kit was not initialized"))?
 946                    .await
 947            };
 948
 949            let publication = publish_track.await;
 950            this.upgrade(&cx)
 951                .ok_or_else(|| anyhow!("room was dropped"))?
 952                .update(&mut cx, |this, cx| {
 953                    let live_kit = this
 954                        .live_kit
 955                        .as_mut()
 956                        .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
 957
 958                    let canceled = if let ScreenTrack::Pending {
 959                        publish_id: cur_publish_id,
 960                    } = &live_kit.screen_track
 961                    {
 962                        *cur_publish_id != publish_id
 963                    } else {
 964                        true
 965                    };
 966
 967                    match publication {
 968                        Ok(publication) => {
 969                            if canceled {
 970                                live_kit.room.unpublish_track(publication);
 971                            } else {
 972                                live_kit.screen_track = ScreenTrack::Published(publication);
 973                                cx.notify();
 974                            }
 975                            Ok(())
 976                        }
 977                        Err(error) => {
 978                            if canceled {
 979                                Ok(())
 980                            } else {
 981                                live_kit.screen_track = ScreenTrack::None;
 982                                cx.notify();
 983                                Err(error)
 984                            }
 985                        }
 986                    }
 987                })
 988        })
 989    }
 990
 991    pub fn unshare_screen(&mut self, cx: &mut ModelContext<Self>) -> Result<()> {
 992        if self.status.is_offline() {
 993            return Err(anyhow!("room is offline"));
 994        }
 995
 996        let live_kit = self
 997            .live_kit
 998            .as_mut()
 999            .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
1000        match mem::take(&mut live_kit.screen_track) {
1001            ScreenTrack::None => Err(anyhow!("screen was not shared")),
1002            ScreenTrack::Pending { .. } => {
1003                cx.notify();
1004                Ok(())
1005            }
1006            ScreenTrack::Published(track) => {
1007                live_kit.room.unpublish_track(track);
1008                cx.notify();
1009                Ok(())
1010            }
1011        }
1012    }
1013
1014    #[cfg(any(test, feature = "test-support"))]
1015    pub fn set_display_sources(&self, sources: Vec<live_kit_client::MacOSDisplay>) {
1016        self.live_kit
1017            .as_ref()
1018            .unwrap()
1019            .room
1020            .set_display_sources(sources);
1021    }
1022}
1023
1024struct LiveKitRoom {
1025    room: Arc<live_kit_client::Room>,
1026    screen_track: ScreenTrack,
1027    next_publish_id: usize,
1028    _maintain_room: Task<()>,
1029    _maintain_tracks: Task<()>,
1030}
1031
1032enum ScreenTrack {
1033    None,
1034    Pending { publish_id: usize },
1035    Published(LocalTrackPublication),
1036}
1037
1038impl Default for ScreenTrack {
1039    fn default() -> Self {
1040        Self::None
1041    }
1042}
1043
1044#[derive(Copy, Clone, PartialEq, Eq)]
1045pub enum RoomStatus {
1046    Online,
1047    Rejoining,
1048    Offline,
1049}
1050
1051impl RoomStatus {
1052    pub fn is_offline(&self) -> bool {
1053        matches!(self, RoomStatus::Offline)
1054    }
1055
1056    pub fn is_online(&self) -> bool {
1057        matches!(self, RoomStatus::Online)
1058    }
1059}