room.rs

   1use crate::{
   2    participant::{LocalParticipant, ParticipantLocation, RemoteParticipant, RemoteVideoTrack},
   3    IncomingCall,
   4};
   5use anyhow::{anyhow, Result};
   6use client::{
   7    proto::{self, PeerId},
   8    Client, TypedEnvelope, User, UserStore,
   9};
  10use collections::{BTreeMap, HashMap, HashSet};
  11use fs::Fs;
  12use futures::{FutureExt, StreamExt};
  13use gpui::{AppContext, AsyncAppContext, Entity, ModelContext, ModelHandle, Task, WeakModelHandle};
  14use language::LanguageRegistry;
  15use live_kit_client::{
  16    LocalAudioTrack, LocalTrackPublication, LocalVideoTrack, RemoteAudioTrackUpdate,
  17    RemoteVideoTrackUpdate,
  18};
  19use postage::stream::Stream;
  20use project::Project;
  21use std::{future::Future, mem, pin::Pin, sync::Arc, time::Duration};
  22use util::{post_inc, ResultExt, TryFutureExt};
  23
  24pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(30);
  25
  26#[derive(Clone, Debug, PartialEq, Eq)]
  27pub enum Event {
  28    ParticipantLocationChanged {
  29        participant_id: proto::PeerId,
  30    },
  31    RemoteVideoTracksChanged {
  32        participant_id: proto::PeerId,
  33    },
  34    RemoteAudioTracksChanged {
  35        participant_id: proto::PeerId,
  36    },
  37    RemoteProjectShared {
  38        owner: Arc<User>,
  39        project_id: u64,
  40        worktree_root_names: Vec<String>,
  41    },
  42    RemoteProjectUnshared {
  43        project_id: u64,
  44    },
  45    Left,
  46}
  47
  48pub struct Room {
  49    id: u64,
  50    live_kit: Option<LiveKitRoom>,
  51    status: RoomStatus,
  52    shared_projects: HashSet<WeakModelHandle<Project>>,
  53    joined_projects: HashSet<WeakModelHandle<Project>>,
  54    local_participant: LocalParticipant,
  55    remote_participants: BTreeMap<u64, RemoteParticipant>,
  56    pending_participants: Vec<Arc<User>>,
  57    participant_user_ids: HashSet<u64>,
  58    pending_call_count: usize,
  59    leave_when_empty: bool,
  60    client: Arc<Client>,
  61    user_store: ModelHandle<UserStore>,
  62    follows_by_leader_id_project_id: HashMap<(PeerId, u64), Vec<PeerId>>,
  63    subscriptions: Vec<client::Subscription>,
  64    pending_room_update: Option<Task<()>>,
  65    maintain_connection: Option<Task<Option<()>>>,
  66}
  67
  68impl Entity for Room {
  69    type Event = Event;
  70
  71    fn release(&mut self, cx: &mut AppContext) {
  72        if self.status.is_online() {
  73            self.leave_internal(cx).detach_and_log_err(cx);
  74        }
  75    }
  76
  77    fn app_will_quit(&mut self, cx: &mut AppContext) -> Option<Pin<Box<dyn Future<Output = ()>>>> {
  78        if self.status.is_online() {
  79            let leave = self.leave_internal(cx);
  80            Some(
  81                cx.background()
  82                    .spawn(async move {
  83                        leave.await.log_err();
  84                    })
  85                    .boxed(),
  86            )
  87        } else {
  88            None
  89        }
  90    }
  91}
  92
  93impl Room {
  94    fn new(
  95        id: u64,
  96        live_kit_connection_info: Option<proto::LiveKitConnectionInfo>,
  97        client: Arc<Client>,
  98        user_store: ModelHandle<UserStore>,
  99        cx: &mut ModelContext<Self>,
 100    ) -> Self {
 101        let live_kit_room = if let Some(connection_info) = live_kit_connection_info {
 102            let room = live_kit_client::Room::new();
 103            let mut status = room.status();
 104            // Consume the initial status of the room.
 105            let _ = status.try_recv();
 106            let _maintain_room = cx.spawn_weak(|this, mut cx| async move {
 107                while let Some(status) = status.next().await {
 108                    let this = if let Some(this) = this.upgrade(&cx) {
 109                        this
 110                    } else {
 111                        break;
 112                    };
 113
 114                    if status == live_kit_client::ConnectionState::Disconnected {
 115                        this.update(&mut cx, |this, cx| this.leave(cx).log_err());
 116                        break;
 117                    }
 118                }
 119            });
 120
 121            let mut track_video_changes = room.remote_video_track_updates();
 122            let _maintain_video_tracks = cx.spawn_weak(|this, mut cx| async move {
 123                while let Some(track_change) = track_video_changes.next().await {
 124                    let this = if let Some(this) = this.upgrade(&cx) {
 125                        this
 126                    } else {
 127                        break;
 128                    };
 129
 130                    this.update(&mut cx, |this, cx| {
 131                        this.remote_video_track_updated(track_change, cx).log_err()
 132                    });
 133                }
 134            });
 135
 136            let mut track_audio_changes = room.remote_audio_track_updates();
 137            let _maintain_audio_tracks = cx.spawn_weak(|this, mut cx| async move {
 138                while let Some(track_change) = track_audio_changes.next().await {
 139                    let this = if let Some(this) = this.upgrade(&cx) {
 140                        this
 141                    } else {
 142                        break;
 143                    };
 144
 145                    this.update(&mut cx, |this, cx| {
 146                        this.remote_audio_track_updated(track_change, cx).log_err()
 147                    });
 148                }
 149            });
 150
 151            let connect = room.connect(&connection_info.server_url, &connection_info.token);
 152            cx.spawn(|this, mut cx| async move {
 153                connect.await?;
 154                this.update(&mut cx, |this, cx| this.share_microphone(cx))
 155                    .await?;
 156
 157                anyhow::Ok(())
 158            })
 159            .detach_and_log_err(cx);
 160
 161            Some(LiveKitRoom {
 162                room,
 163                screen_track: LocalTrack::None,
 164                microphone_track: LocalTrack::None,
 165                next_publish_id: 0,
 166                deafened: false,
 167                _maintain_room,
 168                _maintain_tracks: [_maintain_video_tracks, _maintain_audio_tracks],
 169            })
 170        } else {
 171            None
 172        };
 173
 174        let maintain_connection =
 175            cx.spawn_weak(|this, cx| Self::maintain_connection(this, client.clone(), cx).log_err());
 176
 177        Self {
 178            id,
 179            live_kit: live_kit_room,
 180            status: RoomStatus::Online,
 181            shared_projects: Default::default(),
 182            joined_projects: Default::default(),
 183            participant_user_ids: Default::default(),
 184            local_participant: Default::default(),
 185            remote_participants: Default::default(),
 186            pending_participants: Default::default(),
 187            pending_call_count: 0,
 188            subscriptions: vec![client.add_message_handler(cx.handle(), Self::handle_room_updated)],
 189            leave_when_empty: false,
 190            pending_room_update: None,
 191            client,
 192            user_store,
 193            follows_by_leader_id_project_id: Default::default(),
 194            maintain_connection: Some(maintain_connection),
 195        }
 196    }
 197
 198    pub(crate) fn create(
 199        called_user_id: u64,
 200        initial_project: Option<ModelHandle<Project>>,
 201        client: Arc<Client>,
 202        user_store: ModelHandle<UserStore>,
 203        cx: &mut AppContext,
 204    ) -> Task<Result<ModelHandle<Self>>> {
 205        cx.spawn(|mut cx| async move {
 206            let response = client.request(proto::CreateRoom {}).await?;
 207            let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
 208            let room = cx.add_model(|cx| {
 209                Self::new(
 210                    room_proto.id,
 211                    response.live_kit_connection_info,
 212                    client,
 213                    user_store,
 214                    cx,
 215                )
 216            });
 217
 218            let initial_project_id = if let Some(initial_project) = initial_project {
 219                let initial_project_id = room
 220                    .update(&mut cx, |room, cx| {
 221                        room.share_project(initial_project.clone(), cx)
 222                    })
 223                    .await?;
 224                Some(initial_project_id)
 225            } else {
 226                None
 227            };
 228
 229            match room
 230                .update(&mut cx, |room, cx| {
 231                    room.leave_when_empty = true;
 232                    room.call(called_user_id, initial_project_id, cx)
 233                })
 234                .await
 235            {
 236                Ok(()) => Ok(room),
 237                Err(error) => Err(anyhow!("room creation failed: {:?}", error)),
 238            }
 239        })
 240    }
 241
 242    pub(crate) fn join(
 243        call: &IncomingCall,
 244        client: Arc<Client>,
 245        user_store: ModelHandle<UserStore>,
 246        cx: &mut AppContext,
 247    ) -> Task<Result<ModelHandle<Self>>> {
 248        let room_id = call.room_id;
 249        cx.spawn(|mut cx| async move {
 250            let response = client.request(proto::JoinRoom { id: room_id }).await?;
 251            let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
 252            let room = cx.add_model(|cx| {
 253                Self::new(
 254                    room_id,
 255                    response.live_kit_connection_info,
 256                    client,
 257                    user_store,
 258                    cx,
 259                )
 260            });
 261            room.update(&mut cx, |room, cx| {
 262                room.leave_when_empty = true;
 263                room.apply_room_update(room_proto, cx)?;
 264                anyhow::Ok(())
 265            })?;
 266            Ok(room)
 267        })
 268    }
 269
 270    fn should_leave(&self) -> bool {
 271        self.leave_when_empty
 272            && self.pending_room_update.is_none()
 273            && self.pending_participants.is_empty()
 274            && self.remote_participants.is_empty()
 275            && self.pending_call_count == 0
 276    }
 277
 278    pub(crate) fn leave(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
 279        cx.notify();
 280        cx.emit(Event::Left);
 281        self.leave_internal(cx)
 282    }
 283
 284    fn leave_internal(&mut self, cx: &mut AppContext) -> Task<Result<()>> {
 285        if self.status.is_offline() {
 286            return Task::ready(Err(anyhow!("room is offline")));
 287        }
 288
 289        log::info!("leaving room");
 290
 291        for project in self.shared_projects.drain() {
 292            if let Some(project) = project.upgrade(cx) {
 293                project.update(cx, |project, cx| {
 294                    project.unshare(cx).log_err();
 295                });
 296            }
 297        }
 298        for project in self.joined_projects.drain() {
 299            if let Some(project) = project.upgrade(cx) {
 300                project.update(cx, |project, cx| {
 301                    project.disconnected_from_host(cx);
 302                    project.close(cx);
 303                });
 304            }
 305        }
 306
 307        self.status = RoomStatus::Offline;
 308        self.remote_participants.clear();
 309        self.pending_participants.clear();
 310        self.participant_user_ids.clear();
 311        self.subscriptions.clear();
 312        self.live_kit.take();
 313        self.pending_room_update.take();
 314        self.maintain_connection.take();
 315
 316        let leave_room = self.client.request(proto::LeaveRoom {});
 317        cx.background().spawn(async move {
 318            leave_room.await?;
 319            anyhow::Ok(())
 320        })
 321    }
 322
 323    async fn maintain_connection(
 324        this: WeakModelHandle<Self>,
 325        client: Arc<Client>,
 326        mut cx: AsyncAppContext,
 327    ) -> Result<()> {
 328        let mut client_status = client.status();
 329        loop {
 330            let _ = client_status.try_recv();
 331            let is_connected = client_status.borrow().is_connected();
 332            // Even if we're initially connected, any future change of the status means we momentarily disconnected.
 333            if !is_connected || client_status.next().await.is_some() {
 334                log::info!("detected client disconnection");
 335
 336                this.upgrade(&cx)
 337                    .ok_or_else(|| anyhow!("room was dropped"))?
 338                    .update(&mut cx, |this, cx| {
 339                        this.status = RoomStatus::Rejoining;
 340                        cx.notify();
 341                    });
 342
 343                // Wait for client to re-establish a connection to the server.
 344                {
 345                    let mut reconnection_timeout = cx.background().timer(RECONNECT_TIMEOUT).fuse();
 346                    let client_reconnection = async {
 347                        let mut remaining_attempts = 3;
 348                        while remaining_attempts > 0 {
 349                            if client_status.borrow().is_connected() {
 350                                log::info!("client reconnected, attempting to rejoin room");
 351
 352                                let Some(this) = this.upgrade(&cx) else { break };
 353                                if this
 354                                    .update(&mut cx, |this, cx| this.rejoin(cx))
 355                                    .await
 356                                    .log_err()
 357                                    .is_some()
 358                                {
 359                                    return true;
 360                                } else {
 361                                    remaining_attempts -= 1;
 362                                }
 363                            } else if client_status.borrow().is_signed_out() {
 364                                return false;
 365                            }
 366
 367                            log::info!(
 368                                "waiting for client status change, remaining attempts {}",
 369                                remaining_attempts
 370                            );
 371                            client_status.next().await;
 372                        }
 373                        false
 374                    }
 375                    .fuse();
 376                    futures::pin_mut!(client_reconnection);
 377
 378                    futures::select_biased! {
 379                        reconnected = client_reconnection => {
 380                            if reconnected {
 381                                log::info!("successfully reconnected to room");
 382                                // If we successfully joined the room, go back around the loop
 383                                // waiting for future connection status changes.
 384                                continue;
 385                            }
 386                        }
 387                        _ = reconnection_timeout => {
 388                            log::info!("room reconnection timeout expired");
 389                        }
 390                    }
 391                }
 392
 393                break;
 394            }
 395        }
 396
 397        // The client failed to re-establish a connection to the server
 398        // or an error occurred while trying to re-join the room. Either way
 399        // we leave the room and return an error.
 400        if let Some(this) = this.upgrade(&cx) {
 401            log::info!("reconnection failed, leaving room");
 402            let _ = this.update(&mut cx, |this, cx| this.leave(cx));
 403        }
 404        Err(anyhow!(
 405            "can't reconnect to room: client failed to re-establish connection"
 406        ))
 407    }
 408
 409    fn rejoin(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
 410        let mut projects = HashMap::default();
 411        let mut reshared_projects = Vec::new();
 412        let mut rejoined_projects = Vec::new();
 413        self.shared_projects.retain(|project| {
 414            if let Some(handle) = project.upgrade(cx) {
 415                let project = handle.read(cx);
 416                if let Some(project_id) = project.remote_id() {
 417                    projects.insert(project_id, handle.clone());
 418                    reshared_projects.push(proto::UpdateProject {
 419                        project_id,
 420                        worktrees: project.worktree_metadata_protos(cx),
 421                    });
 422                    return true;
 423                }
 424            }
 425            false
 426        });
 427        self.joined_projects.retain(|project| {
 428            if let Some(handle) = project.upgrade(cx) {
 429                let project = handle.read(cx);
 430                if let Some(project_id) = project.remote_id() {
 431                    projects.insert(project_id, handle.clone());
 432                    rejoined_projects.push(proto::RejoinProject {
 433                        id: project_id,
 434                        worktrees: project
 435                            .worktrees(cx)
 436                            .map(|worktree| {
 437                                let worktree = worktree.read(cx);
 438                                proto::RejoinWorktree {
 439                                    id: worktree.id().to_proto(),
 440                                    scan_id: worktree.completed_scan_id() as u64,
 441                                }
 442                            })
 443                            .collect(),
 444                    });
 445                }
 446                return true;
 447            }
 448            false
 449        });
 450
 451        let response = self.client.request_envelope(proto::RejoinRoom {
 452            id: self.id,
 453            reshared_projects,
 454            rejoined_projects,
 455        });
 456
 457        cx.spawn(|this, mut cx| async move {
 458            let response = response.await?;
 459            let message_id = response.message_id;
 460            let response = response.payload;
 461            let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
 462            this.update(&mut cx, |this, cx| {
 463                this.status = RoomStatus::Online;
 464                this.apply_room_update(room_proto, cx)?;
 465
 466                for reshared_project in response.reshared_projects {
 467                    if let Some(project) = projects.get(&reshared_project.id) {
 468                        project.update(cx, |project, cx| {
 469                            project.reshared(reshared_project, cx).log_err();
 470                        });
 471                    }
 472                }
 473
 474                for rejoined_project in response.rejoined_projects {
 475                    if let Some(project) = projects.get(&rejoined_project.id) {
 476                        project.update(cx, |project, cx| {
 477                            project.rejoined(rejoined_project, message_id, cx).log_err();
 478                        });
 479                    }
 480                }
 481
 482                anyhow::Ok(())
 483            })
 484        })
 485    }
 486
 487    pub fn id(&self) -> u64 {
 488        self.id
 489    }
 490
 491    pub fn status(&self) -> RoomStatus {
 492        self.status
 493    }
 494
 495    pub fn local_participant(&self) -> &LocalParticipant {
 496        &self.local_participant
 497    }
 498
 499    pub fn remote_participants(&self) -> &BTreeMap<u64, RemoteParticipant> {
 500        &self.remote_participants
 501    }
 502
 503    pub fn remote_participant_for_peer_id(&self, peer_id: PeerId) -> Option<&RemoteParticipant> {
 504        self.remote_participants
 505            .values()
 506            .find(|p| p.peer_id == peer_id)
 507    }
 508
 509    pub fn pending_participants(&self) -> &[Arc<User>] {
 510        &self.pending_participants
 511    }
 512
 513    pub fn contains_participant(&self, user_id: u64) -> bool {
 514        self.participant_user_ids.contains(&user_id)
 515    }
 516
 517    pub fn followers_for(&self, leader_id: PeerId, project_id: u64) -> &[PeerId] {
 518        self.follows_by_leader_id_project_id
 519            .get(&(leader_id, project_id))
 520            .map_or(&[], |v| v.as_slice())
 521    }
 522
 523    async fn handle_room_updated(
 524        this: ModelHandle<Self>,
 525        envelope: TypedEnvelope<proto::RoomUpdated>,
 526        _: Arc<Client>,
 527        mut cx: AsyncAppContext,
 528    ) -> Result<()> {
 529        let room = envelope
 530            .payload
 531            .room
 532            .ok_or_else(|| anyhow!("invalid room"))?;
 533        this.update(&mut cx, |this, cx| this.apply_room_update(room, cx))
 534    }
 535
 536    fn apply_room_update(
 537        &mut self,
 538        mut room: proto::Room,
 539        cx: &mut ModelContext<Self>,
 540    ) -> Result<()> {
 541        // Filter ourselves out from the room's participants.
 542        let local_participant_ix = room
 543            .participants
 544            .iter()
 545            .position(|participant| Some(participant.user_id) == self.client.user_id());
 546        let local_participant = local_participant_ix.map(|ix| room.participants.swap_remove(ix));
 547
 548        let pending_participant_user_ids = room
 549            .pending_participants
 550            .iter()
 551            .map(|p| p.user_id)
 552            .collect::<Vec<_>>();
 553
 554        let remote_participant_user_ids = room
 555            .participants
 556            .iter()
 557            .map(|p| p.user_id)
 558            .collect::<Vec<_>>();
 559
 560        let (remote_participants, pending_participants) =
 561            self.user_store.update(cx, move |user_store, cx| {
 562                (
 563                    user_store.get_users(remote_participant_user_ids, cx),
 564                    user_store.get_users(pending_participant_user_ids, cx),
 565                )
 566            });
 567
 568        self.pending_room_update = Some(cx.spawn(|this, mut cx| async move {
 569            let (remote_participants, pending_participants) =
 570                futures::join!(remote_participants, pending_participants);
 571
 572            this.update(&mut cx, |this, cx| {
 573                this.participant_user_ids.clear();
 574
 575                if let Some(participant) = local_participant {
 576                    this.local_participant.projects = participant.projects;
 577                } else {
 578                    this.local_participant.projects.clear();
 579                }
 580
 581                if let Some(participants) = remote_participants.log_err() {
 582                    for (participant, user) in room.participants.into_iter().zip(participants) {
 583                        let Some(peer_id) = participant.peer_id else { continue };
 584                        this.participant_user_ids.insert(participant.user_id);
 585
 586                        let old_projects = this
 587                            .remote_participants
 588                            .get(&participant.user_id)
 589                            .into_iter()
 590                            .flat_map(|existing| &existing.projects)
 591                            .map(|project| project.id)
 592                            .collect::<HashSet<_>>();
 593                        let new_projects = participant
 594                            .projects
 595                            .iter()
 596                            .map(|project| project.id)
 597                            .collect::<HashSet<_>>();
 598
 599                        for project in &participant.projects {
 600                            if !old_projects.contains(&project.id) {
 601                                cx.emit(Event::RemoteProjectShared {
 602                                    owner: user.clone(),
 603                                    project_id: project.id,
 604                                    worktree_root_names: project.worktree_root_names.clone(),
 605                                });
 606                            }
 607                        }
 608
 609                        for unshared_project_id in old_projects.difference(&new_projects) {
 610                            this.joined_projects.retain(|project| {
 611                                if let Some(project) = project.upgrade(cx) {
 612                                    project.update(cx, |project, cx| {
 613                                        if project.remote_id() == Some(*unshared_project_id) {
 614                                            project.disconnected_from_host(cx);
 615                                            false
 616                                        } else {
 617                                            true
 618                                        }
 619                                    })
 620                                } else {
 621                                    false
 622                                }
 623                            });
 624                            cx.emit(Event::RemoteProjectUnshared {
 625                                project_id: *unshared_project_id,
 626                            });
 627                        }
 628
 629                        let location = ParticipantLocation::from_proto(participant.location)
 630                            .unwrap_or(ParticipantLocation::External);
 631                        if let Some(remote_participant) =
 632                            this.remote_participants.get_mut(&participant.user_id)
 633                        {
 634                            remote_participant.projects = participant.projects;
 635                            remote_participant.peer_id = peer_id;
 636                            if location != remote_participant.location {
 637                                remote_participant.location = location;
 638                                cx.emit(Event::ParticipantLocationChanged {
 639                                    participant_id: peer_id,
 640                                });
 641                            }
 642                        } else {
 643                            this.remote_participants.insert(
 644                                participant.user_id,
 645                                RemoteParticipant {
 646                                    user: user.clone(),
 647                                    peer_id,
 648                                    projects: participant.projects,
 649                                    location,
 650                                    video_tracks: Default::default(),
 651                                    audio_tracks: Default::default(),
 652                                },
 653                            );
 654
 655                            if let Some(live_kit) = this.live_kit.as_ref() {
 656                                let video_tracks =
 657                                    live_kit.room.remote_video_tracks(&user.id.to_string());
 658                                let audio_tracks =
 659                                    live_kit.room.remote_audio_tracks(&user.id.to_string());
 660                                for track in video_tracks {
 661                                    this.remote_video_track_updated(
 662                                        RemoteVideoTrackUpdate::Subscribed(track),
 663                                        cx,
 664                                    )
 665                                    .log_err();
 666                                }
 667                                for track in audio_tracks {
 668                                    this.remote_audio_track_updated(
 669                                        RemoteAudioTrackUpdate::Subscribed(track),
 670                                        cx,
 671                                    )
 672                                    .log_err();
 673                                }
 674                            }
 675                        }
 676                    }
 677
 678                    this.remote_participants.retain(|user_id, participant| {
 679                        if this.participant_user_ids.contains(user_id) {
 680                            true
 681                        } else {
 682                            for project in &participant.projects {
 683                                cx.emit(Event::RemoteProjectUnshared {
 684                                    project_id: project.id,
 685                                });
 686                            }
 687                            false
 688                        }
 689                    });
 690                }
 691
 692                if let Some(pending_participants) = pending_participants.log_err() {
 693                    this.pending_participants = pending_participants;
 694                    for participant in &this.pending_participants {
 695                        this.participant_user_ids.insert(participant.id);
 696                    }
 697                }
 698
 699                this.follows_by_leader_id_project_id.clear();
 700                for follower in room.followers {
 701                    let project_id = follower.project_id;
 702                    let (leader, follower) = match (follower.leader_id, follower.follower_id) {
 703                        (Some(leader), Some(follower)) => (leader, follower),
 704
 705                        _ => {
 706                            log::error!("Follower message {follower:?} missing some state");
 707                            continue;
 708                        }
 709                    };
 710
 711                    let list = this
 712                        .follows_by_leader_id_project_id
 713                        .entry((leader, project_id))
 714                        .or_insert(Vec::new());
 715                    if !list.contains(&follower) {
 716                        list.push(follower);
 717                    }
 718                }
 719
 720                this.pending_room_update.take();
 721                if this.should_leave() {
 722                    log::info!("room is empty, leaving");
 723                    let _ = this.leave(cx);
 724                }
 725
 726                this.check_invariants();
 727                cx.notify();
 728            });
 729        }));
 730
 731        cx.notify();
 732        Ok(())
 733    }
 734
 735    fn remote_video_track_updated(
 736        &mut self,
 737        change: RemoteVideoTrackUpdate,
 738        cx: &mut ModelContext<Self>,
 739    ) -> Result<()> {
 740        match change {
 741            RemoteVideoTrackUpdate::Subscribed(track) => {
 742                let user_id = track.publisher_id().parse()?;
 743                let track_id = track.sid().to_string();
 744                let participant = self
 745                    .remote_participants
 746                    .get_mut(&user_id)
 747                    .ok_or_else(|| anyhow!("subscribed to track by unknown participant"))?;
 748                participant.video_tracks.insert(
 749                    track_id.clone(),
 750                    Arc::new(RemoteVideoTrack {
 751                        live_kit_track: track,
 752                    }),
 753                );
 754                cx.emit(Event::RemoteVideoTracksChanged {
 755                    participant_id: participant.peer_id,
 756                });
 757            }
 758            RemoteVideoTrackUpdate::Unsubscribed {
 759                publisher_id,
 760                track_id,
 761            } => {
 762                let user_id = publisher_id.parse()?;
 763                let participant = self
 764                    .remote_participants
 765                    .get_mut(&user_id)
 766                    .ok_or_else(|| anyhow!("unsubscribed from track by unknown participant"))?;
 767                participant.video_tracks.remove(&track_id);
 768                cx.emit(Event::RemoteVideoTracksChanged {
 769                    participant_id: participant.peer_id,
 770                });
 771            }
 772        }
 773
 774        cx.notify();
 775        Ok(())
 776    }
 777
 778    fn remote_audio_track_updated(
 779        &mut self,
 780        change: RemoteAudioTrackUpdate,
 781        cx: &mut ModelContext<Self>,
 782    ) -> Result<()> {
 783        match change {
 784            RemoteAudioTrackUpdate::Subscribed(track) => {
 785                let user_id = track.publisher_id().parse()?;
 786                let track_id = track.sid().to_string();
 787                let participant = self
 788                    .remote_participants
 789                    .get_mut(&user_id)
 790                    .ok_or_else(|| anyhow!("subscribed to track by unknown participant"))?;
 791                participant.audio_tracks.insert(track_id.clone(), track);
 792                cx.emit(Event::RemoteAudioTracksChanged {
 793                    participant_id: participant.peer_id,
 794                });
 795            }
 796            RemoteAudioTrackUpdate::Unsubscribed {
 797                publisher_id,
 798                track_id,
 799            } => {
 800                let user_id = publisher_id.parse()?;
 801                let participant = self
 802                    .remote_participants
 803                    .get_mut(&user_id)
 804                    .ok_or_else(|| anyhow!("unsubscribed from track by unknown participant"))?;
 805                participant.audio_tracks.remove(&track_id);
 806                cx.emit(Event::RemoteAudioTracksChanged {
 807                    participant_id: participant.peer_id,
 808                });
 809            }
 810        }
 811
 812        cx.notify();
 813        Ok(())
 814    }
 815
 816    fn check_invariants(&self) {
 817        #[cfg(any(test, feature = "test-support"))]
 818        {
 819            for participant in self.remote_participants.values() {
 820                assert!(self.participant_user_ids.contains(&participant.user.id));
 821                assert_ne!(participant.user.id, self.client.user_id().unwrap());
 822            }
 823
 824            for participant in &self.pending_participants {
 825                assert!(self.participant_user_ids.contains(&participant.id));
 826                assert_ne!(participant.id, self.client.user_id().unwrap());
 827            }
 828
 829            assert_eq!(
 830                self.participant_user_ids.len(),
 831                self.remote_participants.len() + self.pending_participants.len()
 832            );
 833        }
 834    }
 835
 836    pub(crate) fn call(
 837        &mut self,
 838        called_user_id: u64,
 839        initial_project_id: Option<u64>,
 840        cx: &mut ModelContext<Self>,
 841    ) -> Task<Result<()>> {
 842        if self.status.is_offline() {
 843            return Task::ready(Err(anyhow!("room is offline")));
 844        }
 845
 846        cx.notify();
 847        let client = self.client.clone();
 848        let room_id = self.id;
 849        self.pending_call_count += 1;
 850        cx.spawn(|this, mut cx| async move {
 851            let result = client
 852                .request(proto::Call {
 853                    room_id,
 854                    called_user_id,
 855                    initial_project_id,
 856                })
 857                .await;
 858            this.update(&mut cx, |this, cx| {
 859                this.pending_call_count -= 1;
 860                if this.should_leave() {
 861                    this.leave(cx).detach_and_log_err(cx);
 862                }
 863            });
 864            result?;
 865            Ok(())
 866        })
 867    }
 868
 869    pub fn join_project(
 870        &mut self,
 871        id: u64,
 872        language_registry: Arc<LanguageRegistry>,
 873        fs: Arc<dyn Fs>,
 874        cx: &mut ModelContext<Self>,
 875    ) -> Task<Result<ModelHandle<Project>>> {
 876        let client = self.client.clone();
 877        let user_store = self.user_store.clone();
 878        cx.spawn(|this, mut cx| async move {
 879            let project =
 880                Project::remote(id, client, user_store, language_registry, fs, cx.clone()).await?;
 881            this.update(&mut cx, |this, cx| {
 882                this.joined_projects.retain(|project| {
 883                    if let Some(project) = project.upgrade(cx) {
 884                        !project.read(cx).is_read_only()
 885                    } else {
 886                        false
 887                    }
 888                });
 889                this.joined_projects.insert(project.downgrade());
 890            });
 891            Ok(project)
 892        })
 893    }
 894
 895    pub(crate) fn share_project(
 896        &mut self,
 897        project: ModelHandle<Project>,
 898        cx: &mut ModelContext<Self>,
 899    ) -> Task<Result<u64>> {
 900        if let Some(project_id) = project.read(cx).remote_id() {
 901            return Task::ready(Ok(project_id));
 902        }
 903
 904        let request = self.client.request(proto::ShareProject {
 905            room_id: self.id(),
 906            worktrees: project.read(cx).worktree_metadata_protos(cx),
 907        });
 908        cx.spawn(|this, mut cx| async move {
 909            let response = request.await?;
 910
 911            project.update(&mut cx, |project, cx| {
 912                project.shared(response.project_id, cx)
 913            })?;
 914
 915            // If the user's location is in this project, it changes from UnsharedProject to SharedProject.
 916            this.update(&mut cx, |this, cx| {
 917                this.shared_projects.insert(project.downgrade());
 918                let active_project = this.local_participant.active_project.as_ref();
 919                if active_project.map_or(false, |location| *location == project) {
 920                    this.set_location(Some(&project), cx)
 921                } else {
 922                    Task::ready(Ok(()))
 923                }
 924            })
 925            .await?;
 926
 927            Ok(response.project_id)
 928        })
 929    }
 930
 931    pub(crate) fn unshare_project(
 932        &mut self,
 933        project: ModelHandle<Project>,
 934        cx: &mut ModelContext<Self>,
 935    ) -> Result<()> {
 936        let project_id = match project.read(cx).remote_id() {
 937            Some(project_id) => project_id,
 938            None => return Ok(()),
 939        };
 940
 941        self.client.send(proto::UnshareProject { project_id })?;
 942        project.update(cx, |this, cx| this.unshare(cx))
 943    }
 944
 945    pub(crate) fn set_location(
 946        &mut self,
 947        project: Option<&ModelHandle<Project>>,
 948        cx: &mut ModelContext<Self>,
 949    ) -> Task<Result<()>> {
 950        if self.status.is_offline() {
 951            return Task::ready(Err(anyhow!("room is offline")));
 952        }
 953
 954        let client = self.client.clone();
 955        let room_id = self.id;
 956        let location = if let Some(project) = project {
 957            self.local_participant.active_project = Some(project.downgrade());
 958            if let Some(project_id) = project.read(cx).remote_id() {
 959                proto::participant_location::Variant::SharedProject(
 960                    proto::participant_location::SharedProject { id: project_id },
 961                )
 962            } else {
 963                proto::participant_location::Variant::UnsharedProject(
 964                    proto::participant_location::UnsharedProject {},
 965                )
 966            }
 967        } else {
 968            self.local_participant.active_project = None;
 969            proto::participant_location::Variant::External(proto::participant_location::External {})
 970        };
 971
 972        cx.notify();
 973        cx.foreground().spawn(async move {
 974            client
 975                .request(proto::UpdateParticipantLocation {
 976                    room_id,
 977                    location: Some(proto::ParticipantLocation {
 978                        variant: Some(location),
 979                    }),
 980                })
 981                .await?;
 982            Ok(())
 983        })
 984    }
 985
 986    pub fn is_screen_sharing(&self) -> bool {
 987        self.live_kit.as_ref().map_or(false, |live_kit| {
 988            !matches!(live_kit.screen_track, LocalTrack::None)
 989        })
 990    }
 991
 992    pub fn is_sharing_mic(&self) -> bool {
 993        self.live_kit.as_ref().map_or(false, |live_kit| {
 994            !matches!(live_kit.microphone_track, LocalTrack::None)
 995        })
 996    }
 997
 998    pub fn is_muted(&self) -> Option<bool> {
 999        self.live_kit
1000            .as_ref()
1001            .and_then(|live_kit| match &live_kit.microphone_track {
1002                LocalTrack::None => None,
1003                LocalTrack::Pending { muted, .. } => Some(*muted),
1004                LocalTrack::Published { muted, .. } => Some(*muted),
1005            })
1006    }
1007
1008    pub fn is_deafened(&self) -> Option<bool> {
1009        self.live_kit.as_ref().map(|live_kit| live_kit.deafened)
1010    }
1011
1012    pub fn share_microphone(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
1013        if self.status.is_offline() {
1014            return Task::ready(Err(anyhow!("room is offline")));
1015        } else if self.is_sharing_mic() {
1016            return Task::ready(Err(anyhow!("microphone was already shared")));
1017        }
1018
1019        let publish_id = if let Some(live_kit) = self.live_kit.as_mut() {
1020            let publish_id = post_inc(&mut live_kit.next_publish_id);
1021            live_kit.microphone_track = LocalTrack::Pending {
1022                publish_id,
1023                muted: false,
1024            };
1025            cx.notify();
1026            publish_id
1027        } else {
1028            return Task::ready(Err(anyhow!("live-kit was not initialized")));
1029        };
1030
1031        cx.spawn_weak(|this, mut cx| async move {
1032            let publish_track = async {
1033                let track = LocalAudioTrack::create();
1034                this.upgrade(&cx)
1035                    .ok_or_else(|| anyhow!("room was dropped"))?
1036                    .read_with(&cx, |this, _| {
1037                        this.live_kit
1038                            .as_ref()
1039                            .map(|live_kit| live_kit.room.publish_audio_track(&track))
1040                    })
1041                    .ok_or_else(|| anyhow!("live-kit was not initialized"))?
1042                    .await
1043            };
1044
1045            let publication = publish_track.await;
1046            this.upgrade(&cx)
1047                .ok_or_else(|| anyhow!("room was dropped"))?
1048                .update(&mut cx, |this, cx| {
1049                    let live_kit = this
1050                        .live_kit
1051                        .as_mut()
1052                        .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
1053
1054                    let (canceled, muted) = if let LocalTrack::Pending {
1055                        publish_id: cur_publish_id,
1056                        muted,
1057                    } = &live_kit.microphone_track
1058                    {
1059                        (*cur_publish_id != publish_id, *muted)
1060                    } else {
1061                        (true, false)
1062                    };
1063
1064                    match publication {
1065                        Ok(publication) => {
1066                            if canceled {
1067                                live_kit.room.unpublish_track(publication);
1068                            } else {
1069                                if muted {
1070                                    cx.background().spawn(publication.set_mute(muted)).detach();
1071                                }
1072                                live_kit.microphone_track = LocalTrack::Published {
1073                                    track_publication: publication,
1074                                    muted,
1075                                };
1076                                cx.notify();
1077                            }
1078                            Ok(())
1079                        }
1080                        Err(error) => {
1081                            if canceled {
1082                                Ok(())
1083                            } else {
1084                                live_kit.microphone_track = LocalTrack::None;
1085                                cx.notify();
1086                                Err(error)
1087                            }
1088                        }
1089                    }
1090                })
1091        })
1092    }
1093
1094    pub fn share_screen(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
1095        if self.status.is_offline() {
1096            return Task::ready(Err(anyhow!("room is offline")));
1097        } else if self.is_screen_sharing() {
1098            return Task::ready(Err(anyhow!("screen was already shared")));
1099        }
1100
1101        let (displays, publish_id) = if let Some(live_kit) = self.live_kit.as_mut() {
1102            let publish_id = post_inc(&mut live_kit.next_publish_id);
1103            live_kit.screen_track = LocalTrack::Pending {
1104                publish_id,
1105                muted: false,
1106            };
1107            cx.notify();
1108            (live_kit.room.display_sources(), publish_id)
1109        } else {
1110            return Task::ready(Err(anyhow!("live-kit was not initialized")));
1111        };
1112
1113        cx.spawn_weak(|this, mut cx| async move {
1114            let publish_track = async {
1115                let displays = displays.await?;
1116                let display = displays
1117                    .first()
1118                    .ok_or_else(|| anyhow!("no display found"))?;
1119                let track = LocalVideoTrack::screen_share_for_display(&display);
1120                this.upgrade(&cx)
1121                    .ok_or_else(|| anyhow!("room was dropped"))?
1122                    .read_with(&cx, |this, _| {
1123                        this.live_kit
1124                            .as_ref()
1125                            .map(|live_kit| live_kit.room.publish_video_track(&track))
1126                    })
1127                    .ok_or_else(|| anyhow!("live-kit was not initialized"))?
1128                    .await
1129            };
1130
1131            let publication = publish_track.await;
1132            this.upgrade(&cx)
1133                .ok_or_else(|| anyhow!("room was dropped"))?
1134                .update(&mut cx, |this, cx| {
1135                    let live_kit = this
1136                        .live_kit
1137                        .as_mut()
1138                        .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
1139
1140                    let (canceled, muted) = if let LocalTrack::Pending {
1141                        publish_id: cur_publish_id,
1142                        muted,
1143                    } = &live_kit.screen_track
1144                    {
1145                        (*cur_publish_id != publish_id, *muted)
1146                    } else {
1147                        (true, false)
1148                    };
1149
1150                    match publication {
1151                        Ok(publication) => {
1152                            if canceled {
1153                                live_kit.room.unpublish_track(publication);
1154                            } else {
1155                                if muted {
1156                                    cx.background().spawn(publication.set_mute(muted)).detach();
1157                                }
1158                                live_kit.screen_track = LocalTrack::Published {
1159                                    track_publication: publication,
1160                                    muted,
1161                                };
1162                                cx.notify();
1163                            }
1164                            Ok(())
1165                        }
1166                        Err(error) => {
1167                            if canceled {
1168                                Ok(())
1169                            } else {
1170                                live_kit.screen_track = LocalTrack::None;
1171                                cx.notify();
1172                                Err(error)
1173                            }
1174                        }
1175                    }
1176                })
1177        })
1178    }
1179
1180    pub fn toggle_mute(&mut self, cx: &mut ModelContext<Self>) -> Result<Task<Result<()>>> {
1181        if let Some(live_kit) = self.live_kit.as_mut() {
1182            match &mut live_kit.microphone_track {
1183                LocalTrack::None => Err(anyhow!("microphone was not shared")),
1184                LocalTrack::Pending { muted, .. } => {
1185                    *muted = !*muted;
1186                    Ok(Task::Ready(Some(Ok(()))))
1187                }
1188                LocalTrack::Published {
1189                    track_publication,
1190                    muted,
1191                } => {
1192                    *muted = !*muted;
1193
1194                    Ok(cx.background().spawn(track_publication.set_mute(*muted)))
1195                }
1196            }
1197        } else {
1198            Err(anyhow!("LiveKit not started"))
1199        }
1200    }
1201
1202    pub fn toggle_deafen(&mut self, cx: &mut ModelContext<Self>) -> Result<Task<Result<()>>> {
1203        if let Some(live_kit) = &mut self.live_kit {
1204            (*live_kit).deafened = !live_kit.deafened
1205        }
1206
1207        if let Some(live_kit) = &self.live_kit {
1208            let mut tasks = Vec::with_capacity(self.remote_participants.len());
1209
1210            for participant in self.remote_participants.values() {
1211                for track in live_kit
1212                    .room
1213                    .remote_audio_track_publications(&participant.user.id.to_string())
1214                {
1215                    tasks.push(cx.foreground().spawn(track.set_enabled(!live_kit.deafened)));
1216                }
1217            }
1218
1219            Ok(cx.foreground().spawn(async move {
1220                for task in tasks {
1221                    task.await?;
1222                }
1223                Ok(())
1224            }))
1225        } else {
1226            Err(anyhow!("LiveKit not started"))
1227        }
1228    }
1229
1230    pub fn unshare_screen(&mut self, cx: &mut ModelContext<Self>) -> Result<()> {
1231        if self.status.is_offline() {
1232            return Err(anyhow!("room is offline"));
1233        }
1234
1235        let live_kit = self
1236            .live_kit
1237            .as_mut()
1238            .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
1239        match mem::take(&mut live_kit.screen_track) {
1240            LocalTrack::None => Err(anyhow!("screen was not shared")),
1241            LocalTrack::Pending { .. } => {
1242                cx.notify();
1243                Ok(())
1244            }
1245            LocalTrack::Published {
1246                track_publication, ..
1247            } => {
1248                live_kit.room.unpublish_track(track_publication);
1249                cx.notify();
1250                Ok(())
1251            }
1252        }
1253    }
1254
1255    #[cfg(any(test, feature = "test-support"))]
1256    pub fn set_display_sources(&self, sources: Vec<live_kit_client::MacOSDisplay>) {
1257        self.live_kit
1258            .as_ref()
1259            .unwrap()
1260            .room
1261            .set_display_sources(sources);
1262    }
1263}
1264
1265struct LiveKitRoom {
1266    room: Arc<live_kit_client::Room>,
1267    screen_track: LocalTrack,
1268    microphone_track: LocalTrack,
1269    deafened: bool,
1270    next_publish_id: usize,
1271    _maintain_room: Task<()>,
1272    _maintain_tracks: [Task<()>; 2],
1273}
1274
1275enum LocalTrack {
1276    None,
1277    Pending {
1278        publish_id: usize,
1279        muted: bool,
1280    },
1281    Published {
1282        track_publication: LocalTrackPublication,
1283        muted: bool,
1284    },
1285}
1286
1287impl Default for LocalTrack {
1288    fn default() -> Self {
1289        Self::None
1290    }
1291}
1292
1293#[derive(Copy, Clone, PartialEq, Eq)]
1294pub enum RoomStatus {
1295    Online,
1296    Rejoining,
1297    Offline,
1298}
1299
1300impl RoomStatus {
1301    pub fn is_offline(&self) -> bool {
1302        matches!(self, RoomStatus::Offline)
1303    }
1304
1305    pub fn is_online(&self) -> bool {
1306        matches!(self, RoomStatus::Online)
1307    }
1308}