room.rs

   1use crate::{
   2    participant::{LocalParticipant, ParticipantLocation, RemoteParticipant, RemoteVideoTrack},
   3    IncomingCall,
   4};
   5use anyhow::{anyhow, Result};
   6use client::{
   7    proto::{self, PeerId},
   8    Client, TypedEnvelope, User, UserStore,
   9};
  10use collections::{BTreeMap, HashMap, HashSet};
  11use fs::Fs;
  12use futures::{FutureExt, StreamExt};
  13use gpui::{AppContext, AsyncAppContext, Entity, ModelContext, ModelHandle, Task, WeakModelHandle};
  14use language::LanguageRegistry;
  15use live_kit_client::{
  16    LocalAudioTrack, LocalTrackPublication, LocalVideoTrack, RemoteAudioTrackUpdate,
  17    RemoteVideoTrackUpdate,
  18};
  19use postage::stream::Stream;
  20use project::Project;
  21use std::{future::Future, mem, pin::Pin, sync::Arc, time::Duration};
  22use util::{post_inc, ResultExt, TryFutureExt};
  23
    /// Upper bound on how long `Room::maintain_connection` waits, after detecting a client
    /// disconnection, for the client to reconnect and rejoin the room before the room is left.
  24pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(30);
  25
  26#[derive(Clone, Debug, PartialEq, Eq)]
  27pub enum Event {
  28    ParticipantLocationChanged {
  29        participant_id: proto::PeerId,
  30    },
  31    RemoteVideoTracksChanged {
  32        participant_id: proto::PeerId,
  33    },
  34    RemoteAudioTracksChanged {
  35        participant_id: proto::PeerId,
  36    },
  37    RemoteProjectShared {
  38        owner: Arc<User>,
  39        project_id: u64,
  40        worktree_root_names: Vec<String>,
  41    },
  42    RemoteProjectUnshared {
  43        project_id: u64,
  44    },
  45    Left,
  46}
  47
    /// Client-side state of a call room: its participants, the projects shared in it, and the
    /// tasks and subscriptions that keep that state up to date.
  48pub struct Room {
        /// Id of the room, as passed to `Room::new`; sent to the server again when rejoining.
  49    id: u64,
        /// LiveKit state. `None` when no LiveKit connection info was supplied, and after leaving.
  50    live_kit: Option<LiveKitRoom>,
        /// `Online` on creation, `Rejoining` after a detected disconnection, `Offline` once left.
  51    status: RoomStatus,
        /// Weak handles to projects shared into this room; they are unshared when leaving.
  52    shared_projects: HashSet<WeakModelHandle<Project>>,
        /// Weak handles to remote projects joined through this room; they are disconnected from
        /// their host and closed when leaving.
  53    joined_projects: HashSet<WeakModelHandle<Project>>,
        /// Our own participant; its `projects` are refreshed on every room update.
  54    local_participant: LocalParticipant,
        /// Remote participants currently in the room, keyed by user id.
  55    remote_participants: BTreeMap<u64, RemoteParticipant>,
        /// Users listed as pending participants in the latest applied room update.
  56    pending_participants: Vec<Arc<User>>,
        /// User ids of the remote and pending participants from the latest applied room update.
  57    participant_user_ids: HashSet<u64>,
        /// Must be zero for `should_leave` to hold.
        /// NOTE(review): presumably the number of outgoing calls still in flight; it is updated
        /// outside the part of the file visible here.
  58    pending_call_count: usize,
        /// When set, the room is left as soon as `should_leave` finds nobody else in it.
  59    leave_when_empty: bool,
  60    client: Arc<Client>,
  61    user_store: ModelHandle<UserStore>,
        /// Followers for each `(leader peer id, project id)` pair, rebuilt on every room update.
  62    follows_by_leader_id_project_id: HashMap<(PeerId, u64), Vec<PeerId>>,
        /// Message-handler subscriptions (room updates); cleared when leaving.
  63    subscriptions: Vec<client::Subscription>,
        /// Task applying the most recent room update; `Some` only while that is in progress.
  64    pending_room_update: Option<Task<()>>,
        /// Task running `Self::maintain_connection`; taken when leaving.
  65    maintain_connection: Option<Task<Option<()>>>,
  66}
  67
  68impl Entity for Room {
  69    type Event = Event;
  70
  71    fn release(&mut self, cx: &mut AppContext) {
  72        if self.status.is_online() {
  73            self.leave_internal(cx).detach_and_log_err(cx);
  74        }
  75    }
  76
  77    fn app_will_quit(&mut self, cx: &mut AppContext) -> Option<Pin<Box<dyn Future<Output = ()>>>> {
  78        if self.status.is_online() {
  79            let leave = self.leave_internal(cx);
  80            Some(
  81                cx.background()
  82                    .spawn(async move {
  83                        leave.await.log_err();
  84                    })
  85                    .boxed(),
  86            )
  87        } else {
  88            None
  89        }
  90    }
  91}
  92
  93impl Room {
  94    fn new(
  95        id: u64,
  96        live_kit_connection_info: Option<proto::LiveKitConnectionInfo>,
  97        client: Arc<Client>,
  98        user_store: ModelHandle<UserStore>,
  99        cx: &mut ModelContext<Self>,
 100    ) -> Self {
 101        let live_kit_room = if let Some(connection_info) = live_kit_connection_info {
 102            let room = live_kit_client::Room::new();
 103            let mut status = room.status();
 104            // Consume the initial status of the room.
 105            let _ = status.try_recv();
 106            let _maintain_room = cx.spawn_weak(|this, mut cx| async move {
 107                while let Some(status) = status.next().await {
 108                    let this = if let Some(this) = this.upgrade(&cx) {
 109                        this
 110                    } else {
 111                        break;
 112                    };
 113
 114                    if status == live_kit_client::ConnectionState::Disconnected {
 115                        this.update(&mut cx, |this, cx| this.leave(cx).log_err());
 116                        break;
 117                    }
 118                }
 119            });
 120
 121            let mut track_video_changes = room.remote_video_track_updates();
 122            let _maintain_video_tracks = cx.spawn_weak(|this, mut cx| async move {
 123                while let Some(track_change) = track_video_changes.next().await {
 124                    let this = if let Some(this) = this.upgrade(&cx) {
 125                        this
 126                    } else {
 127                        break;
 128                    };
 129
 130                    this.update(&mut cx, |this, cx| {
 131                        this.remote_video_track_updated(track_change, cx).log_err()
 132                    });
 133                }
 134            });
 135
 136            let mut track_audio_changes = room.remote_audio_track_updates();
 137            let _maintain_audio_tracks = cx.spawn_weak(|this, mut cx| async move {
 138                while let Some(track_change) = track_audio_changes.next().await {
 139                    let this = if let Some(this) = this.upgrade(&cx) {
 140                        this
 141                    } else {
 142                        break;
 143                    };
 144
 145                    this.update(&mut cx, |this, cx| {
 146                        this.remote_audio_track_updated(track_change, cx).log_err()
 147                    });
 148                }
 149            });
 150
 151            let connect = room.connect(&connection_info.server_url, &connection_info.token);
 152            cx.spawn(|this, mut cx| async move {
 153                connect.await?;
 154                this.update(&mut cx, |this, cx| this.share_microphone(cx))
 155                    .await?;
 156
 157                anyhow::Ok(())
 158            })
 159            .detach_and_log_err(cx);
 160
 161            Some(LiveKitRoom {
 162                room,
 163                screen_track: LocalTrack::None,
 164                microphone_track: LocalTrack::None,
 165                next_publish_id: 0,
 166                muted_by_user: false,
 167                deafened: false,
 168                speaking: false,
 169                _maintain_room,
 170                _maintain_tracks: [_maintain_video_tracks, _maintain_audio_tracks],
 171            })
 172        } else {
 173            None
 174        };
 175
 176        let maintain_connection =
 177            cx.spawn_weak(|this, cx| Self::maintain_connection(this, client.clone(), cx).log_err());
 178
 179        Self {
 180            id,
 181            live_kit: live_kit_room,
 182            status: RoomStatus::Online,
 183            shared_projects: Default::default(),
 184            joined_projects: Default::default(),
 185            participant_user_ids: Default::default(),
 186            local_participant: Default::default(),
 187            remote_participants: Default::default(),
 188            pending_participants: Default::default(),
 189            pending_call_count: 0,
 190            subscriptions: vec![client.add_message_handler(cx.handle(), Self::handle_room_updated)],
 191            leave_when_empty: false,
 192            pending_room_update: None,
 193            client,
 194            user_store,
 195            follows_by_leader_id_project_id: Default::default(),
 196            maintain_connection: Some(maintain_connection),
 197        }
 198    }
 199
 200    pub(crate) fn create(
 201        called_user_id: u64,
 202        initial_project: Option<ModelHandle<Project>>,
 203        client: Arc<Client>,
 204        user_store: ModelHandle<UserStore>,
 205        cx: &mut AppContext,
 206    ) -> Task<Result<ModelHandle<Self>>> {
 207        cx.spawn(|mut cx| async move {
 208            let response = client.request(proto::CreateRoom {}).await?;
 209            let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
 210            let room = cx.add_model(|cx| {
 211                Self::new(
 212                    room_proto.id,
 213                    response.live_kit_connection_info,
 214                    client,
 215                    user_store,
 216                    cx,
 217                )
 218            });
 219
 220            let initial_project_id = if let Some(initial_project) = initial_project {
 221                let initial_project_id = room
 222                    .update(&mut cx, |room, cx| {
 223                        room.share_project(initial_project.clone(), cx)
 224                    })
 225                    .await?;
 226                Some(initial_project_id)
 227            } else {
 228                None
 229            };
 230
 231            match room
 232                .update(&mut cx, |room, cx| {
 233                    room.leave_when_empty = true;
 234                    room.call(called_user_id, initial_project_id, cx)
 235                })
 236                .await
 237            {
 238                Ok(()) => Ok(room),
 239                Err(error) => Err(anyhow!("room creation failed: {:?}", error)),
 240            }
 241        })
 242    }
 243
 244    pub(crate) fn join(
 245        call: &IncomingCall,
 246        client: Arc<Client>,
 247        user_store: ModelHandle<UserStore>,
 248        cx: &mut AppContext,
 249    ) -> Task<Result<ModelHandle<Self>>> {
 250        let room_id = call.room_id;
 251        cx.spawn(|mut cx| async move {
 252            let response = client.request(proto::JoinRoom { id: room_id }).await?;
 253            let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
 254            let room = cx.add_model(|cx| {
 255                Self::new(
 256                    room_id,
 257                    response.live_kit_connection_info,
 258                    client,
 259                    user_store,
 260                    cx,
 261                )
 262            });
 263            room.update(&mut cx, |room, cx| {
 264                room.leave_when_empty = true;
 265                room.apply_room_update(room_proto, cx)?;
 266                anyhow::Ok(())
 267            })?;
 268            Ok(room)
 269        })
 270    }
 271
 272    fn should_leave(&self) -> bool {
 273        self.leave_when_empty
 274            && self.pending_room_update.is_none()
 275            && self.pending_participants.is_empty()
 276            && self.remote_participants.is_empty()
 277            && self.pending_call_count == 0
 278    }
 279
        /// Leaves the room: notifies observers, emits `Event::Left`, and returns the task
        /// produced by `leave_internal`.
        ///
        /// The notification and the event are issued unconditionally, i.e. also when
        /// `leave_internal` then reports that the room is already offline.
 280    pub(crate) fn leave(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
 281        cx.notify();
 282        cx.emit(Event::Left);
 283        self.leave_internal(cx)
 284    }
 285
 286    fn leave_internal(&mut self, cx: &mut AppContext) -> Task<Result<()>> {
 287        if self.status.is_offline() {
 288            return Task::ready(Err(anyhow!("room is offline")));
 289        }
 290
 291        log::info!("leaving room");
 292
 293        for project in self.shared_projects.drain() {
 294            if let Some(project) = project.upgrade(cx) {
 295                project.update(cx, |project, cx| {
 296                    project.unshare(cx).log_err();
 297                });
 298            }
 299        }
 300        for project in self.joined_projects.drain() {
 301            if let Some(project) = project.upgrade(cx) {
 302                project.update(cx, |project, cx| {
 303                    project.disconnected_from_host(cx);
 304                    project.close(cx);
 305                });
 306            }
 307        }
 308
 309        self.status = RoomStatus::Offline;
 310        self.remote_participants.clear();
 311        self.pending_participants.clear();
 312        self.participant_user_ids.clear();
 313        self.subscriptions.clear();
 314        self.live_kit.take();
 315        self.pending_room_update.take();
 316        self.maintain_connection.take();
 317
 318        let leave_room = self.client.request(proto::LeaveRoom {});
 319        cx.background().spawn(async move {
 320            leave_room.await?;
 321            anyhow::Ok(())
 322        })
 323    }
 324
        /// Watches the client's connection status for as long as the room exists.
        ///
        /// When a disconnection is detected, the room is marked `RoomStatus::Rejoining` and, once
        /// the client reports being connected again, `Self::rejoin` is attempted. Reconnection is
        /// abandoned after three failed rejoin attempts, when the client is signed out, when the
        /// room has been dropped, or when `RECONNECT_TIMEOUT` elapses. After a successful rejoin
        /// the status is watched again; otherwise the room is left.
        ///
        /// This function never returns `Ok`: it ends with an error either because the room was
        /// dropped or because reconnecting failed.
 325    async fn maintain_connection(
 326        this: WeakModelHandle<Self>,
 327        client: Arc<Client>,
 328        mut cx: AsyncAppContext,
 329    ) -> Result<()> {
 330        let mut client_status = client.status();
 331        loop {
 332            let _ = client_status.try_recv();
 333            let is_connected = client_status.borrow().is_connected();
 334            // Even if we're initially connected, any future change of the status means we momentarily disconnected.
 335            if !is_connected || client_status.next().await.is_some() {
 336                log::info!("detected client disconnection");
 337
 338                this.upgrade(&cx)
 339                    .ok_or_else(|| anyhow!("room was dropped"))?
 340                    .update(&mut cx, |this, cx| {
 341                        this.status = RoomStatus::Rejoining;
 342                        cx.notify();
 343                    });
 344
 345                // Wait for client to re-establish a connection to the server.
 346                {
 347                    let mut reconnection_timeout = cx.background().timer(RECONNECT_TIMEOUT).fuse();
                        // Resolves to `true` once the room was rejoined, `false` when giving up.
 348                    let client_reconnection = async {
 349                        let mut remaining_attempts = 3;
 350                        while remaining_attempts > 0 {
 351                            if client_status.borrow().is_connected() {
 352                                log::info!("client reconnected, attempting to rejoin room");
 353
                                    // The room no longer exists: stop trying (yields `false` below).
 354                                let Some(this) = this.upgrade(&cx) else { break };
 355                                if this
 356                                    .update(&mut cx, |this, cx| this.rejoin(cx))
 357                                    .await
 358                                    .log_err()
 359                                    .is_some()
 360                                {
 361                                    return true;
 362                                } else {
 363                                    remaining_attempts -= 1;
 364                                }
 365                            } else if client_status.borrow().is_signed_out() {
 366                                return false;
 367                            }
 368
 369                            log::info!(
 370                                "waiting for client status change, remaining attempts {}",
 371                                remaining_attempts
 372                            );
 373                            client_status.next().await;
 374                        }
 375                        false
 376                    }
 377                    .fuse();
 378                    futures::pin_mut!(client_reconnection);
 379
                        // The reconnection arm is listed first, so with `select_biased!` it is
                        // polled before the timeout arm.
 380                    futures::select_biased! {
 381                        reconnected = client_reconnection => {
 382                            if reconnected {
 383                                log::info!("successfully reconnected to room");
 384                                // If we successfully joined the room, go back around the loop
 385                                // waiting for future connection status changes.
 386                                continue;
 387                            }
 388                        }
 389                        _ = reconnection_timeout => {
 390                            log::info!("room reconnection timeout expired");
 391                        }
 392                    }
 393                }
 394
 395                break;
 396            }
 397        }
 398
 399        // The client failed to re-establish a connection to the server
 400        // or an error occurred while trying to re-join the room. Either way
 401        // we leave the room and return an error.
 402        if let Some(this) = this.upgrade(&cx) {
 403            log::info!("reconnection failed, leaving room");
 404            let _ = this.update(&mut cx, |this, cx| this.leave(cx));
 405        }
 406        Err(anyhow!(
 407            "can't reconnect to room: client failed to re-establish connection"
 408        ))
 409    }
 410
 411    fn rejoin(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
 412        let mut projects = HashMap::default();
 413        let mut reshared_projects = Vec::new();
 414        let mut rejoined_projects = Vec::new();
 415        self.shared_projects.retain(|project| {
 416            if let Some(handle) = project.upgrade(cx) {
 417                let project = handle.read(cx);
 418                if let Some(project_id) = project.remote_id() {
 419                    projects.insert(project_id, handle.clone());
 420                    reshared_projects.push(proto::UpdateProject {
 421                        project_id,
 422                        worktrees: project.worktree_metadata_protos(cx),
 423                    });
 424                    return true;
 425                }
 426            }
 427            false
 428        });
 429        self.joined_projects.retain(|project| {
 430            if let Some(handle) = project.upgrade(cx) {
 431                let project = handle.read(cx);
 432                if let Some(project_id) = project.remote_id() {
 433                    projects.insert(project_id, handle.clone());
 434                    rejoined_projects.push(proto::RejoinProject {
 435                        id: project_id,
 436                        worktrees: project
 437                            .worktrees(cx)
 438                            .map(|worktree| {
 439                                let worktree = worktree.read(cx);
 440                                proto::RejoinWorktree {
 441                                    id: worktree.id().to_proto(),
 442                                    scan_id: worktree.completed_scan_id() as u64,
 443                                }
 444                            })
 445                            .collect(),
 446                    });
 447                }
 448                return true;
 449            }
 450            false
 451        });
 452
 453        let response = self.client.request_envelope(proto::RejoinRoom {
 454            id: self.id,
 455            reshared_projects,
 456            rejoined_projects,
 457        });
 458
 459        cx.spawn(|this, mut cx| async move {
 460            let response = response.await?;
 461            let message_id = response.message_id;
 462            let response = response.payload;
 463            let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
 464            this.update(&mut cx, |this, cx| {
 465                this.status = RoomStatus::Online;
 466                this.apply_room_update(room_proto, cx)?;
 467
 468                for reshared_project in response.reshared_projects {
 469                    if let Some(project) = projects.get(&reshared_project.id) {
 470                        project.update(cx, |project, cx| {
 471                            project.reshared(reshared_project, cx).log_err();
 472                        });
 473                    }
 474                }
 475
 476                for rejoined_project in response.rejoined_projects {
 477                    if let Some(project) = projects.get(&rejoined_project.id) {
 478                        project.update(cx, |project, cx| {
 479                            project.rejoined(rejoined_project, message_id, cx).log_err();
 480                        });
 481                    }
 482                }
 483
 484                anyhow::Ok(())
 485            })
 486        })
 487    }
 488
        /// Returns the id of this room.
 489    pub fn id(&self) -> u64 {
 490        self.id
 491    }
 492
        /// Returns the room's current connection status.
 493    pub fn status(&self) -> RoomStatus {
 494        self.status
 495    }
 496
        /// Returns the state kept for the local user's own participation in the room.
 497    pub fn local_participant(&self) -> &LocalParticipant {
 498        &self.local_participant
 499    }
 500
        /// Returns the remote participants currently in the room, keyed by user id.
 501    pub fn remote_participants(&self) -> &BTreeMap<u64, RemoteParticipant> {
 502        &self.remote_participants
 503    }
 504
 505    pub fn remote_participant_for_peer_id(&self, peer_id: PeerId) -> Option<&RemoteParticipant> {
 506        self.remote_participants
 507            .values()
 508            .find(|p| p.peer_id == peer_id)
 509    }
 510
        /// Returns the users listed as pending participants in the latest applied room update.
 511    pub fn pending_participants(&self) -> &[Arc<User>] {
 512        &self.pending_participants
 513    }
 514
        /// Returns whether `user_id` is among the room's remote or pending participants, as of the
        /// latest applied room update.
 515    pub fn contains_participant(&self, user_id: u64) -> bool {
 516        self.participant_user_ids.contains(&user_id)
 517    }
 518
 519    pub fn followers_for(&self, leader_id: PeerId, project_id: u64) -> &[PeerId] {
 520        self.follows_by_leader_id_project_id
 521            .get(&(leader_id, project_id))
 522            .map_or(&[], |v| v.as_slice())
 523    }
 524
 525    async fn handle_room_updated(
 526        this: ModelHandle<Self>,
 527        envelope: TypedEnvelope<proto::RoomUpdated>,
 528        _: Arc<Client>,
 529        mut cx: AsyncAppContext,
 530    ) -> Result<()> {
 531        let room = envelope
 532            .payload
 533            .room
 534            .ok_or_else(|| anyhow!("invalid room"))?;
 535        this.update(&mut cx, |this, cx| this.apply_room_update(room, cx))
 536    }
 537
        /// Applies a new snapshot of the room state.
        ///
        /// The users behind the remote and pending participants are requested from the user
        /// store, and the actual state change happens in a spawned task that is stored in
        /// `pending_room_update` (replacing any previously stored task). That task updates the
        /// local and remote participants, emits project shared/unshared and location-changed
        /// events, rebuilds the follower map, and leaves the room if `should_leave` holds
        /// afterwards.
        ///
        /// As written, this always returns `Ok(())`; failures to load users are only logged and
        /// leave the corresponding part of the state untouched.
 538    fn apply_room_update(
 539        &mut self,
 540        mut room: proto::Room,
 541        cx: &mut ModelContext<Self>,
 542    ) -> Result<()> {
 543        // Filter ourselves out from the room's participants.
 544        let local_participant_ix = room
 545            .participants
 546            .iter()
 547            .position(|participant| Some(participant.user_id) == self.client.user_id());
 548        let local_participant = local_participant_ix.map(|ix| room.participants.swap_remove(ix));
 549
 550        let pending_participant_user_ids = room
 551            .pending_participants
 552            .iter()
 553            .map(|p| p.user_id)
 554            .collect::<Vec<_>>();
 555
            // Collected after the `swap_remove` above, so the ids are in the same order as
            // `room.participants` is iterated below.
 556        let remote_participant_user_ids = room
 557            .participants
 558            .iter()
 559            .map(|p| p.user_id)
 560            .collect::<Vec<_>>();
 561
 562        let (remote_participants, pending_participants) =
 563            self.user_store.update(cx, move |user_store, cx| {
 564                (
 565                    user_store.get_users(remote_participant_user_ids, cx),
 566                    user_store.get_users(pending_participant_user_ids, cx),
 567                )
 568            });
 569
 570        self.pending_room_update = Some(cx.spawn(|this, mut cx| async move {
 571            let (remote_participants, pending_participants) =
 572                futures::join!(remote_participants, pending_participants);
 573
 574            this.update(&mut cx, |this, cx| {
 575                this.participant_user_ids.clear();
 576
 577                if let Some(participant) = local_participant {
 578                    this.local_participant.projects = participant.projects;
 579                } else {
 580                    this.local_participant.projects.clear();
 581                }
 582
 583                if let Some(participants) = remote_participants.log_err() {
                        // NOTE(review): pairing by position assumes `get_users` yields the users
                        // in the order of the requested ids — confirm against `UserStore`.
 584                    for (participant, user) in room.participants.into_iter().zip(participants) {
                            // Participants without a peer id are skipped entirely.
 585                        let Some(peer_id) = participant.peer_id else { continue };
 586                        this.participant_user_ids.insert(participant.user_id);
 587
                            // Project ids this participant had before this update vs. has now.
 588                        let old_projects = this
 589                            .remote_participants
 590                            .get(&participant.user_id)
 591                            .into_iter()
 592                            .flat_map(|existing| &existing.projects)
 593                            .map(|project| project.id)
 594                            .collect::<HashSet<_>>();
 595                        let new_projects = participant
 596                            .projects
 597                            .iter()
 598                            .map(|project| project.id)
 599                            .collect::<HashSet<_>>();
 600
 601                        for project in &participant.projects {
 602                            if !old_projects.contains(&project.id) {
 603                                cx.emit(Event::RemoteProjectShared {
 604                                    owner: user.clone(),
 605                                    project_id: project.id,
 606                                    worktree_root_names: project.worktree_root_names.clone(),
 607                                });
 608                            }
 609                        }
 610
 611                        for unshared_project_id in old_projects.difference(&new_projects) {
                                // Disconnect and forget the joined project that was unshared;
                                // handles that can no longer be upgraded are dropped as well.
 612                            this.joined_projects.retain(|project| {
 613                                if let Some(project) = project.upgrade(cx) {
 614                                    project.update(cx, |project, cx| {
 615                                        if project.remote_id() == Some(*unshared_project_id) {
 616                                            project.disconnected_from_host(cx);
 617                                            false
 618                                        } else {
 619                                            true
 620                                        }
 621                                    })
 622                                } else {
 623                                    false
 624                                }
 625                            });
 626                            cx.emit(Event::RemoteProjectUnshared {
 627                                project_id: *unshared_project_id,
 628                            });
 629                        }
 630
 631                        let location = ParticipantLocation::from_proto(participant.location)
 632                            .unwrap_or(ParticipantLocation::External);
 633                        if let Some(remote_participant) =
 634                            this.remote_participants.get_mut(&participant.user_id)
 635                        {
 636                            remote_participant.projects = participant.projects;
 637                            remote_participant.peer_id = peer_id;
 638                            if location != remote_participant.location {
 639                                remote_participant.location = location;
 640                                cx.emit(Event::ParticipantLocationChanged {
 641                                    participant_id: peer_id,
 642                                });
 643                            }
 644                        } else {
                                // First time this participant is seen in the room.
 645                            this.remote_participants.insert(
 646                                participant.user_id,
 647                                RemoteParticipant {
 648                                    user: user.clone(),
 649                                    peer_id,
 650                                    projects: participant.projects,
 651                                    location,
 652                                    muted: false,
 653                                    speaking: false,
 654                                    video_tracks: Default::default(),
 655                                    audio_tracks: Default::default(),
 656                                },
 657                            );
 658
                                // Register the tracks LiveKit already reports for this user, now
                                // that the participant exists in `remote_participants`.
 659                            if let Some(live_kit) = this.live_kit.as_ref() {
 660                                let video_tracks =
 661                                    live_kit.room.remote_video_tracks(&user.id.to_string());
 662                                let audio_tracks =
 663                                    live_kit.room.remote_audio_tracks(&user.id.to_string());
 664                                for track in video_tracks {
 665                                    this.remote_video_track_updated(
 666                                        RemoteVideoTrackUpdate::Subscribed(track),
 667                                        cx,
 668                                    )
 669                                    .log_err();
 670                                }
 671                                for track in audio_tracks {
 672                                    this.remote_audio_track_updated(
 673                                        RemoteAudioTrackUpdate::Subscribed(track),
 674                                        cx,
 675                                    )
 676                                    .log_err();
 677                                }
 678                            }
 679                        }
 680                    }
 681
                        // Drop participants that are no longer in the room and report their
                        // projects as unshared. At this point `participant_user_ids` holds only
                        // the remote participants of this update.
 682                    this.remote_participants.retain(|user_id, participant| {
 683                        if this.participant_user_ids.contains(user_id) {
 684                            true
 685                        } else {
 686                            for project in &participant.projects {
 687                                cx.emit(Event::RemoteProjectUnshared {
 688                                    project_id: project.id,
 689                                });
 690                            }
 691                            false
 692                        }
 693                    });
 694                }
 695
 696                if let Some(pending_participants) = pending_participants.log_err() {
 697                    this.pending_participants = pending_participants;
 698                    for participant in &this.pending_participants {
 699                        this.participant_user_ids.insert(participant.id);
 700                    }
 701                }
 702
                    // Rebuild the follower map from scratch, skipping incomplete entries and
                    // duplicate followers.
 703                this.follows_by_leader_id_project_id.clear();
 704                for follower in room.followers {
 705                    let project_id = follower.project_id;
 706                    let (leader, follower) = match (follower.leader_id, follower.follower_id) {
 707                        (Some(leader), Some(follower)) => (leader, follower),
 708
 709                        _ => {
 710                            log::error!("Follower message {follower:?} missing some state");
 711                            continue;
 712                        }
 713                    };
 714
 715                    let list = this
 716                        .follows_by_leader_id_project_id
 717                        .entry((leader, project_id))
 718                        .or_insert(Vec::new());
 719                    if !list.contains(&follower) {
 720                        list.push(follower);
 721                    }
 722                }
 723
                    // Cleared before the check below: `should_leave` requires that no room update
                    // is pending.
 724                this.pending_room_update.take();
 725                if this.should_leave() {
 726                    log::info!("room is empty, leaving");
 727                    let _ = this.leave(cx);
 728                }
 729
 730                this.check_invariants();
 731                cx.notify();
 732            });
 733        }));
 734
 735        cx.notify();
 736        Ok(())
 737    }
 738
 739    fn remote_video_track_updated(
 740        &mut self,
 741        change: RemoteVideoTrackUpdate,
 742        cx: &mut ModelContext<Self>,
 743    ) -> Result<()> {
 744        match change {
 745            RemoteVideoTrackUpdate::Subscribed(track) => {
 746                let user_id = track.publisher_id().parse()?;
 747                let track_id = track.sid().to_string();
 748                let participant = self
 749                    .remote_participants
 750                    .get_mut(&user_id)
 751                    .ok_or_else(|| anyhow!("subscribed to track by unknown participant"))?;
 752                participant.video_tracks.insert(
 753                    track_id.clone(),
 754                    Arc::new(RemoteVideoTrack {
 755                        live_kit_track: track,
 756                    }),
 757                );
 758                cx.emit(Event::RemoteVideoTracksChanged {
 759                    participant_id: participant.peer_id,
 760                });
 761            }
 762            RemoteVideoTrackUpdate::Unsubscribed {
 763                publisher_id,
 764                track_id,
 765            } => {
 766                let user_id = publisher_id.parse()?;
 767                let participant = self
 768                    .remote_participants
 769                    .get_mut(&user_id)
 770                    .ok_or_else(|| anyhow!("unsubscribed from track by unknown participant"))?;
 771                participant.video_tracks.remove(&track_id);
 772                cx.emit(Event::RemoteVideoTracksChanged {
 773                    participant_id: participant.peer_id,
 774                });
 775            }
 776        }
 777
 778        cx.notify();
 779        Ok(())
 780    }
 781
 782    fn remote_audio_track_updated(
 783        &mut self,
 784        change: RemoteAudioTrackUpdate,
 785        cx: &mut ModelContext<Self>,
 786    ) -> Result<()> {
 787        match change {
 788            RemoteAudioTrackUpdate::ActiveSpeakersChanged { speakers } => {
 789                let mut speaker_ids = speakers
 790                    .into_iter()
 791                    .filter_map(|speaker_sid| speaker_sid.parse().ok())
 792                    .collect::<Vec<u64>>();
 793                speaker_ids.sort_unstable();
 794                for (sid, participant) in &mut self.remote_participants {
 795                    if let Ok(_) = speaker_ids.binary_search(sid) {
 796                        participant.speaking = true;
 797                    } else {
 798                        participant.speaking = false;
 799                    }
 800                }
 801                if let Some(id) = self.client.user_id() {
 802                    if let Some(room) = &mut self.live_kit {
 803                        if let Ok(_) = speaker_ids.binary_search(&id) {
 804                            room.speaking = true;
 805                        } else {
 806                            room.speaking = false;
 807                        }
 808                    }
 809                }
 810                cx.notify();
 811            }
 812            RemoteAudioTrackUpdate::MuteChanged { track_id, muted } => {
 813                for participant in &mut self.remote_participants.values_mut() {
 814                    let mut found = false;
 815                    for track in participant.audio_tracks.values() {
 816                        if track.sid() == track_id {
 817                            found = true;
 818                            break;
 819                        }
 820                    }
 821                    if found {
 822                        participant.muted = muted;
 823                        break;
 824                    }
 825                }
 826                cx.notify();
 827            }
 828            RemoteAudioTrackUpdate::Subscribed(track) => {
 829                let user_id = track.publisher_id().parse()?;
 830                let track_id = track.sid().to_string();
 831                let participant = self
 832                    .remote_participants
 833                    .get_mut(&user_id)
 834                    .ok_or_else(|| anyhow!("subscribed to track by unknown participant"))?;
 835                participant.audio_tracks.insert(track_id.clone(), track);
 836                cx.emit(Event::RemoteAudioTracksChanged {
 837                    participant_id: participant.peer_id,
 838                });
 839            }
 840            RemoteAudioTrackUpdate::Unsubscribed {
 841                publisher_id,
 842                track_id,
 843            } => {
 844                let user_id = publisher_id.parse()?;
 845                let participant = self
 846                    .remote_participants
 847                    .get_mut(&user_id)
 848                    .ok_or_else(|| anyhow!("unsubscribed from track by unknown participant"))?;
 849                participant.audio_tracks.remove(&track_id);
 850                cx.emit(Event::RemoteAudioTracksChanged {
 851                    participant_id: participant.peer_id,
 852                });
 853            }
 854        }
 855
 856        cx.notify();
 857        Ok(())
 858    }
 859
 860    fn check_invariants(&self) {
 861        #[cfg(any(test, feature = "test-support"))]
 862        {
 863            for participant in self.remote_participants.values() {
 864                assert!(self.participant_user_ids.contains(&participant.user.id));
 865                assert_ne!(participant.user.id, self.client.user_id().unwrap());
 866            }
 867
 868            for participant in &self.pending_participants {
 869                assert!(self.participant_user_ids.contains(&participant.id));
 870                assert_ne!(participant.id, self.client.user_id().unwrap());
 871            }
 872
 873            assert_eq!(
 874                self.participant_user_ids.len(),
 875                self.remote_participants.len() + self.pending_participants.len()
 876            );
 877        }
 878    }
 879
 880    pub(crate) fn call(
 881        &mut self,
 882        called_user_id: u64,
 883        initial_project_id: Option<u64>,
 884        cx: &mut ModelContext<Self>,
 885    ) -> Task<Result<()>> {
 886        if self.status.is_offline() {
 887            return Task::ready(Err(anyhow!("room is offline")));
 888        }
 889
 890        cx.notify();
 891        let client = self.client.clone();
 892        let room_id = self.id;
 893        self.pending_call_count += 1;
 894        cx.spawn(|this, mut cx| async move {
 895            let result = client
 896                .request(proto::Call {
 897                    room_id,
 898                    called_user_id,
 899                    initial_project_id,
 900                })
 901                .await;
 902            this.update(&mut cx, |this, cx| {
 903                this.pending_call_count -= 1;
 904                if this.should_leave() {
 905                    this.leave(cx).detach_and_log_err(cx);
 906                }
 907            });
 908            result?;
 909            Ok(())
 910        })
 911    }
 912
 913    pub fn join_project(
 914        &mut self,
 915        id: u64,
 916        language_registry: Arc<LanguageRegistry>,
 917        fs: Arc<dyn Fs>,
 918        cx: &mut ModelContext<Self>,
 919    ) -> Task<Result<ModelHandle<Project>>> {
 920        let client = self.client.clone();
 921        let user_store = self.user_store.clone();
 922        cx.spawn(|this, mut cx| async move {
 923            let project =
 924                Project::remote(id, client, user_store, language_registry, fs, cx.clone()).await?;
 925            this.update(&mut cx, |this, cx| {
 926                this.joined_projects.retain(|project| {
 927                    if let Some(project) = project.upgrade(cx) {
 928                        !project.read(cx).is_read_only()
 929                    } else {
 930                        false
 931                    }
 932                });
 933                this.joined_projects.insert(project.downgrade());
 934            });
 935            Ok(project)
 936        })
 937    }
 938
 939    pub(crate) fn share_project(
 940        &mut self,
 941        project: ModelHandle<Project>,
 942        cx: &mut ModelContext<Self>,
 943    ) -> Task<Result<u64>> {
 944        if let Some(project_id) = project.read(cx).remote_id() {
 945            return Task::ready(Ok(project_id));
 946        }
 947
 948        let request = self.client.request(proto::ShareProject {
 949            room_id: self.id(),
 950            worktrees: project.read(cx).worktree_metadata_protos(cx),
 951        });
 952        cx.spawn(|this, mut cx| async move {
 953            let response = request.await?;
 954
 955            project.update(&mut cx, |project, cx| {
 956                project.shared(response.project_id, cx)
 957            })?;
 958
 959            // If the user's location is in this project, it changes from UnsharedProject to SharedProject.
 960            this.update(&mut cx, |this, cx| {
 961                this.shared_projects.insert(project.downgrade());
 962                let active_project = this.local_participant.active_project.as_ref();
 963                if active_project.map_or(false, |location| *location == project) {
 964                    this.set_location(Some(&project), cx)
 965                } else {
 966                    Task::ready(Ok(()))
 967                }
 968            })
 969            .await?;
 970
 971            Ok(response.project_id)
 972        })
 973    }
 974
 975    pub(crate) fn unshare_project(
 976        &mut self,
 977        project: ModelHandle<Project>,
 978        cx: &mut ModelContext<Self>,
 979    ) -> Result<()> {
 980        let project_id = match project.read(cx).remote_id() {
 981            Some(project_id) => project_id,
 982            None => return Ok(()),
 983        };
 984
 985        self.client.send(proto::UnshareProject { project_id })?;
 986        project.update(cx, |this, cx| this.unshare(cx))
 987    }
 988
 989    pub(crate) fn set_location(
 990        &mut self,
 991        project: Option<&ModelHandle<Project>>,
 992        cx: &mut ModelContext<Self>,
 993    ) -> Task<Result<()>> {
 994        if self.status.is_offline() {
 995            return Task::ready(Err(anyhow!("room is offline")));
 996        }
 997
 998        let client = self.client.clone();
 999        let room_id = self.id;
1000        let location = if let Some(project) = project {
1001            self.local_participant.active_project = Some(project.downgrade());
1002            if let Some(project_id) = project.read(cx).remote_id() {
1003                proto::participant_location::Variant::SharedProject(
1004                    proto::participant_location::SharedProject { id: project_id },
1005                )
1006            } else {
1007                proto::participant_location::Variant::UnsharedProject(
1008                    proto::participant_location::UnsharedProject {},
1009                )
1010            }
1011        } else {
1012            self.local_participant.active_project = None;
1013            proto::participant_location::Variant::External(proto::participant_location::External {})
1014        };
1015
1016        cx.notify();
1017        cx.foreground().spawn(async move {
1018            client
1019                .request(proto::UpdateParticipantLocation {
1020                    room_id,
1021                    location: Some(proto::ParticipantLocation {
1022                        variant: Some(location),
1023                    }),
1024                })
1025                .await?;
1026            Ok(())
1027        })
1028    }
1029
1030    pub fn is_screen_sharing(&self) -> bool {
1031        self.live_kit.as_ref().map_or(false, |live_kit| {
1032            !matches!(live_kit.screen_track, LocalTrack::None)
1033        })
1034    }
1035
1036    pub fn is_sharing_mic(&self) -> bool {
1037        self.live_kit.as_ref().map_or(false, |live_kit| {
1038            !matches!(live_kit.microphone_track, LocalTrack::None)
1039        })
1040    }
1041
1042    pub fn is_muted(&self) -> bool {
1043        self.live_kit
1044            .as_ref()
1045            .and_then(|live_kit| match &live_kit.microphone_track {
1046                LocalTrack::None => None,
1047                LocalTrack::Pending { muted, .. } => Some(*muted),
1048                LocalTrack::Published { muted, .. } => Some(*muted),
1049            })
1050            .unwrap_or(false)
1051    }
1052
1053    pub fn is_speaking(&self) -> bool {
1054        self.live_kit
1055            .as_ref()
1056            .map_or(false, |live_kit| live_kit.speaking)
1057    }
1058
1059    pub fn is_deafened(&self) -> Option<bool> {
1060        self.live_kit.as_ref().map(|live_kit| live_kit.deafened)
1061    }
1062
1063    pub fn share_microphone(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
1064        if self.status.is_offline() {
1065            return Task::ready(Err(anyhow!("room is offline")));
1066        } else if self.is_sharing_mic() {
1067            return Task::ready(Err(anyhow!("microphone was already shared")));
1068        }
1069
1070        let publish_id = if let Some(live_kit) = self.live_kit.as_mut() {
1071            let publish_id = post_inc(&mut live_kit.next_publish_id);
1072            live_kit.microphone_track = LocalTrack::Pending {
1073                publish_id,
1074                muted: false,
1075            };
1076            cx.notify();
1077            publish_id
1078        } else {
1079            return Task::ready(Err(anyhow!("live-kit was not initialized")));
1080        };
1081
1082        cx.spawn_weak(|this, mut cx| async move {
1083            let publish_track = async {
1084                let track = LocalAudioTrack::create();
1085                this.upgrade(&cx)
1086                    .ok_or_else(|| anyhow!("room was dropped"))?
1087                    .read_with(&cx, |this, _| {
1088                        this.live_kit
1089                            .as_ref()
1090                            .map(|live_kit| live_kit.room.publish_audio_track(&track))
1091                    })
1092                    .ok_or_else(|| anyhow!("live-kit was not initialized"))?
1093                    .await
1094            };
1095
1096            let publication = publish_track.await;
1097            this.upgrade(&cx)
1098                .ok_or_else(|| anyhow!("room was dropped"))?
1099                .update(&mut cx, |this, cx| {
1100                    let live_kit = this
1101                        .live_kit
1102                        .as_mut()
1103                        .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
1104
1105                    let (canceled, muted) = if let LocalTrack::Pending {
1106                        publish_id: cur_publish_id,
1107                        muted,
1108                    } = &live_kit.microphone_track
1109                    {
1110                        (*cur_publish_id != publish_id, *muted)
1111                    } else {
1112                        (true, false)
1113                    };
1114
1115                    match publication {
1116                        Ok(publication) => {
1117                            if canceled {
1118                                live_kit.room.unpublish_track(publication);
1119                            } else {
1120                                if muted {
1121                                    cx.background().spawn(publication.set_mute(muted)).detach();
1122                                }
1123                                live_kit.microphone_track = LocalTrack::Published {
1124                                    track_publication: publication,
1125                                    muted,
1126                                };
1127                                cx.notify();
1128                            }
1129                            Ok(())
1130                        }
1131                        Err(error) => {
1132                            if canceled {
1133                                Ok(())
1134                            } else {
1135                                live_kit.microphone_track = LocalTrack::None;
1136                                cx.notify();
1137                                Err(error)
1138                            }
1139                        }
1140                    }
1141                })
1142        })
1143    }
1144
1145    pub fn share_screen(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
1146        if self.status.is_offline() {
1147            return Task::ready(Err(anyhow!("room is offline")));
1148        } else if self.is_screen_sharing() {
1149            return Task::ready(Err(anyhow!("screen was already shared")));
1150        }
1151
1152        let (displays, publish_id) = if let Some(live_kit) = self.live_kit.as_mut() {
1153            let publish_id = post_inc(&mut live_kit.next_publish_id);
1154            live_kit.screen_track = LocalTrack::Pending {
1155                publish_id,
1156                muted: false,
1157            };
1158            cx.notify();
1159            (live_kit.room.display_sources(), publish_id)
1160        } else {
1161            return Task::ready(Err(anyhow!("live-kit was not initialized")));
1162        };
1163
1164        cx.spawn_weak(|this, mut cx| async move {
1165            let publish_track = async {
1166                let displays = displays.await?;
1167                let display = displays
1168                    .first()
1169                    .ok_or_else(|| anyhow!("no display found"))?;
1170                let track = LocalVideoTrack::screen_share_for_display(&display);
1171                this.upgrade(&cx)
1172                    .ok_or_else(|| anyhow!("room was dropped"))?
1173                    .read_with(&cx, |this, _| {
1174                        this.live_kit
1175                            .as_ref()
1176                            .map(|live_kit| live_kit.room.publish_video_track(&track))
1177                    })
1178                    .ok_or_else(|| anyhow!("live-kit was not initialized"))?
1179                    .await
1180            };
1181
1182            let publication = publish_track.await;
1183            this.upgrade(&cx)
1184                .ok_or_else(|| anyhow!("room was dropped"))?
1185                .update(&mut cx, |this, cx| {
1186                    let live_kit = this
1187                        .live_kit
1188                        .as_mut()
1189                        .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
1190
1191                    let (canceled, muted) = if let LocalTrack::Pending {
1192                        publish_id: cur_publish_id,
1193                        muted,
1194                    } = &live_kit.screen_track
1195                    {
1196                        (*cur_publish_id != publish_id, *muted)
1197                    } else {
1198                        (true, false)
1199                    };
1200
1201                    match publication {
1202                        Ok(publication) => {
1203                            if canceled {
1204                                live_kit.room.unpublish_track(publication);
1205                            } else {
1206                                if muted {
1207                                    cx.background().spawn(publication.set_mute(muted)).detach();
1208                                }
1209                                live_kit.screen_track = LocalTrack::Published {
1210                                    track_publication: publication,
1211                                    muted,
1212                                };
1213                                cx.notify();
1214                            }
1215                            Ok(())
1216                        }
1217                        Err(error) => {
1218                            if canceled {
1219                                Ok(())
1220                            } else {
1221                                live_kit.screen_track = LocalTrack::None;
1222                                cx.notify();
1223                                Err(error)
1224                            }
1225                        }
1226                    }
1227                })
1228        })
1229    }
1230    fn set_mute(
1231        live_kit: &mut LiveKitRoom,
1232        should_mute: bool,
1233        cx: &mut ModelContext<Self>,
1234    ) -> Result<Task<Result<()>>> {
1235        if !should_mute {
1236            // clear user muting state.
1237            live_kit.muted_by_user = false;
1238        }
1239        match &mut live_kit.microphone_track {
1240            LocalTrack::None => Err(anyhow!("microphone was not shared")),
1241            LocalTrack::Pending { muted, .. } => {
1242                *muted = should_mute;
1243                cx.notify();
1244                Ok(Task::Ready(Some(Ok(()))))
1245            }
1246            LocalTrack::Published {
1247                track_publication,
1248                muted,
1249            } => {
1250                *muted = should_mute;
1251                cx.notify();
1252                Ok(cx.background().spawn(track_publication.set_mute(*muted)))
1253            }
1254        }
1255    }
1256    pub fn toggle_mute(&mut self, cx: &mut ModelContext<Self>) -> Result<Task<Result<()>>> {
1257        let should_mute = !self.is_muted();
1258        if let Some(live_kit) = self.live_kit.as_mut() {
1259            let ret = Self::set_mute(live_kit, should_mute, cx);
1260            live_kit.muted_by_user = should_mute;
1261            ret
1262        } else {
1263            Err(anyhow!("LiveKit not started"))
1264        }
1265    }
1266
1267    pub fn toggle_deafen(&mut self, cx: &mut ModelContext<Self>) -> Result<Task<Result<()>>> {
1268        if let Some(live_kit) = self.live_kit.as_mut() {
1269            (*live_kit).deafened = !live_kit.deafened;
1270
1271            let mut tasks = Vec::with_capacity(self.remote_participants.len());
1272            // Context notification is sent within set_mute itself.
1273            let mut mute_task = None;
1274            // When deafening, mute user's mic as well.
1275            // When undeafening, unmute user's mic unless it was manually muted prior to deafening.
1276            if live_kit.deafened || !live_kit.muted_by_user {
1277                mute_task = Some(Self::set_mute(live_kit, live_kit.deafened, cx)?);
1278            };
1279            for participant in self.remote_participants.values() {
1280                for track in live_kit
1281                    .room
1282                    .remote_audio_track_publications(&participant.user.id.to_string())
1283                {
1284                    tasks.push(cx.foreground().spawn(track.set_enabled(!live_kit.deafened)));
1285                }
1286            }
1287
1288            Ok(cx.foreground().spawn(async move {
1289                if let Some(mute_task) = mute_task {
1290                    mute_task.await?;
1291                }
1292                for task in tasks {
1293                    task.await?;
1294                }
1295                Ok(())
1296            }))
1297        } else {
1298            Err(anyhow!("LiveKit not started"))
1299        }
1300    }
1301
1302    pub fn unshare_screen(&mut self, cx: &mut ModelContext<Self>) -> Result<()> {
1303        if self.status.is_offline() {
1304            return Err(anyhow!("room is offline"));
1305        }
1306
1307        let live_kit = self
1308            .live_kit
1309            .as_mut()
1310            .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
1311        match mem::take(&mut live_kit.screen_track) {
1312            LocalTrack::None => Err(anyhow!("screen was not shared")),
1313            LocalTrack::Pending { .. } => {
1314                cx.notify();
1315                Ok(())
1316            }
1317            LocalTrack::Published {
1318                track_publication, ..
1319            } => {
1320                live_kit.room.unpublish_track(track_publication);
1321                cx.notify();
1322                Ok(())
1323            }
1324        }
1325    }
1326
1327    #[cfg(any(test, feature = "test-support"))]
1328    pub fn set_display_sources(&self, sources: Vec<live_kit_client::MacOSDisplay>) {
1329        self.live_kit
1330            .as_ref()
1331            .unwrap()
1332            .room
1333            .set_display_sources(sources);
1334    }
1335}
1336
1337struct LiveKitRoom {
1338    room: Arc<live_kit_client::Room>,
1339    screen_track: LocalTrack,
1340    microphone_track: LocalTrack,
1341    /// Tracks whether we're currently in a muted state due to auto-mute from deafening or manual mute performed by user.
1342    muted_by_user: bool,
1343    deafened: bool,
1344    speaking: bool,
1345    next_publish_id: usize,
1346    _maintain_room: Task<()>,
1347    _maintain_tracks: [Task<()>; 2],
1348}
1349
1350enum LocalTrack {
1351    None,
1352    Pending {
1353        publish_id: usize,
1354        muted: bool,
1355    },
1356    Published {
1357        track_publication: LocalTrackPublication,
1358        muted: bool,
1359    },
1360}
1361
1362impl Default for LocalTrack {
1363    fn default() -> Self {
1364        Self::None
1365    }
1366}
1367
/// Connection status of a room.
#[derive(Copy, Clone, PartialEq, Eq)]
pub enum RoomStatus {
    /// The room is connected.
    Online,
    /// The room is neither online nor offline; presumably a reconnect is in
    /// progress — confirm against the code that sets this state.
    Rejoining,
    /// The room is disconnected; operations that need the server are rejected.
    Offline,
}

impl RoomStatus {
    /// Returns `true` only for [`RoomStatus::Offline`].
    pub fn is_offline(&self) -> bool {
        *self == RoomStatus::Offline
    }

    /// Returns `true` only for [`RoomStatus::Online`].
    pub fn is_online(&self) -> bool {
        *self == RoomStatus::Online
    }
}