room.rs

   1use crate::{
   2    call_settings::CallSettings,
   3    participant::{LocalParticipant, ParticipantLocation, RemoteParticipant},
   4};
   5use anyhow::{anyhow, Result};
   6use audio::{Audio, Sound};
   7use client::{
   8    proto::{self, PeerId},
   9    Client, ParticipantIndex, TypedEnvelope, User, UserStore,
  10};
  11use collections::{BTreeMap, HashMap, HashSet};
  12use fs::Fs;
  13use futures::{FutureExt, StreamExt};
  14use gpui::{
  15    AppContext, AsyncAppContext, Context, EventEmitter, Model, ModelContext, Task, WeakModel,
  16};
  17use language::LanguageRegistry;
  18use live_kit_client::{LocalAudioTrack, LocalTrackPublication, LocalVideoTrack, RoomUpdate};
  19use postage::{sink::Sink, stream::Stream, watch};
  20use project::Project;
  21use settings::Settings as _;
  22use std::{future::Future, mem, sync::Arc, time::Duration};
  23use util::{post_inc, ResultExt, TryFutureExt};
  24
  25pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(30);
  26
  27#[derive(Clone, Debug, PartialEq, Eq)]
  28pub enum Event {
  29    RoomJoined {
  30        channel_id: Option<u64>,
  31    },
  32    ParticipantLocationChanged {
  33        participant_id: proto::PeerId,
  34    },
  35    RemoteVideoTracksChanged {
  36        participant_id: proto::PeerId,
  37    },
  38    RemoteAudioTracksChanged {
  39        participant_id: proto::PeerId,
  40    },
  41    RemoteProjectShared {
  42        owner: Arc<User>,
  43        project_id: u64,
  44        worktree_root_names: Vec<String>,
  45    },
  46    RemoteProjectUnshared {
  47        project_id: u64,
  48    },
  49    RemoteProjectJoined {
  50        project_id: u64,
  51    },
  52    RemoteProjectInvitationDiscarded {
  53        project_id: u64,
  54    },
  55    Left {
  56        channel_id: Option<u64>,
  57    },
  58}
  59
  60pub struct Room {
  61    id: u64,
  62    channel_id: Option<u64>,
  63    live_kit: Option<LiveKitRoom>,
  64    live_kit_connection_info: Option<proto::LiveKitConnectionInfo>,
  65    status: RoomStatus,
  66    shared_projects: HashSet<WeakModel<Project>>,
  67    joined_projects: HashSet<WeakModel<Project>>,
  68    local_participant: LocalParticipant,
  69    remote_participants: BTreeMap<u64, RemoteParticipant>,
  70    pending_participants: Vec<Arc<User>>,
  71    participant_user_ids: HashSet<u64>,
  72    pending_call_count: usize,
  73    leave_when_empty: bool,
  74    client: Arc<Client>,
  75    user_store: Model<UserStore>,
  76    follows_by_leader_id_project_id: HashMap<(PeerId, u64), Vec<PeerId>>,
  77    client_subscriptions: Vec<client::Subscription>,
  78    _subscriptions: Vec<gpui::Subscription>,
  79    room_update_completed_tx: watch::Sender<Option<()>>,
  80    room_update_completed_rx: watch::Receiver<Option<()>>,
  81    pending_room_update: Option<Task<()>>,
  82    maintain_connection: Option<Task<Option<()>>>,
  83}
  84
  85impl EventEmitter<Event> for Room {}
  86
  87impl Room {
  88    pub fn channel_id(&self) -> Option<u64> {
  89        self.channel_id
  90    }
  91
  92    pub fn is_sharing_project(&self) -> bool {
  93        !self.shared_projects.is_empty()
  94    }
  95
  96    #[cfg(any(test, feature = "test-support"))]
  97    pub fn is_connected(&self) -> bool {
  98        if let Some(live_kit) = self.live_kit.as_ref() {
  99            matches!(
 100                *live_kit.room.status().borrow(),
 101                live_kit_client::ConnectionState::Connected { .. }
 102            )
 103        } else {
 104            false
 105        }
 106    }
 107
 108    fn new(
 109        id: u64,
 110        channel_id: Option<u64>,
 111        live_kit_connection_info: Option<proto::LiveKitConnectionInfo>,
 112        client: Arc<Client>,
 113        user_store: Model<UserStore>,
 114        cx: &mut ModelContext<Self>,
 115    ) -> Self {
 116        let maintain_connection = cx.spawn({
 117            let client = client.clone();
 118            move |this, cx| Self::maintain_connection(this, client.clone(), cx).log_err()
 119        });
 120
 121        let (room_update_completed_tx, room_update_completed_rx) = watch::channel();
 122
 123        let mut this = Self {
 124            id,
 125            channel_id,
 126            live_kit: None,
 127            live_kit_connection_info,
 128            status: RoomStatus::Online,
 129            shared_projects: Default::default(),
 130            joined_projects: Default::default(),
 131            participant_user_ids: Default::default(),
 132            local_participant: Default::default(),
 133            remote_participants: Default::default(),
 134            pending_participants: Default::default(),
 135            pending_call_count: 0,
 136            client_subscriptions: vec![
 137                client.add_message_handler(cx.weak_model(), Self::handle_room_updated)
 138            ],
 139            _subscriptions: vec![
 140                cx.on_release(Self::released),
 141                cx.on_app_quit(Self::app_will_quit),
 142            ],
 143            leave_when_empty: false,
 144            pending_room_update: None,
 145            client,
 146            user_store,
 147            follows_by_leader_id_project_id: Default::default(),
 148            maintain_connection: Some(maintain_connection),
 149            room_update_completed_tx,
 150            room_update_completed_rx,
 151        };
 152        if this.live_kit_connection_info.is_some() {
 153            this.join_call(cx).detach_and_log_err(cx);
 154        }
 155        this
 156    }
 157
 158    pub(crate) fn create(
 159        called_user_id: u64,
 160        initial_project: Option<Model<Project>>,
 161        client: Arc<Client>,
 162        user_store: Model<UserStore>,
 163        cx: &mut AppContext,
 164    ) -> Task<Result<Model<Self>>> {
 165        cx.spawn(move |mut cx| async move {
 166            let response = client.request(proto::CreateRoom {}).await?;
 167            let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
 168            let room = cx.new_model(|cx| {
 169                let mut room = Self::new(
 170                    room_proto.id,
 171                    None,
 172                    response.live_kit_connection_info,
 173                    client,
 174                    user_store,
 175                    cx,
 176                );
 177                if let Some(participant) = room_proto.participants.first() {
 178                    room.local_participant.role = participant.role()
 179                }
 180                room
 181            })?;
 182
 183            let initial_project_id = if let Some(initial_project) = initial_project {
 184                let initial_project_id = room
 185                    .update(&mut cx, |room, cx| {
 186                        room.share_project(initial_project.clone(), cx)
 187                    })?
 188                    .await?;
 189                Some(initial_project_id)
 190            } else {
 191                None
 192            };
 193
 194            match room
 195                .update(&mut cx, |room, cx| {
 196                    room.leave_when_empty = true;
 197                    room.call(called_user_id, initial_project_id, cx)
 198                })?
 199                .await
 200            {
 201                Ok(()) => Ok(room),
 202                Err(error) => Err(anyhow!("room creation failed: {:?}", error)),
 203            }
 204        })
 205    }
 206
 207    pub(crate) async fn join_channel(
 208        channel_id: u64,
 209        client: Arc<Client>,
 210        user_store: Model<UserStore>,
 211        cx: AsyncAppContext,
 212    ) -> Result<Model<Self>> {
 213        Self::from_join_response(
 214            client.request(proto::JoinChannel2 { channel_id }).await?,
 215            client,
 216            user_store,
 217            cx,
 218        )
 219    }
 220
 221    pub(crate) async fn join(
 222        room_id: u64,
 223        client: Arc<Client>,
 224        user_store: Model<UserStore>,
 225        cx: AsyncAppContext,
 226    ) -> Result<Model<Self>> {
 227        Self::from_join_response(
 228            client.request(proto::JoinRoom { id: room_id }).await?,
 229            client,
 230            user_store,
 231            cx,
 232        )
 233    }
 234
 235    fn released(&mut self, cx: &mut AppContext) {
 236        if self.status.is_online() {
 237            self.leave_internal(cx).detach_and_log_err(cx);
 238        }
 239    }
 240
 241    fn app_will_quit(&mut self, cx: &mut ModelContext<Self>) -> impl Future<Output = ()> {
 242        let task = if self.status.is_online() {
 243            let leave = self.leave_internal(cx);
 244            Some(cx.background_executor().spawn(async move {
 245                leave.await.log_err();
 246            }))
 247        } else {
 248            None
 249        };
 250
 251        async move {
 252            if let Some(task) = task {
 253                task.await;
 254            }
 255        }
 256    }
 257
 258    pub fn mute_on_join(cx: &AppContext) -> bool {
 259        CallSettings::get_global(cx).mute_on_join
 260    }
 261
 262    fn from_join_response(
 263        response: proto::JoinRoomResponse,
 264        client: Arc<Client>,
 265        user_store: Model<UserStore>,
 266        mut cx: AsyncAppContext,
 267    ) -> Result<Model<Self>> {
 268        let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
 269        let room = cx.new_model(|cx| {
 270            Self::new(
 271                room_proto.id,
 272                response.channel_id,
 273                response.live_kit_connection_info,
 274                client,
 275                user_store,
 276                cx,
 277            )
 278        })?;
 279        room.update(&mut cx, |room, cx| {
 280            room.leave_when_empty = room.channel_id.is_none();
 281            room.apply_room_update(room_proto, cx)?;
 282            anyhow::Ok(())
 283        })??;
 284        Ok(room)
 285    }
 286
 287    fn should_leave(&self) -> bool {
 288        self.leave_when_empty
 289            && self.pending_room_update.is_none()
 290            && self.pending_participants.is_empty()
 291            && self.remote_participants.is_empty()
 292            && self.pending_call_count == 0
 293    }
 294
 295    pub(crate) fn leave(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
 296        cx.notify();
 297        cx.emit(Event::Left {
 298            channel_id: self.channel_id(),
 299        });
 300        self.leave_internal(cx)
 301    }
 302
 303    fn leave_internal(&mut self, cx: &mut AppContext) -> Task<Result<()>> {
 304        if self.status.is_offline() {
 305            return Task::ready(Err(anyhow!("room is offline")));
 306        }
 307
 308        log::info!("leaving room");
 309        if self.live_kit.is_some() {
 310            Audio::play_sound(Sound::Leave, cx);
 311        }
 312
 313        self.clear_state(cx);
 314
 315        let leave_room = self.client.request(proto::LeaveRoom {});
 316        cx.background_executor().spawn(async move {
 317            leave_room.await?;
 318            anyhow::Ok(())
 319        })
 320    }
 321
 322    pub(crate) fn clear_state(&mut self, cx: &mut AppContext) {
 323        for project in self.shared_projects.drain() {
 324            if let Some(project) = project.upgrade() {
 325                project.update(cx, |project, cx| {
 326                    project.unshare(cx).log_err();
 327                });
 328            }
 329        }
 330        for project in self.joined_projects.drain() {
 331            if let Some(project) = project.upgrade() {
 332                project.update(cx, |project, cx| {
 333                    project.disconnected_from_host(cx);
 334                    project.close(cx);
 335                });
 336            }
 337        }
 338
 339        self.status = RoomStatus::Offline;
 340        self.remote_participants.clear();
 341        self.pending_participants.clear();
 342        self.participant_user_ids.clear();
 343        self.client_subscriptions.clear();
 344        self.live_kit.take();
 345        self.pending_room_update.take();
 346        self.maintain_connection.take();
 347    }
 348
 349    async fn maintain_connection(
 350        this: WeakModel<Self>,
 351        client: Arc<Client>,
 352        mut cx: AsyncAppContext,
 353    ) -> Result<()> {
 354        let mut client_status = client.status();
 355        loop {
 356            let _ = client_status.try_recv();
 357            let is_connected = client_status.borrow().is_connected();
 358            // Even if we're initially connected, any future change of the status means we momentarily disconnected.
 359            if !is_connected || client_status.next().await.is_some() {
 360                log::info!("detected client disconnection");
 361
 362                this.upgrade()
 363                    .ok_or_else(|| anyhow!("room was dropped"))?
 364                    .update(&mut cx, |this, cx| {
 365                        this.status = RoomStatus::Rejoining;
 366                        cx.notify();
 367                    })?;
 368
 369                // Wait for client to re-establish a connection to the server.
 370                {
 371                    let mut reconnection_timeout =
 372                        cx.background_executor().timer(RECONNECT_TIMEOUT).fuse();
 373                    let client_reconnection = async {
 374                        let mut remaining_attempts = 3;
 375                        while remaining_attempts > 0 {
 376                            if client_status.borrow().is_connected() {
 377                                log::info!("client reconnected, attempting to rejoin room");
 378
 379                                let Some(this) = this.upgrade() else { break };
 380                                match this.update(&mut cx, |this, cx| this.rejoin(cx)) {
 381                                    Ok(task) => {
 382                                        if task.await.log_err().is_some() {
 383                                            return true;
 384                                        } else {
 385                                            remaining_attempts -= 1;
 386                                        }
 387                                    }
 388                                    Err(_app_dropped) => return false,
 389                                }
 390                            } else if client_status.borrow().is_signed_out() {
 391                                return false;
 392                            }
 393
 394                            log::info!(
 395                                "waiting for client status change, remaining attempts {}",
 396                                remaining_attempts
 397                            );
 398                            client_status.next().await;
 399                        }
 400                        false
 401                    }
 402                    .fuse();
 403                    futures::pin_mut!(client_reconnection);
 404
 405                    futures::select_biased! {
 406                        reconnected = client_reconnection => {
 407                            if reconnected {
 408                                log::info!("successfully reconnected to room");
 409                                // If we successfully joined the room, go back around the loop
 410                                // waiting for future connection status changes.
 411                                continue;
 412                            }
 413                        }
 414                        _ = reconnection_timeout => {
 415                            log::info!("room reconnection timeout expired");
 416                        }
 417                    }
 418                }
 419
 420                break;
 421            }
 422        }
 423
 424        // The client failed to re-establish a connection to the server
 425        // or an error occurred while trying to re-join the room. Either way
 426        // we leave the room and return an error.
 427        if let Some(this) = this.upgrade() {
 428            log::info!("reconnection failed, leaving room");
 429            let _ = this.update(&mut cx, |this, cx| this.leave(cx))?;
 430        }
 431        Err(anyhow!(
 432            "can't reconnect to room: client failed to re-establish connection"
 433        ))
 434    }
 435
 436    fn rejoin(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
 437        let mut projects = HashMap::default();
 438        let mut reshared_projects = Vec::new();
 439        let mut rejoined_projects = Vec::new();
 440        self.shared_projects.retain(|project| {
 441            if let Some(handle) = project.upgrade() {
 442                let project = handle.read(cx);
 443                if let Some(project_id) = project.remote_id() {
 444                    projects.insert(project_id, handle.clone());
 445                    reshared_projects.push(proto::UpdateProject {
 446                        project_id,
 447                        worktrees: project.worktree_metadata_protos(cx),
 448                    });
 449                    return true;
 450                }
 451            }
 452            false
 453        });
 454        self.joined_projects.retain(|project| {
 455            if let Some(handle) = project.upgrade() {
 456                let project = handle.read(cx);
 457                if let Some(project_id) = project.remote_id() {
 458                    projects.insert(project_id, handle.clone());
 459                    rejoined_projects.push(proto::RejoinProject {
 460                        id: project_id,
 461                        worktrees: project
 462                            .worktrees()
 463                            .map(|worktree| {
 464                                let worktree = worktree.read(cx);
 465                                proto::RejoinWorktree {
 466                                    id: worktree.id().to_proto(),
 467                                    scan_id: worktree.completed_scan_id() as u64,
 468                                }
 469                            })
 470                            .collect(),
 471                    });
 472                }
 473                return true;
 474            }
 475            false
 476        });
 477
 478        let response = self.client.request_envelope(proto::RejoinRoom {
 479            id: self.id,
 480            reshared_projects,
 481            rejoined_projects,
 482        });
 483
 484        cx.spawn(|this, mut cx| async move {
 485            let response = response.await?;
 486            let message_id = response.message_id;
 487            let response = response.payload;
 488            let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
 489            this.update(&mut cx, |this, cx| {
 490                this.status = RoomStatus::Online;
 491                this.apply_room_update(room_proto, cx)?;
 492
 493                for reshared_project in response.reshared_projects {
 494                    if let Some(project) = projects.get(&reshared_project.id) {
 495                        project.update(cx, |project, cx| {
 496                            project.reshared(reshared_project, cx).log_err();
 497                        });
 498                    }
 499                }
 500
 501                for rejoined_project in response.rejoined_projects {
 502                    if let Some(project) = projects.get(&rejoined_project.id) {
 503                        project.update(cx, |project, cx| {
 504                            project.rejoined(rejoined_project, message_id, cx).log_err();
 505                        });
 506                    }
 507                }
 508
 509                anyhow::Ok(())
 510            })?
 511        })
 512    }
 513
 514    pub fn id(&self) -> u64 {
 515        self.id
 516    }
 517
 518    pub fn status(&self) -> RoomStatus {
 519        self.status
 520    }
 521
 522    pub fn local_participant(&self) -> &LocalParticipant {
 523        &self.local_participant
 524    }
 525
 526    pub fn remote_participants(&self) -> &BTreeMap<u64, RemoteParticipant> {
 527        &self.remote_participants
 528    }
 529
 530    pub fn call_participants(&self, cx: &AppContext) -> Vec<Arc<User>> {
 531        self.remote_participants()
 532            .values()
 533            .filter_map(|participant| {
 534                if participant.in_call {
 535                    Some(participant.user.clone())
 536                } else {
 537                    None
 538                }
 539            })
 540            .chain(if self.in_call() {
 541                self.user_store.read(cx).current_user()
 542            } else {
 543                None
 544            })
 545            .collect()
 546    }
 547
 548    pub fn remote_participant_for_peer_id(&self, peer_id: PeerId) -> Option<&RemoteParticipant> {
 549        self.remote_participants
 550            .values()
 551            .find(|p| p.peer_id == peer_id)
 552    }
 553
 554    pub fn role_for_user(&self, user_id: u64) -> Option<proto::ChannelRole> {
 555        self.remote_participants
 556            .get(&user_id)
 557            .map(|participant| participant.role)
 558    }
 559
 560    pub fn contains_guests(&self) -> bool {
 561        self.local_participant.role == proto::ChannelRole::Guest
 562            || self
 563                .remote_participants
 564                .values()
 565                .any(|p| p.role == proto::ChannelRole::Guest)
 566    }
 567
 568    pub fn local_participant_is_admin(&self) -> bool {
 569        self.local_participant.role == proto::ChannelRole::Admin
 570    }
 571
 572    pub fn set_participant_role(
 573        &mut self,
 574        user_id: u64,
 575        role: proto::ChannelRole,
 576        cx: &ModelContext<Self>,
 577    ) -> Task<Result<()>> {
 578        let client = self.client.clone();
 579        let room_id = self.id;
 580        let role = role.into();
 581        cx.spawn(|_, _| async move {
 582            client
 583                .request(proto::SetRoomParticipantRole {
 584                    room_id,
 585                    user_id,
 586                    role,
 587                })
 588                .await
 589                .map(|_| ())
 590        })
 591    }
 592
 593    pub fn pending_participants(&self) -> &[Arc<User>] {
 594        &self.pending_participants
 595    }
 596
 597    pub fn contains_participant(&self, user_id: u64) -> bool {
 598        self.participant_user_ids.contains(&user_id)
 599    }
 600
 601    pub fn followers_for(&self, leader_id: PeerId, project_id: u64) -> &[PeerId] {
 602        self.follows_by_leader_id_project_id
 603            .get(&(leader_id, project_id))
 604            .map_or(&[], |v| v.as_slice())
 605    }
 606
 607    /// Returns the most 'active' projects, defined as most people in the project
 608    pub fn most_active_project(&self, cx: &AppContext) -> Option<(u64, u64)> {
 609        let mut project_hosts_and_guest_counts = HashMap::<u64, (Option<u64>, u32)>::default();
 610        for participant in self.remote_participants.values() {
 611            match participant.location {
 612                ParticipantLocation::SharedProject { project_id } => {
 613                    project_hosts_and_guest_counts
 614                        .entry(project_id)
 615                        .or_default()
 616                        .1 += 1;
 617                }
 618                ParticipantLocation::External | ParticipantLocation::UnsharedProject => {}
 619            }
 620            for project in &participant.projects {
 621                project_hosts_and_guest_counts
 622                    .entry(project.id)
 623                    .or_default()
 624                    .0 = Some(participant.user.id);
 625            }
 626        }
 627
 628        if let Some(user) = self.user_store.read(cx).current_user() {
 629            for project in &self.local_participant.projects {
 630                project_hosts_and_guest_counts
 631                    .entry(project.id)
 632                    .or_default()
 633                    .0 = Some(user.id);
 634            }
 635        }
 636
 637        project_hosts_and_guest_counts
 638            .into_iter()
 639            .filter_map(|(id, (host, guest_count))| Some((id, host?, guest_count)))
 640            .max_by_key(|(_, _, guest_count)| *guest_count)
 641            .map(|(id, host, _)| (id, host))
 642    }
 643
 644    async fn handle_room_updated(
 645        this: Model<Self>,
 646        envelope: TypedEnvelope<proto::RoomUpdated>,
 647        _: Arc<Client>,
 648        mut cx: AsyncAppContext,
 649    ) -> Result<()> {
 650        let room = envelope
 651            .payload
 652            .room
 653            .ok_or_else(|| anyhow!("invalid room"))?;
 654        this.update(&mut cx, |this, cx| this.apply_room_update(room, cx))?
 655    }
 656
 657    fn apply_room_update(
 658        &mut self,
 659        mut room: proto::Room,
 660        cx: &mut ModelContext<Self>,
 661    ) -> Result<()> {
 662        // Filter ourselves out from the room's participants.
 663        let local_participant_ix = room
 664            .participants
 665            .iter()
 666            .position(|participant| Some(participant.user_id) == self.client.user_id());
 667        let local_participant = local_participant_ix.map(|ix| room.participants.swap_remove(ix));
 668
 669        let pending_participant_user_ids = room
 670            .pending_participants
 671            .iter()
 672            .map(|p| p.user_id)
 673            .collect::<Vec<_>>();
 674
 675        let remote_participant_user_ids = room
 676            .participants
 677            .iter()
 678            .map(|p| p.user_id)
 679            .collect::<Vec<_>>();
 680
 681        let (remote_participants, pending_participants) =
 682            self.user_store.update(cx, move |user_store, cx| {
 683                (
 684                    user_store.get_users(remote_participant_user_ids, cx),
 685                    user_store.get_users(pending_participant_user_ids, cx),
 686                )
 687            });
 688
 689        self.pending_room_update = Some(cx.spawn(|this, mut cx| async move {
 690            let (remote_participants, pending_participants) =
 691                futures::join!(remote_participants, pending_participants);
 692
 693            this.update(&mut cx, |this, cx| {
 694                this.participant_user_ids.clear();
 695
 696                if let Some(participant) = local_participant {
 697                    let role = participant.role();
 698                    this.local_participant.projects = participant.projects;
 699                    if this.local_participant.role != role {
 700                        this.local_participant.role = role;
 701
 702                        if role == proto::ChannelRole::Guest {
 703                            for project in mem::take(&mut this.shared_projects) {
 704                                if let Some(project) = project.upgrade() {
 705                                    this.unshare_project(project, cx).log_err();
 706                                }
 707                            }
 708                            this.local_participant.projects.clear();
 709                            if let Some(live_kit_room) = &mut this.live_kit {
 710                                live_kit_room.stop_publishing(cx);
 711                            }
 712                        }
 713
 714                        this.joined_projects.retain(|project| {
 715                            if let Some(project) = project.upgrade() {
 716                                project.update(cx, |project, cx| project.set_role(role, cx));
 717                                true
 718                            } else {
 719                                false
 720                            }
 721                        });
 722                    }
 723                } else {
 724                    this.local_participant.projects.clear();
 725                }
 726
 727                if let Some(participants) = remote_participants.log_err() {
 728                    for (participant, user) in room.participants.into_iter().zip(participants) {
 729                        let Some(peer_id) = participant.peer_id else {
 730                            continue;
 731                        };
 732                        let participant_index = ParticipantIndex(participant.participant_index);
 733                        this.participant_user_ids.insert(participant.user_id);
 734
 735                        let old_projects = this
 736                            .remote_participants
 737                            .get(&participant.user_id)
 738                            .into_iter()
 739                            .flat_map(|existing| &existing.projects)
 740                            .map(|project| project.id)
 741                            .collect::<HashSet<_>>();
 742                        let new_projects = participant
 743                            .projects
 744                            .iter()
 745                            .map(|project| project.id)
 746                            .collect::<HashSet<_>>();
 747
 748                        for project in &participant.projects {
 749                            if !old_projects.contains(&project.id) {
 750                                cx.emit(Event::RemoteProjectShared {
 751                                    owner: user.clone(),
 752                                    project_id: project.id,
 753                                    worktree_root_names: project.worktree_root_names.clone(),
 754                                });
 755                            }
 756                        }
 757
 758                        for unshared_project_id in old_projects.difference(&new_projects) {
 759                            this.joined_projects.retain(|project| {
 760                                if let Some(project) = project.upgrade() {
 761                                    project.update(cx, |project, cx| {
 762                                        if project.remote_id() == Some(*unshared_project_id) {
 763                                            project.disconnected_from_host(cx);
 764                                            false
 765                                        } else {
 766                                            true
 767                                        }
 768                                    })
 769                                } else {
 770                                    false
 771                                }
 772                            });
 773                            cx.emit(Event::RemoteProjectUnshared {
 774                                project_id: *unshared_project_id,
 775                            });
 776                        }
 777
 778                        let role = participant.role();
 779                        let in_call = participant.in_call;
 780                        let location = ParticipantLocation::from_proto(participant.location)
 781                            .unwrap_or(ParticipantLocation::External);
 782                        if let Some(remote_participant) =
 783                            this.remote_participants.get_mut(&participant.user_id)
 784                        {
 785                            remote_participant.peer_id = peer_id;
 786                            remote_participant.projects = participant.projects;
 787                            remote_participant.participant_index = participant_index;
 788                            if location != remote_participant.location
 789                                || role != remote_participant.role
 790                                || in_call != remote_participant.in_call
 791                            {
 792                                if in_call && !remote_participant.in_call {
 793                                    Audio::play_sound(Sound::Joined, cx);
 794                                }
 795                                remote_participant.location = location;
 796                                remote_participant.role = role;
 797                                remote_participant.in_call = participant.in_call;
 798
 799                                cx.emit(Event::ParticipantLocationChanged {
 800                                    participant_id: peer_id,
 801                                });
 802                            }
 803                        } else {
 804                            this.remote_participants.insert(
 805                                participant.user_id,
 806                                RemoteParticipant {
 807                                    user: user.clone(),
 808                                    participant_index,
 809                                    peer_id,
 810                                    projects: participant.projects,
 811                                    location,
 812                                    role,
 813                                    muted: true,
 814                                    speaking: false,
 815                                    in_call: participant.in_call,
 816                                    video_tracks: Default::default(),
 817                                    audio_tracks: Default::default(),
 818                                },
 819                            );
 820
 821                            if participant.in_call {
 822                                Audio::play_sound(Sound::Joined, cx);
 823                            }
 824
 825                            if let Some(live_kit) = this.live_kit.as_ref() {
 826                                let video_tracks =
 827                                    live_kit.room.remote_video_tracks(&user.id.to_string());
 828                                let audio_tracks =
 829                                    live_kit.room.remote_audio_tracks(&user.id.to_string());
 830                                let publications = live_kit
 831                                    .room
 832                                    .remote_audio_track_publications(&user.id.to_string());
 833
 834                                for track in video_tracks {
 835                                    this.live_kit_room_updated(
 836                                        RoomUpdate::SubscribedToRemoteVideoTrack(track),
 837                                        cx,
 838                                    )
 839                                    .log_err();
 840                                }
 841
 842                                for (track, publication) in
 843                                    audio_tracks.iter().zip(publications.iter())
 844                                {
 845                                    this.live_kit_room_updated(
 846                                        RoomUpdate::SubscribedToRemoteAudioTrack(
 847                                            track.clone(),
 848                                            publication.clone(),
 849                                        ),
 850                                        cx,
 851                                    )
 852                                    .log_err();
 853                                }
 854                            }
 855                        }
 856                    }
 857
 858                    this.remote_participants.retain(|user_id, participant| {
 859                        if this.participant_user_ids.contains(user_id) {
 860                            true
 861                        } else {
 862                            for project in &participant.projects {
 863                                cx.emit(Event::RemoteProjectUnshared {
 864                                    project_id: project.id,
 865                                });
 866                            }
 867                            false
 868                        }
 869                    });
 870                }
 871
 872                if let Some(pending_participants) = pending_participants.log_err() {
 873                    this.pending_participants = pending_participants;
 874                    for participant in &this.pending_participants {
 875                        this.participant_user_ids.insert(participant.id);
 876                    }
 877                }
 878
 879                this.follows_by_leader_id_project_id.clear();
 880                for follower in room.followers {
 881                    let project_id = follower.project_id;
 882                    let (leader, follower) = match (follower.leader_id, follower.follower_id) {
 883                        (Some(leader), Some(follower)) => (leader, follower),
 884
 885                        _ => {
 886                            log::error!("Follower message {follower:?} missing some state");
 887                            continue;
 888                        }
 889                    };
 890
 891                    let list = this
 892                        .follows_by_leader_id_project_id
 893                        .entry((leader, project_id))
 894                        .or_insert(Vec::new());
 895                    if !list.contains(&follower) {
 896                        list.push(follower);
 897                    }
 898                }
 899
 900                this.pending_room_update.take();
 901                if this.should_leave() {
 902                    log::info!("room is empty, leaving");
 903                    let _ = this.leave(cx);
 904                }
 905
 906                this.user_store.update(cx, |user_store, cx| {
 907                    let participant_indices_by_user_id = this
 908                        .remote_participants
 909                        .iter()
 910                        .map(|(user_id, participant)| (*user_id, participant.participant_index))
 911                        .collect();
 912                    user_store.set_participant_indices(participant_indices_by_user_id, cx);
 913                });
 914
 915                this.check_invariants();
 916                this.room_update_completed_tx.try_send(Some(())).ok();
 917                cx.notify();
 918            })
 919            .ok();
 920        }));
 921
 922        cx.notify();
 923        Ok(())
 924    }
 925
 926    pub fn room_update_completed(&mut self) -> impl Future<Output = ()> {
 927        let mut done_rx = self.room_update_completed_rx.clone();
 928        async move {
 929            while let Some(result) = done_rx.next().await {
 930                if result.is_some() {
 931                    break;
 932                }
 933            }
 934        }
 935    }
 936
 937    fn live_kit_room_updated(
 938        &mut self,
 939        update: RoomUpdate,
 940        cx: &mut ModelContext<Self>,
 941    ) -> Result<()> {
 942        match update {
 943            RoomUpdate::SubscribedToRemoteVideoTrack(track) => {
 944                let user_id = track.publisher_id().parse()?;
 945                let track_id = track.sid().to_string();
 946                let participant = self
 947                    .remote_participants
 948                    .get_mut(&user_id)
 949                    .ok_or_else(|| anyhow!("subscribed to track by unknown participant"))?;
 950                participant.video_tracks.insert(track_id.clone(), track);
 951                cx.emit(Event::RemoteVideoTracksChanged {
 952                    participant_id: participant.peer_id,
 953                });
 954            }
 955
 956            RoomUpdate::UnsubscribedFromRemoteVideoTrack {
 957                publisher_id,
 958                track_id,
 959            } => {
 960                let user_id = publisher_id.parse()?;
 961                let participant = self
 962                    .remote_participants
 963                    .get_mut(&user_id)
 964                    .ok_or_else(|| anyhow!("unsubscribed from track by unknown participant"))?;
 965                participant.video_tracks.remove(&track_id);
 966                cx.emit(Event::RemoteVideoTracksChanged {
 967                    participant_id: participant.peer_id,
 968                });
 969            }
 970
 971            RoomUpdate::ActiveSpeakersChanged { speakers } => {
 972                let mut speaker_ids = speakers
 973                    .into_iter()
 974                    .filter_map(|speaker_sid| speaker_sid.parse().ok())
 975                    .collect::<Vec<u64>>();
 976                speaker_ids.sort_unstable();
 977                for (sid, participant) in &mut self.remote_participants {
 978                    if let Ok(_) = speaker_ids.binary_search(sid) {
 979                        participant.speaking = true;
 980                    } else {
 981                        participant.speaking = false;
 982                    }
 983                }
 984                if let Some(id) = self.client.user_id() {
 985                    if let Some(room) = &mut self.live_kit {
 986                        if let Ok(_) = speaker_ids.binary_search(&id) {
 987                            room.speaking = true;
 988                        } else {
 989                            room.speaking = false;
 990                        }
 991                    }
 992                }
 993            }
 994
 995            RoomUpdate::RemoteAudioTrackMuteChanged { track_id, muted } => {
 996                let mut found = false;
 997                for participant in &mut self.remote_participants.values_mut() {
 998                    for track in participant.audio_tracks.values() {
 999                        if track.sid() == track_id {
1000                            found = true;
1001                            break;
1002                        }
1003                    }
1004                    if found {
1005                        participant.muted = muted;
1006                        break;
1007                    }
1008                }
1009            }
1010
1011            RoomUpdate::SubscribedToRemoteAudioTrack(track, publication) => {
1012                let user_id = track.publisher_id().parse()?;
1013                let track_id = track.sid().to_string();
1014                let participant = self
1015                    .remote_participants
1016                    .get_mut(&user_id)
1017                    .ok_or_else(|| anyhow!("subscribed to track by unknown participant"))?;
1018                participant.audio_tracks.insert(track_id.clone(), track);
1019                participant.muted = publication.is_muted();
1020
1021                cx.emit(Event::RemoteAudioTracksChanged {
1022                    participant_id: participant.peer_id,
1023                });
1024            }
1025
1026            RoomUpdate::UnsubscribedFromRemoteAudioTrack {
1027                publisher_id,
1028                track_id,
1029            } => {
1030                let user_id = publisher_id.parse()?;
1031                let participant = self
1032                    .remote_participants
1033                    .get_mut(&user_id)
1034                    .ok_or_else(|| anyhow!("unsubscribed from track by unknown participant"))?;
1035                participant.audio_tracks.remove(&track_id);
1036                cx.emit(Event::RemoteAudioTracksChanged {
1037                    participant_id: participant.peer_id,
1038                });
1039            }
1040
1041            RoomUpdate::LocalAudioTrackUnpublished { publication } => {
1042                log::info!("unpublished audio track {}", publication.sid());
1043                if let Some(room) = &mut self.live_kit {
1044                    room.microphone_track = LocalTrack::None;
1045                }
1046            }
1047
1048            RoomUpdate::LocalVideoTrackUnpublished { publication } => {
1049                log::info!("unpublished video track {}", publication.sid());
1050                if let Some(room) = &mut self.live_kit {
1051                    room.screen_track = LocalTrack::None;
1052                }
1053            }
1054
1055            RoomUpdate::LocalAudioTrackPublished { publication } => {
1056                log::info!("published audio track {}", publication.sid());
1057            }
1058
1059            RoomUpdate::LocalVideoTrackPublished { publication } => {
1060                log::info!("published video track {}", publication.sid());
1061            }
1062        }
1063
1064        cx.notify();
1065        Ok(())
1066    }
1067
1068    fn check_invariants(&self) {
1069        #[cfg(any(test, feature = "test-support"))]
1070        {
1071            for participant in self.remote_participants.values() {
1072                assert!(self.participant_user_ids.contains(&participant.user.id));
1073                assert_ne!(participant.user.id, self.client.user_id().unwrap());
1074            }
1075
1076            for participant in &self.pending_participants {
1077                assert!(self.participant_user_ids.contains(&participant.id));
1078                assert_ne!(participant.id, self.client.user_id().unwrap());
1079            }
1080
1081            assert_eq!(
1082                self.participant_user_ids.len(),
1083                self.remote_participants.len() + self.pending_participants.len()
1084            );
1085        }
1086    }
1087
1088    pub(crate) fn call(
1089        &mut self,
1090        called_user_id: u64,
1091        initial_project_id: Option<u64>,
1092        cx: &mut ModelContext<Self>,
1093    ) -> Task<Result<()>> {
1094        if self.status.is_offline() {
1095            return Task::ready(Err(anyhow!("room is offline")));
1096        }
1097
1098        cx.notify();
1099        let client = self.client.clone();
1100        let room_id = self.id;
1101        self.pending_call_count += 1;
1102        cx.spawn(move |this, mut cx| async move {
1103            let result = client
1104                .request(proto::Call {
1105                    room_id,
1106                    called_user_id,
1107                    initial_project_id,
1108                })
1109                .await;
1110            this.update(&mut cx, |this, cx| {
1111                this.pending_call_count -= 1;
1112                if this.should_leave() {
1113                    this.leave(cx).detach_and_log_err(cx);
1114                }
1115            })?;
1116            result?;
1117            Ok(())
1118        })
1119    }
1120
1121    pub fn join_project(
1122        &mut self,
1123        id: u64,
1124        language_registry: Arc<LanguageRegistry>,
1125        fs: Arc<dyn Fs>,
1126        cx: &mut ModelContext<Self>,
1127    ) -> Task<Result<Model<Project>>> {
1128        let client = self.client.clone();
1129        let user_store = self.user_store.clone();
1130        let role = self.local_participant.role;
1131        cx.emit(Event::RemoteProjectJoined { project_id: id });
1132        cx.spawn(move |this, mut cx| async move {
1133            let project = Project::remote(
1134                id,
1135                client,
1136                user_store,
1137                language_registry,
1138                fs,
1139                role,
1140                cx.clone(),
1141            )
1142            .await?;
1143
1144            this.update(&mut cx, |this, cx| {
1145                this.joined_projects.retain(|project| {
1146                    if let Some(project) = project.upgrade() {
1147                        !project.read(cx).is_disconnected()
1148                    } else {
1149                        false
1150                    }
1151                });
1152                this.joined_projects.insert(project.downgrade());
1153            })?;
1154            Ok(project)
1155        })
1156    }
1157
1158    pub(crate) fn share_project(
1159        &mut self,
1160        project: Model<Project>,
1161        cx: &mut ModelContext<Self>,
1162    ) -> Task<Result<u64>> {
1163        if let Some(project_id) = project.read(cx).remote_id() {
1164            return Task::ready(Ok(project_id));
1165        }
1166
1167        let request = self.client.request(proto::ShareProject {
1168            room_id: self.id(),
1169            worktrees: project.read(cx).worktree_metadata_protos(cx),
1170        });
1171        cx.spawn(|this, mut cx| async move {
1172            let response = request.await?;
1173
1174            project.update(&mut cx, |project, cx| {
1175                project.shared(response.project_id, cx)
1176            })??;
1177
1178            // If the user's location is in this project, it changes from UnsharedProject to SharedProject.
1179            this.update(&mut cx, |this, cx| {
1180                this.shared_projects.insert(project.downgrade());
1181                let active_project = this.local_participant.active_project.as_ref();
1182                if active_project.map_or(false, |location| *location == project) {
1183                    this.set_location(Some(&project), cx)
1184                } else {
1185                    Task::ready(Ok(()))
1186                }
1187            })?
1188            .await?;
1189
1190            Ok(response.project_id)
1191        })
1192    }
1193
1194    pub(crate) fn unshare_project(
1195        &mut self,
1196        project: Model<Project>,
1197        cx: &mut ModelContext<Self>,
1198    ) -> Result<()> {
1199        let project_id = match project.read(cx).remote_id() {
1200            Some(project_id) => project_id,
1201            None => return Ok(()),
1202        };
1203
1204        self.client.send(proto::UnshareProject { project_id })?;
1205        project.update(cx, |this, cx| this.unshare(cx))?;
1206
1207        if self.local_participant.active_project == Some(project.downgrade()) {
1208            self.set_location(Some(&project), cx).detach_and_log_err(cx);
1209        }
1210        Ok(())
1211    }
1212
1213    pub(crate) fn set_location(
1214        &mut self,
1215        project: Option<&Model<Project>>,
1216        cx: &mut ModelContext<Self>,
1217    ) -> Task<Result<()>> {
1218        if self.status.is_offline() {
1219            return Task::ready(Err(anyhow!("room is offline")));
1220        }
1221
1222        let client = self.client.clone();
1223        let room_id = self.id;
1224        let location = if let Some(project) = project {
1225            self.local_participant.active_project = Some(project.downgrade());
1226            if let Some(project_id) = project.read(cx).remote_id() {
1227                proto::participant_location::Variant::SharedProject(
1228                    proto::participant_location::SharedProject { id: project_id },
1229                )
1230            } else {
1231                proto::participant_location::Variant::UnsharedProject(
1232                    proto::participant_location::UnsharedProject {},
1233                )
1234            }
1235        } else {
1236            self.local_participant.active_project = None;
1237            proto::participant_location::Variant::External(proto::participant_location::External {})
1238        };
1239
1240        cx.notify();
1241        cx.background_executor().spawn(async move {
1242            client
1243                .request(proto::UpdateParticipantLocation {
1244                    room_id,
1245                    location: Some(proto::ParticipantLocation {
1246                        variant: Some(location),
1247                    }),
1248                })
1249                .await?;
1250            Ok(())
1251        })
1252    }
1253
1254    pub fn is_screen_sharing(&self) -> bool {
1255        self.live_kit.as_ref().map_or(false, |live_kit| {
1256            !matches!(live_kit.screen_track, LocalTrack::None)
1257        })
1258    }
1259
1260    pub fn is_muted(&self) -> bool {
1261        self.live_kit
1262            .as_ref()
1263            .map_or(true, |live_kit| match &live_kit.microphone_track {
1264                LocalTrack::None => true,
1265                LocalTrack::Pending { .. } => true,
1266                LocalTrack::Published { track_publication } => track_publication.is_muted(),
1267            })
1268    }
1269
1270    pub fn read_only(&self) -> bool {
1271        !(self.local_participant().role == proto::ChannelRole::Member
1272            || self.local_participant().role == proto::ChannelRole::Admin)
1273    }
1274
1275    pub fn is_speaking(&self) -> bool {
1276        self.live_kit
1277            .as_ref()
1278            .map_or(false, |live_kit| live_kit.speaking)
1279    }
1280
1281    pub fn in_call(&self) -> bool {
1282        self.live_kit.is_some()
1283    }
1284
1285    #[track_caller]
1286    pub fn share_microphone(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
1287        if self.status.is_offline() {
1288            return Task::ready(Err(anyhow!("room is offline")));
1289        }
1290
1291        let publish_id = if let Some(live_kit) = self.live_kit.as_mut() {
1292            let publish_id = post_inc(&mut live_kit.next_publish_id);
1293            live_kit.microphone_track = LocalTrack::Pending { publish_id };
1294            cx.notify();
1295            publish_id
1296        } else {
1297            return Task::ready(Err(anyhow!("live-kit was not initialized")));
1298        };
1299
1300        cx.spawn(move |this, mut cx| async move {
1301            let publish_track = async {
1302                let track = LocalAudioTrack::create();
1303                this.upgrade()
1304                    .ok_or_else(|| anyhow!("room was dropped"))?
1305                    .update(&mut cx, |this, _| {
1306                        this.live_kit
1307                            .as_ref()
1308                            .map(|live_kit| live_kit.room.publish_audio_track(track))
1309                    })?
1310                    .ok_or_else(|| anyhow!("live-kit was not initialized"))?
1311                    .await
1312            };
1313            let publication = publish_track.await;
1314            this.upgrade()
1315                .ok_or_else(|| anyhow!("room was dropped"))?
1316                .update(&mut cx, |this, cx| {
1317                    let live_kit = this
1318                        .live_kit
1319                        .as_mut()
1320                        .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
1321
1322                    let canceled = if let LocalTrack::Pending {
1323                        publish_id: cur_publish_id,
1324                    } = &live_kit.microphone_track
1325                    {
1326                        *cur_publish_id != publish_id
1327                    } else {
1328                        true
1329                    };
1330
1331                    match publication {
1332                        Ok(publication) => {
1333                            if canceled {
1334                                live_kit.room.unpublish_track(publication);
1335                                live_kit.microphone_track = LocalTrack::None;
1336                            } else {
1337                                live_kit.microphone_track = LocalTrack::Published {
1338                                    track_publication: publication,
1339                                };
1340                                cx.notify();
1341                            }
1342                            Ok(())
1343                        }
1344                        Err(error) => {
1345                            if canceled {
1346                                Ok(())
1347                            } else {
1348                                live_kit.microphone_track = LocalTrack::None;
1349                                cx.notify();
1350                                Err(error)
1351                            }
1352                        }
1353                    }
1354                })?
1355        })
1356    }
1357
1358    pub fn share_screen(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
1359        if self.status.is_offline() {
1360            return Task::ready(Err(anyhow!("room is offline")));
1361        } else if self.is_screen_sharing() {
1362            return Task::ready(Err(anyhow!("screen was already shared")));
1363        }
1364
1365        let (displays, publish_id) = if let Some(live_kit) = self.live_kit.as_mut() {
1366            let publish_id = post_inc(&mut live_kit.next_publish_id);
1367            live_kit.screen_track = LocalTrack::Pending { publish_id };
1368            cx.notify();
1369            (live_kit.room.display_sources(), publish_id)
1370        } else {
1371            return Task::ready(Err(anyhow!("live-kit was not initialized")));
1372        };
1373
1374        cx.spawn(move |this, mut cx| async move {
1375            let publish_track = async {
1376                let displays = displays.await?;
1377                let display = displays
1378                    .first()
1379                    .ok_or_else(|| anyhow!("no display found"))?;
1380                let track = LocalVideoTrack::screen_share_for_display(display);
1381                this.upgrade()
1382                    .ok_or_else(|| anyhow!("room was dropped"))?
1383                    .update(&mut cx, |this, _| {
1384                        this.live_kit
1385                            .as_ref()
1386                            .map(|live_kit| live_kit.room.publish_video_track(track))
1387                    })?
1388                    .ok_or_else(|| anyhow!("live-kit was not initialized"))?
1389                    .await
1390            };
1391
1392            let publication = publish_track.await;
1393            this.upgrade()
1394                .ok_or_else(|| anyhow!("room was dropped"))?
1395                .update(&mut cx, |this, cx| {
1396                    let live_kit = this
1397                        .live_kit
1398                        .as_mut()
1399                        .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
1400
1401                    let canceled = if let LocalTrack::Pending {
1402                        publish_id: cur_publish_id,
1403                    } = &live_kit.screen_track
1404                    {
1405                        *cur_publish_id != publish_id
1406                    } else {
1407                        true
1408                    };
1409
1410                    match publication {
1411                        Ok(publication) => {
1412                            if canceled {
1413                                live_kit.room.unpublish_track(publication);
1414                            } else {
1415                                live_kit.screen_track = LocalTrack::Published {
1416                                    track_publication: publication,
1417                                };
1418                                cx.notify();
1419                            }
1420
1421                            Audio::play_sound(Sound::StartScreenshare, cx);
1422
1423                            Ok(())
1424                        }
1425                        Err(error) => {
1426                            if canceled {
1427                                Ok(())
1428                            } else {
1429                                live_kit.screen_track = LocalTrack::None;
1430                                cx.notify();
1431                                Err(error)
1432                            }
1433                        }
1434                    }
1435                })?
1436        })
1437    }
1438
1439    pub fn toggle_mute(&mut self, cx: &mut ModelContext<Self>) {
1440        let muted = !self.is_muted();
1441        if let Some(task) = self.set_mute(muted, cx) {
1442            task.detach_and_log_err(cx);
1443        }
1444    }
1445
1446    pub fn join_call(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
1447        if self.live_kit.is_some() {
1448            return Task::ready(Ok(()));
1449        }
1450
1451        let room = live_kit_client::Room::new();
1452        let mut status = room.status();
1453        // Consume the initial status of the room.
1454        let _ = status.try_recv();
1455        let _maintain_room = cx.spawn(|this, mut cx| async move {
1456            while let Some(status) = status.next().await {
1457                let this = if let Some(this) = this.upgrade() {
1458                    this
1459                } else {
1460                    break;
1461                };
1462
1463                if status == live_kit_client::ConnectionState::Disconnected {
1464                    this.update(&mut cx, |this, cx| this.leave(cx).log_err())
1465                        .ok();
1466                    break;
1467                }
1468            }
1469        });
1470
1471        let _handle_updates = cx.spawn({
1472            let room = room.clone();
1473            move |this, mut cx| async move {
1474                let mut updates = room.updates();
1475                while let Some(update) = updates.next().await {
1476                    let this = if let Some(this) = this.upgrade() {
1477                        this
1478                    } else {
1479                        break;
1480                    };
1481
1482                    this.update(&mut cx, |this, cx| {
1483                        this.live_kit_room_updated(update, cx).log_err()
1484                    })
1485                    .ok();
1486                }
1487            }
1488        });
1489
1490        self.live_kit = Some(LiveKitRoom {
1491            room: room.clone(),
1492            screen_track: LocalTrack::None,
1493            microphone_track: LocalTrack::None,
1494            next_publish_id: 0,
1495            speaking: false,
1496            _maintain_room,
1497            _handle_updates,
1498        });
1499
1500        cx.spawn({
1501            let client = self.client.clone();
1502            let share_microphone = !self.read_only() && !Self::mute_on_join(cx);
1503            let connection_info = self.live_kit_connection_info.clone();
1504            let channel_id = self.channel_id;
1505
1506            move |this, mut cx| async move {
1507                let connection_info = if let Some(connection_info) = connection_info {
1508                    connection_info.clone()
1509                } else if let Some(channel_id) = channel_id {
1510                    if let Some(connection_info) = client
1511                        .request(proto::JoinChannelCall { channel_id })
1512                        .await?
1513                        .live_kit_connection_info
1514                    {
1515                        connection_info
1516                    } else {
1517                        return Err(anyhow!("failed to get connection info from server"));
1518                    }
1519                } else {
1520                    return Err(anyhow!(
1521                        "tried to connect to livekit without connection info"
1522                    ));
1523                };
1524                room.connect(&connection_info.server_url, &connection_info.token)
1525                    .await?;
1526
1527                let track_updates = this.update(&mut cx, |this, cx| {
1528                    Audio::play_sound(Sound::Joined, cx);
1529                    let Some(live_kit) = this.live_kit.as_mut() else {
1530                        return vec![];
1531                    };
1532
1533                    let mut track_updates = Vec::new();
1534                    for participant in this.remote_participants.values() {
1535                        for publication in live_kit
1536                            .room
1537                            .remote_audio_track_publications(&participant.user.id.to_string())
1538                        {
1539                            track_updates.push(publication.set_enabled(true));
1540                        }
1541
1542                        for track in participant.audio_tracks.values() {
1543                            track.start();
1544                        }
1545                    }
1546                    track_updates
1547                })?;
1548
1549                if share_microphone {
1550                    this.update(&mut cx, |this, cx| this.share_microphone(cx))?
1551                        .await?
1552                };
1553
1554                for result in futures::future::join_all(track_updates).await {
1555                    result?;
1556                }
1557                anyhow::Ok(())
1558            }
1559        })
1560    }
1561
1562    pub fn leave_call(&mut self, cx: &mut ModelContext<Self>) {
1563        Audio::play_sound(Sound::Leave, cx);
1564        if let Some(channel_id) = self.channel_id() {
1565            let client = self.client.clone();
1566            cx.background_executor()
1567                .spawn(client.request(proto::LeaveChannelCall { channel_id }))
1568                .detach_and_log_err(cx);
1569            self.live_kit.take();
1570            self.live_kit_connection_info.take();
1571            cx.notify();
1572        } else {
1573            self.leave(cx).detach_and_log_err(cx)
1574        }
1575    }
1576
1577    pub fn unshare_screen(&mut self, cx: &mut ModelContext<Self>) -> Result<()> {
1578        if self.status.is_offline() {
1579            return Err(anyhow!("room is offline"));
1580        }
1581
1582        let live_kit = self
1583            .live_kit
1584            .as_mut()
1585            .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
1586        match mem::take(&mut live_kit.screen_track) {
1587            LocalTrack::None => Err(anyhow!("screen was not shared")),
1588            LocalTrack::Pending { .. } => {
1589                cx.notify();
1590                Ok(())
1591            }
1592            LocalTrack::Published {
1593                track_publication, ..
1594            } => {
1595                live_kit.room.unpublish_track(track_publication);
1596                cx.notify();
1597
1598                Audio::play_sound(Sound::StopScreenshare, cx);
1599                Ok(())
1600            }
1601        }
1602    }
1603
1604    fn set_mute(
1605        &mut self,
1606        should_mute: bool,
1607        cx: &mut ModelContext<Room>,
1608    ) -> Option<Task<Result<()>>> {
1609        let live_kit = self.live_kit.as_mut()?;
1610        cx.notify();
1611
1612        if should_mute {
1613            Audio::play_sound(Sound::Mute, cx);
1614        } else {
1615            Audio::play_sound(Sound::Unmute, cx);
1616        }
1617
1618        match &mut live_kit.microphone_track {
1619            LocalTrack::None => {
1620                if should_mute {
1621                    None
1622                } else {
1623                    Some(self.share_microphone(cx))
1624                }
1625            }
1626            LocalTrack::Pending { .. } => None,
1627            LocalTrack::Published { track_publication } => Some(
1628                cx.foreground_executor()
1629                    .spawn(track_publication.set_mute(should_mute)),
1630            ),
1631        }
1632    }
1633
1634    #[cfg(any(test, feature = "test-support"))]
1635    pub fn set_display_sources(&self, sources: Vec<live_kit_client::MacOSDisplay>) {
1636        self.live_kit
1637            .as_ref()
1638            .unwrap()
1639            .room
1640            .set_display_sources(sources);
1641    }
1642}
1643
1644struct LiveKitRoom {
1645    room: Arc<live_kit_client::Room>,
1646    screen_track: LocalTrack,
1647    microphone_track: LocalTrack,
1648    speaking: bool,
1649    next_publish_id: usize,
1650    _maintain_room: Task<()>,
1651    _handle_updates: Task<()>,
1652}
1653
1654impl LiveKitRoom {
1655    fn stop_publishing(&mut self, cx: &mut ModelContext<Room>) {
1656        if let LocalTrack::Published {
1657            track_publication, ..
1658        } = mem::replace(&mut self.microphone_track, LocalTrack::None)
1659        {
1660            self.room.unpublish_track(track_publication);
1661            cx.notify();
1662        }
1663
1664        if let LocalTrack::Published {
1665            track_publication, ..
1666        } = mem::replace(&mut self.screen_track, LocalTrack::None)
1667        {
1668            self.room.unpublish_track(track_publication);
1669            cx.notify();
1670        }
1671    }
1672}
1673
1674enum LocalTrack {
1675    None,
1676    Pending {
1677        publish_id: usize,
1678    },
1679    Published {
1680        track_publication: LocalTrackPublication,
1681    },
1682}
1683
1684impl Default for LocalTrack {
1685    fn default() -> Self {
1686        Self::None
1687    }
1688}
1689
1690#[derive(Copy, Clone, PartialEq, Eq)]
1691pub enum RoomStatus {
1692    Online,
1693    Rejoining,
1694    Offline,
1695}
1696
1697impl RoomStatus {
1698    pub fn is_offline(&self) -> bool {
1699        matches!(self, RoomStatus::Offline)
1700    }
1701
1702    pub fn is_online(&self) -> bool {
1703        matches!(self, RoomStatus::Online)
1704    }
1705}