room.rs

   1use crate::{
   2    call_settings::CallSettings,
   3    participant::{LocalParticipant, ParticipantLocation, RemoteParticipant},
   4};
   5use anyhow::{anyhow, Result};
   6use audio::{Audio, Sound};
   7use client::{
   8    proto::{self, PeerId},
   9    Client, ParticipantIndex, TypedEnvelope, User, UserStore,
  10};
  11use collections::{BTreeMap, HashMap, HashSet};
  12use fs::Fs;
  13use futures::{FutureExt, StreamExt};
  14use gpui::{
  15    AppContext, AsyncAppContext, Context, EventEmitter, Model, ModelContext, Task, WeakModel,
  16};
  17use language::LanguageRegistry;
  18use live_kit_client::{LocalAudioTrack, LocalTrackPublication, LocalVideoTrack, RoomUpdate};
  19use postage::{sink::Sink, stream::Stream, watch};
  20use project::Project;
  21use settings::Settings as _;
  22use std::{future::Future, mem, sync::Arc, time::Duration};
  23use util::{post_inc, ResultExt, TryFutureExt};
  24
/// Maximum time `Room::maintain_connection` waits for the client to reconnect
/// and rejoin the room before it gives up and leaves the room.
pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(30);
  26
/// Events emitted by a [`Room`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum Event {
    /// Emitted while applying a room update when a known remote participant's
    /// location or role differs from the previously stored value.
    ParticipantLocationChanged {
        participant_id: proto::PeerId,
    },
    /// NOTE(review): presumably emitted when a remote participant's video
    /// tracks change; the emitting code is outside this view.
    RemoteVideoTracksChanged {
        participant_id: proto::PeerId,
    },
    /// NOTE(review): presumably emitted when a remote participant's audio
    /// tracks change; the emitting code is outside this view.
    RemoteAudioTracksChanged {
        participant_id: proto::PeerId,
    },
    /// Emitted while applying a room update for every project of a remote
    /// participant that was not in that participant's previous project list.
    RemoteProjectShared {
        owner: Arc<User>,
        project_id: u64,
        worktree_root_names: Vec<String>,
    },
    /// Emitted while applying a room update for every project that was in a
    /// remote participant's previous project list but is no longer present.
    RemoteProjectUnshared {
        project_id: u64,
    },
    /// NOTE(review): emitting code is outside this view.
    RemoteProjectJoined {
        project_id: u64,
    },
    /// NOTE(review): emitting code is outside this view.
    RemoteProjectInvitationDiscarded {
        project_id: u64,
    },
    /// Emitted by `Room::leave` before the room state is torn down.
    Left,
}
  54
/// Client-side state of a call room: its participants, the projects shared
/// into or joined from it, and the tasks that keep it connected.
pub struct Room {
    // Room id taken from the server's room proto.
    id: u64,
    // Channel id from the join response; `None` for rooms made by `create`.
    channel_id: Option<u64>,
    // LiveKit state; `None` when the server sent no connection info and after
    // `clear_state`.
    live_kit: Option<LiveKitRoom>,
    // `Online` after construction, `Rejoining` while reconnecting, `Offline`
    // after `clear_state`.
    status: RoomStatus,
    // Projects shared into the room; unshared and drained by `clear_state`.
    shared_projects: HashSet<WeakModel<Project>>,
    // Projects joined from the room; disconnected and closed by `clear_state`.
    joined_projects: HashSet<WeakModel<Project>>,
    local_participant: LocalParticipant,
    // Remote participants keyed by user id.
    remote_participants: BTreeMap<u64, RemoteParticipant>,
    pending_participants: Vec<Arc<User>>,
    // User ids of remote participants; rebuilt each time a room update is applied.
    participant_user_ids: HashSet<u64>,
    // Must be zero for `should_leave` to return true.
    // NOTE(review): presumably counts outgoing calls in flight; it is updated
    // outside this view.
    pending_call_count: usize,
    // When set, `should_leave` reports true once nobody else is in the room.
    leave_when_empty: bool,
    client: Arc<Client>,
    user_store: Model<UserStore>,
    // Followers keyed by (leader peer id, project id); read by `followers_for`.
    follows_by_leader_id_project_id: HashMap<(PeerId, u64), Vec<PeerId>>,
    // Holds the `RoomUpdated` message handler registration; cleared by `clear_state`.
    client_subscriptions: Vec<client::Subscription>,
    // Holds the release and app-quit hooks registered in `new`.
    _subscriptions: Vec<gpui::Subscription>,
    // Both halves of a watch channel created in `new`.
    // NOTE(review): usage is outside this view.
    room_update_completed_tx: watch::Sender<Option<()>>,
    room_update_completed_rx: watch::Receiver<Option<()>>,
    // Task applying the most recent room update; replaced by each new update.
    pending_room_update: Option<Task<()>>,
    // Task running `Room::maintain_connection`; dropped by `clear_state`.
    maintain_connection: Option<Task<Option<()>>>,
}
  78
// Allows `Room` to emit `Event` values through `cx.emit`.
impl EventEmitter<Event> for Room {}
  80
  81impl Room {
    /// Returns the id of the channel this room belongs to, if any.
    pub fn channel_id(&self) -> Option<u64> {
        self.channel_id
    }
  85
    /// Returns `true` if at least one project is tracked in `shared_projects`.
    pub fn is_sharing_project(&self) -> bool {
        !self.shared_projects.is_empty()
    }
  89
  90    #[cfg(any(test, feature = "test-support"))]
  91    pub fn is_connected(&self) -> bool {
  92        if let Some(live_kit) = self.live_kit.as_ref() {
  93            matches!(
  94                *live_kit.room.status().borrow(),
  95                live_kit_client::ConnectionState::Connected { .. }
  96            )
  97        } else {
  98            false
  99        }
 100    }
 101
    /// Builds a room in the `Online` state and starts its background tasks.
    ///
    /// When `live_kit_connection_info` is present, a LiveKit room is created,
    /// a connection to it is started, and two tasks are spawned: one that
    /// leaves the room when LiveKit reports `Disconnected`, and one that
    /// forwards LiveKit room updates to `live_kit_room_updated`. Regardless of
    /// LiveKit, a task running `maintain_connection` is spawned, the `Joined`
    /// sound is played, and a handler for `RoomUpdated` messages is registered
    /// on `client`.
    fn new(
        id: u64,
        channel_id: Option<u64>,
        live_kit_connection_info: Option<proto::LiveKitConnectionInfo>,
        client: Arc<Client>,
        user_store: Model<UserStore>,
        cx: &mut ModelContext<Self>,
    ) -> Self {
        let live_kit_room = if let Some(connection_info) = live_kit_connection_info {
            let room = live_kit_client::Room::new();
            let mut status = room.status();
            // Consume the initial status of the room.
            let _ = status.try_recv();
            // Watches LiveKit connection status; stops when the room model is
            // gone or after reacting to a disconnect.
            let _maintain_room = cx.spawn(|this, mut cx| async move {
                while let Some(status) = status.next().await {
                    let this = if let Some(this) = this.upgrade() {
                        this
                    } else {
                        break;
                    };

                    if status == live_kit_client::ConnectionState::Disconnected {
                        this.update(&mut cx, |this, cx| this.leave(cx).log_err())
                            .ok();
                        break;
                    }
                }
            });

            // Forwards every LiveKit room update to the model for as long as
            // the model is alive; errors from the handler are logged.
            let _handle_updates = cx.spawn({
                let room = room.clone();
                move |this, mut cx| async move {
                    let mut updates = room.updates();
                    while let Some(update) = updates.next().await {
                        let this = if let Some(this) = this.upgrade() {
                            this
                        } else {
                            break;
                        };

                        this.update(&mut cx, |this, cx| {
                            this.live_kit_room_updated(update, cx).log_err()
                        })
                        .ok();
                    }
                }
            });

            let connect = room.connect(&connection_info.server_url, &connection_info.token);
            cx.spawn(|this, mut cx| async move {
                connect.await?;

                // If the model can no longer be read, treat the room as read-only.
                let is_read_only = this
                    .update(&mut cx, |room, _| room.read_only())
                    .unwrap_or(true);

                // Share the microphone only when not muting on join and the
                // room is not read-only.
                if !cx.update(|cx| Self::mute_on_join(cx))? && !is_read_only {
                    this.update(&mut cx, |this, cx| this.share_microphone(cx))?
                        .await?;
                }

                anyhow::Ok(())
            })
            .detach_and_log_err(cx);

            Some(LiveKitRoom {
                room,
                screen_track: LocalTrack::None,
                microphone_track: LocalTrack::None,
                next_publish_id: 0,
                muted_by_user: false,
                deafened: false,
                speaking: false,
                _maintain_room,
                _handle_updates,
            })
        } else {
            None
        };

        // Stored on the room so that `clear_state` can drop the task.
        let maintain_connection = cx.spawn({
            let client = client.clone();
            move |this, cx| Self::maintain_connection(this, client.clone(), cx).log_err()
        });

        Audio::play_sound(Sound::Joined, cx);

        let (room_update_completed_tx, room_update_completed_rx) = watch::channel();

        Self {
            id,
            channel_id,
            live_kit: live_kit_room,
            status: RoomStatus::Online,
            shared_projects: Default::default(),
            joined_projects: Default::default(),
            participant_user_ids: Default::default(),
            local_participant: Default::default(),
            remote_participants: Default::default(),
            pending_participants: Default::default(),
            pending_call_count: 0,
            client_subscriptions: vec![
                client.add_message_handler(cx.weak_model(), Self::handle_room_updated)
            ],
            _subscriptions: vec![
                cx.on_release(Self::released),
                cx.on_app_quit(Self::app_will_quit),
            ],
            leave_when_empty: false,
            pending_room_update: None,
            client,
            user_store,
            follows_by_leader_id_project_id: Default::default(),
            maintain_connection: Some(maintain_connection),
            room_update_completed_tx,
            room_update_completed_rx,
        }
    }
 220
 221    pub(crate) fn create(
 222        called_user_id: u64,
 223        initial_project: Option<Model<Project>>,
 224        client: Arc<Client>,
 225        user_store: Model<UserStore>,
 226        cx: &mut AppContext,
 227    ) -> Task<Result<Model<Self>>> {
 228        cx.spawn(move |mut cx| async move {
 229            let response = client.request(proto::CreateRoom {}).await?;
 230            let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
 231            let room = cx.new_model(|cx| {
 232                let mut room = Self::new(
 233                    room_proto.id,
 234                    None,
 235                    response.live_kit_connection_info,
 236                    client,
 237                    user_store,
 238                    cx,
 239                );
 240                if let Some(participant) = room_proto.participants.first() {
 241                    room.local_participant.role = participant.role()
 242                }
 243                room
 244            })?;
 245
 246            let initial_project_id = if let Some(initial_project) = initial_project {
 247                let initial_project_id = room
 248                    .update(&mut cx, |room, cx| {
 249                        room.share_project(initial_project.clone(), cx)
 250                    })?
 251                    .await?;
 252                Some(initial_project_id)
 253            } else {
 254                None
 255            };
 256
 257            match room
 258                .update(&mut cx, |room, cx| {
 259                    room.leave_when_empty = true;
 260                    room.call(called_user_id, initial_project_id, cx)
 261                })?
 262                .await
 263            {
 264                Ok(()) => Ok(room),
 265                Err(error) => Err(anyhow!("room creation failed: {:?}", error)),
 266            }
 267        })
 268    }
 269
 270    pub(crate) async fn join_channel(
 271        channel_id: u64,
 272        client: Arc<Client>,
 273        user_store: Model<UserStore>,
 274        cx: AsyncAppContext,
 275    ) -> Result<Model<Self>> {
 276        Self::from_join_response(
 277            client.request(proto::JoinChannel { channel_id }).await?,
 278            client,
 279            user_store,
 280            cx,
 281        )
 282    }
 283
 284    pub(crate) async fn join(
 285        room_id: u64,
 286        client: Arc<Client>,
 287        user_store: Model<UserStore>,
 288        cx: AsyncAppContext,
 289    ) -> Result<Model<Self>> {
 290        Self::from_join_response(
 291            client.request(proto::JoinRoom { id: room_id }).await?,
 292            client,
 293            user_store,
 294            cx,
 295        )
 296    }
 297
 298    fn released(&mut self, cx: &mut AppContext) {
 299        if self.status.is_online() {
 300            self.leave_internal(cx).detach_and_log_err(cx);
 301        }
 302    }
 303
 304    fn app_will_quit(&mut self, cx: &mut ModelContext<Self>) -> impl Future<Output = ()> {
 305        let task = if self.status.is_online() {
 306            let leave = self.leave_internal(cx);
 307            Some(cx.background_executor().spawn(async move {
 308                leave.await.log_err();
 309            }))
 310        } else {
 311            None
 312        };
 313
 314        async move {
 315            if let Some(task) = task {
 316                task.await;
 317            }
 318        }
 319    }
 320
    /// Returns `true` when the `mute_on_join` call setting is enabled or when
    /// `client::IMPERSONATE_LOGIN` is set.
    pub fn mute_on_join(cx: &AppContext) -> bool {
        CallSettings::get_global(cx).mute_on_join || client::IMPERSONATE_LOGIN.is_some()
    }
 324
 325    fn from_join_response(
 326        response: proto::JoinRoomResponse,
 327        client: Arc<Client>,
 328        user_store: Model<UserStore>,
 329        mut cx: AsyncAppContext,
 330    ) -> Result<Model<Self>> {
 331        let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
 332        let room = cx.new_model(|cx| {
 333            Self::new(
 334                room_proto.id,
 335                response.channel_id,
 336                response.live_kit_connection_info,
 337                client,
 338                user_store,
 339                cx,
 340            )
 341        })?;
 342        room.update(&mut cx, |room, cx| {
 343            room.leave_when_empty = room.channel_id.is_none();
 344            room.apply_room_update(room_proto, cx)?;
 345            anyhow::Ok(())
 346        })??;
 347        Ok(room)
 348    }
 349
 350    fn should_leave(&self) -> bool {
 351        self.leave_when_empty
 352            && self.pending_room_update.is_none()
 353            && self.pending_participants.is_empty()
 354            && self.remote_participants.is_empty()
 355            && self.pending_call_count == 0
 356    }
 357
    /// Leaves the room: notifies observers, emits [`Event::Left`], and then
    /// performs the teardown through `leave_internal`.
    ///
    /// The returned task carries the result of `leave_internal`, which is an
    /// error if the room was already offline.
    pub(crate) fn leave(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
        cx.notify();
        cx.emit(Event::Left);
        self.leave_internal(cx)
    }
 363
 364    fn leave_internal(&mut self, cx: &mut AppContext) -> Task<Result<()>> {
 365        if self.status.is_offline() {
 366            return Task::ready(Err(anyhow!("room is offline")));
 367        }
 368
 369        log::info!("leaving room");
 370        Audio::play_sound(Sound::Leave, cx);
 371
 372        self.clear_state(cx);
 373
 374        let leave_room = self.client.request(proto::LeaveRoom {});
 375        cx.background_executor().spawn(async move {
 376            leave_room.await?;
 377            anyhow::Ok(())
 378        })
 379    }
 380
 381    pub(crate) fn clear_state(&mut self, cx: &mut AppContext) {
 382        for project in self.shared_projects.drain() {
 383            if let Some(project) = project.upgrade() {
 384                project.update(cx, |project, cx| {
 385                    project.unshare(cx).log_err();
 386                });
 387            }
 388        }
 389        for project in self.joined_projects.drain() {
 390            if let Some(project) = project.upgrade() {
 391                project.update(cx, |project, cx| {
 392                    project.disconnected_from_host(cx);
 393                    project.close(cx);
 394                });
 395            }
 396        }
 397
 398        self.status = RoomStatus::Offline;
 399        self.remote_participants.clear();
 400        self.pending_participants.clear();
 401        self.participant_user_ids.clear();
 402        self.client_subscriptions.clear();
 403        self.live_kit.take();
 404        self.pending_room_update.take();
 405        self.maintain_connection.take();
 406    }
 407
    /// Watches the client's connection status and tries to rejoin the room
    /// after a disconnection.
    ///
    /// Each time a disconnection is detected the room is marked `Rejoining`,
    /// and rejoin attempts are raced against [`RECONNECT_TIMEOUT`]. A
    /// successful rejoin resumes watching. Otherwise the loop ends, the room
    /// is left if it still exists, and an error is returned. This function
    /// never returns `Ok`.
    async fn maintain_connection(
        this: WeakModel<Self>,
        client: Arc<Client>,
        mut cx: AsyncAppContext,
    ) -> Result<()> {
        let mut client_status = client.status();
        loop {
            let _ = client_status.try_recv();
            let is_connected = client_status.borrow().is_connected();
            // Even if we're initially connected, any future change of the status means we momentarily disconnected.
            if !is_connected || client_status.next().await.is_some() {
                log::info!("detected client disconnection");

                this.upgrade()
                    .ok_or_else(|| anyhow!("room was dropped"))?
                    .update(&mut cx, |this, cx| {
                        this.status = RoomStatus::Rejoining;
                        cx.notify();
                    })?;

                // Wait for client to re-establish a connection to the server.
                {
                    let mut reconnection_timeout =
                        cx.background_executor().timer(RECONNECT_TIMEOUT).fuse();
                    // Resolves to `true` once a rejoin succeeds. Resolves to
                    // `false` when the attempts run out, the client is signed
                    // out, or the room or app is gone.
                    let client_reconnection = async {
                        let mut remaining_attempts = 3;
                        while remaining_attempts > 0 {
                            if client_status.borrow().is_connected() {
                                log::info!("client reconnected, attempting to rejoin room");

                                let Some(this) = this.upgrade() else { break };
                                match this.update(&mut cx, |this, cx| this.rejoin(cx)) {
                                    Ok(task) => {
                                        if task.await.log_err().is_some() {
                                            return true;
                                        } else {
                                            remaining_attempts -= 1;
                                        }
                                    }
                                    Err(_app_dropped) => return false,
                                }
                            } else if client_status.borrow().is_signed_out() {
                                return false;
                            }

                            log::info!(
                                "waiting for client status change, remaining attempts {}",
                                remaining_attempts
                            );
                            client_status.next().await;
                        }
                        false
                    }
                    .fuse();
                    futures::pin_mut!(client_reconnection);

                    // Biased select: the reconnection future is listed before
                    // the timeout.
                    futures::select_biased! {
                        reconnected = client_reconnection => {
                            if reconnected {
                                log::info!("successfully reconnected to room");
                                // If we successfully joined the room, go back around the loop
                                // waiting for future connection status changes.
                                continue;
                            }
                        }
                        _ = reconnection_timeout => {
                            log::info!("room reconnection timeout expired");
                        }
                    }
                }

                break;
            }
        }

        // The client failed to re-establish a connection to the server
        // or an error occurred while trying to re-join the room. Either way
        // we leave the room and return an error.
        if let Some(this) = this.upgrade() {
            log::info!("reconnection failed, leaving room");
            // The task returned by `leave` is discarded rather than awaited.
            let _ = this.update(&mut cx, |this, cx| this.leave(cx))?;
        }
        Err(anyhow!(
            "can't reconnect to room: client failed to re-establish connection"
        ))
    }
 494
    /// Sends a `RejoinRoom` request describing the projects this client still
    /// shares or has joined, then applies the server's response.
    ///
    /// On success the room goes back to `Online`, the returned room state is
    /// applied, and each reshared or rejoined project listed in the response
    /// is notified. Failures inside those project callbacks are logged, not
    /// returned.
    fn rejoin(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
        // Strong handles by remote project id, used to route the response.
        let mut projects = HashMap::default();
        let mut reshared_projects = Vec::new();
        let mut rejoined_projects = Vec::new();
        // Shared projects are kept only while alive AND still holding a remote id.
        self.shared_projects.retain(|project| {
            if let Some(handle) = project.upgrade() {
                let project = handle.read(cx);
                if let Some(project_id) = project.remote_id() {
                    projects.insert(project_id, handle.clone());
                    reshared_projects.push(proto::UpdateProject {
                        project_id,
                        worktrees: project.worktree_metadata_protos(cx),
                    });
                    return true;
                }
            }
            false
        });
        // Joined projects are kept while alive, even without a remote id; only
        // those with a remote id are included in the request.
        self.joined_projects.retain(|project| {
            if let Some(handle) = project.upgrade() {
                let project = handle.read(cx);
                if let Some(project_id) = project.remote_id() {
                    projects.insert(project_id, handle.clone());
                    rejoined_projects.push(proto::RejoinProject {
                        id: project_id,
                        worktrees: project
                            .worktrees()
                            .map(|worktree| {
                                let worktree = worktree.read(cx);
                                proto::RejoinWorktree {
                                    id: worktree.id().to_proto(),
                                    scan_id: worktree.completed_scan_id() as u64,
                                }
                            })
                            .collect(),
                    });
                }
                return true;
            }
            false
        });

        let response = self.client.request_envelope(proto::RejoinRoom {
            id: self.id,
            reshared_projects,
            rejoined_projects,
        });

        cx.spawn(|this, mut cx| async move {
            let response = response.await?;
            // The envelope's message id is passed on to `Project::rejoined`.
            let message_id = response.message_id;
            let response = response.payload;
            let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
            this.update(&mut cx, |this, cx| {
                this.status = RoomStatus::Online;
                this.apply_room_update(room_proto, cx)?;

                for reshared_project in response.reshared_projects {
                    if let Some(project) = projects.get(&reshared_project.id) {
                        project.update(cx, |project, cx| {
                            project.reshared(reshared_project, cx).log_err();
                        });
                    }
                }

                for rejoined_project in response.rejoined_projects {
                    if let Some(project) = projects.get(&rejoined_project.id) {
                        project.update(cx, |project, cx| {
                            project.rejoined(rejoined_project, message_id, cx).log_err();
                        });
                    }
                }

                anyhow::Ok(())
            })?
        })
    }
 572
    /// Returns the room's id.
    pub fn id(&self) -> u64 {
        self.id
    }
 576
    /// Returns the room's current connection status.
    pub fn status(&self) -> RoomStatus {
        self.status
    }
 580
    /// Returns the state tracked for the local participant.
    pub fn local_participant(&self) -> &LocalParticipant {
        &self.local_participant
    }
 584
    /// Returns the remote participants, keyed by user id.
    pub fn remote_participants(&self) -> &BTreeMap<u64, RemoteParticipant> {
        &self.remote_participants
    }
 588
 589    pub fn remote_participant_for_peer_id(&self, peer_id: PeerId) -> Option<&RemoteParticipant> {
 590        self.remote_participants
 591            .values()
 592            .find(|p| p.peer_id == peer_id)
 593    }
 594
 595    pub fn role_for_user(&self, user_id: u64) -> Option<proto::ChannelRole> {
 596        self.remote_participants
 597            .get(&user_id)
 598            .map(|participant| participant.role)
 599    }
 600
 601    pub fn local_participant_is_admin(&self) -> bool {
 602        self.local_participant.role == proto::ChannelRole::Admin
 603    }
 604
 605    pub fn set_participant_role(
 606        &mut self,
 607        user_id: u64,
 608        role: proto::ChannelRole,
 609        cx: &ModelContext<Self>,
 610    ) -> Task<Result<()>> {
 611        let client = self.client.clone();
 612        let room_id = self.id;
 613        let role = role.into();
 614        cx.spawn(|_, _| async move {
 615            client
 616                .request(proto::SetRoomParticipantRole {
 617                    room_id,
 618                    user_id,
 619                    role,
 620                })
 621                .await
 622                .map(|_| ())
 623        })
 624    }
 625
    /// Returns the users currently listed as pending participants.
    pub fn pending_participants(&self) -> &[Arc<User>] {
        &self.pending_participants
    }
 629
    /// Returns `true` if `user_id` is in the set of participant user ids
    /// recorded by the most recently applied room update.
    pub fn contains_participant(&self, user_id: u64) -> bool {
        self.participant_user_ids.contains(&user_id)
    }
 633
 634    pub fn followers_for(&self, leader_id: PeerId, project_id: u64) -> &[PeerId] {
 635        self.follows_by_leader_id_project_id
 636            .get(&(leader_id, project_id))
 637            .map_or(&[], |v| v.as_slice())
 638    }
 639
 640    /// Returns the most 'active' projects, defined as most people in the project
 641    pub fn most_active_project(&self, cx: &AppContext) -> Option<(u64, u64)> {
 642        let mut project_hosts_and_guest_counts = HashMap::<u64, (Option<u64>, u32)>::default();
 643        for participant in self.remote_participants.values() {
 644            match participant.location {
 645                ParticipantLocation::SharedProject { project_id } => {
 646                    project_hosts_and_guest_counts
 647                        .entry(project_id)
 648                        .or_default()
 649                        .1 += 1;
 650                }
 651                ParticipantLocation::External | ParticipantLocation::UnsharedProject => {}
 652            }
 653            for project in &participant.projects {
 654                project_hosts_and_guest_counts
 655                    .entry(project.id)
 656                    .or_default()
 657                    .0 = Some(participant.user.id);
 658            }
 659        }
 660
 661        if let Some(user) = self.user_store.read(cx).current_user() {
 662            for project in &self.local_participant.projects {
 663                project_hosts_and_guest_counts
 664                    .entry(project.id)
 665                    .or_default()
 666                    .0 = Some(user.id);
 667            }
 668        }
 669
 670        project_hosts_and_guest_counts
 671            .into_iter()
 672            .filter_map(|(id, (host, guest_count))| Some((id, host?, guest_count)))
 673            .max_by_key(|(_, _, guest_count)| *guest_count)
 674            .map(|(id, host, _)| (id, host))
 675    }
 676
 677    async fn handle_room_updated(
 678        this: Model<Self>,
 679        envelope: TypedEnvelope<proto::RoomUpdated>,
 680        _: Arc<Client>,
 681        mut cx: AsyncAppContext,
 682    ) -> Result<()> {
 683        let room = envelope
 684            .payload
 685            .room
 686            .ok_or_else(|| anyhow!("invalid room"))?;
 687        this.update(&mut cx, |this, cx| this.apply_room_update(room, cx))?
 688    }
 689
 690    fn apply_room_update(
 691        &mut self,
 692        mut room: proto::Room,
 693        cx: &mut ModelContext<Self>,
 694    ) -> Result<()> {
 695        // Filter ourselves out from the room's participants.
 696        let local_participant_ix = room
 697            .participants
 698            .iter()
 699            .position(|participant| Some(participant.user_id) == self.client.user_id());
 700        let local_participant = local_participant_ix.map(|ix| room.participants.swap_remove(ix));
 701
 702        let pending_participant_user_ids = room
 703            .pending_participants
 704            .iter()
 705            .map(|p| p.user_id)
 706            .collect::<Vec<_>>();
 707
 708        let remote_participant_user_ids = room
 709            .participants
 710            .iter()
 711            .map(|p| p.user_id)
 712            .collect::<Vec<_>>();
 713
 714        let (remote_participants, pending_participants) =
 715            self.user_store.update(cx, move |user_store, cx| {
 716                (
 717                    user_store.get_users(remote_participant_user_ids, cx),
 718                    user_store.get_users(pending_participant_user_ids, cx),
 719                )
 720            });
 721
 722        self.pending_room_update = Some(cx.spawn(|this, mut cx| async move {
 723            let (remote_participants, pending_participants) =
 724                futures::join!(remote_participants, pending_participants);
 725
 726            this.update(&mut cx, |this, cx| {
 727                this.participant_user_ids.clear();
 728
 729                if let Some(participant) = local_participant {
 730                    let role = participant.role();
 731                    this.local_participant.projects = participant.projects;
 732                    if this.local_participant.role != role {
 733                        this.local_participant.role = role;
 734
 735                        if role == proto::ChannelRole::Guest {
 736                            for project in mem::take(&mut this.shared_projects) {
 737                                if let Some(project) = project.upgrade() {
 738                                    this.unshare_project(project, cx).log_err();
 739                                }
 740                            }
 741                            this.local_participant.projects.clear();
 742                            if let Some(live_kit_room) = &mut this.live_kit {
 743                                live_kit_room.stop_publishing(cx);
 744                            }
 745                        }
 746
 747                        this.joined_projects.retain(|project| {
 748                            if let Some(project) = project.upgrade() {
 749                                project.update(cx, |project, cx| project.set_role(role, cx));
 750                                true
 751                            } else {
 752                                false
 753                            }
 754                        });
 755                    }
 756                } else {
 757                    this.local_participant.projects.clear();
 758                }
 759
 760                if let Some(participants) = remote_participants.log_err() {
 761                    for (participant, user) in room.participants.into_iter().zip(participants) {
 762                        let Some(peer_id) = participant.peer_id else {
 763                            continue;
 764                        };
 765                        let participant_index = ParticipantIndex(participant.participant_index);
 766                        this.participant_user_ids.insert(participant.user_id);
 767
 768                        let old_projects = this
 769                            .remote_participants
 770                            .get(&participant.user_id)
 771                            .into_iter()
 772                            .flat_map(|existing| &existing.projects)
 773                            .map(|project| project.id)
 774                            .collect::<HashSet<_>>();
 775                        let new_projects = participant
 776                            .projects
 777                            .iter()
 778                            .map(|project| project.id)
 779                            .collect::<HashSet<_>>();
 780
 781                        for project in &participant.projects {
 782                            if !old_projects.contains(&project.id) {
 783                                cx.emit(Event::RemoteProjectShared {
 784                                    owner: user.clone(),
 785                                    project_id: project.id,
 786                                    worktree_root_names: project.worktree_root_names.clone(),
 787                                });
 788                            }
 789                        }
 790
 791                        for unshared_project_id in old_projects.difference(&new_projects) {
 792                            this.joined_projects.retain(|project| {
 793                                if let Some(project) = project.upgrade() {
 794                                    project.update(cx, |project, cx| {
 795                                        if project.remote_id() == Some(*unshared_project_id) {
 796                                            project.disconnected_from_host(cx);
 797                                            false
 798                                        } else {
 799                                            true
 800                                        }
 801                                    })
 802                                } else {
 803                                    false
 804                                }
 805                            });
 806                            cx.emit(Event::RemoteProjectUnshared {
 807                                project_id: *unshared_project_id,
 808                            });
 809                        }
 810
 811                        let role = participant.role();
 812                        let location = ParticipantLocation::from_proto(participant.location)
 813                            .unwrap_or(ParticipantLocation::External);
 814                        if let Some(remote_participant) =
 815                            this.remote_participants.get_mut(&participant.user_id)
 816                        {
 817                            remote_participant.peer_id = peer_id;
 818                            remote_participant.projects = participant.projects;
 819                            remote_participant.participant_index = participant_index;
 820                            if location != remote_participant.location
 821                                || role != remote_participant.role
 822                            {
 823                                remote_participant.location = location;
 824                                remote_participant.role = role;
 825                                cx.emit(Event::ParticipantLocationChanged {
 826                                    participant_id: peer_id,
 827                                });
 828                            }
 829                        } else {
 830                            this.remote_participants.insert(
 831                                participant.user_id,
 832                                RemoteParticipant {
 833                                    user: user.clone(),
 834                                    participant_index,
 835                                    peer_id,
 836                                    projects: participant.projects,
 837                                    location,
 838                                    role,
 839                                    muted: true,
 840                                    speaking: false,
 841                                    video_tracks: Default::default(),
 842                                    audio_tracks: Default::default(),
 843                                },
 844                            );
 845
 846                            Audio::play_sound(Sound::Joined, cx);
 847
 848                            if let Some(live_kit) = this.live_kit.as_ref() {
 849                                let video_tracks =
 850                                    live_kit.room.remote_video_tracks(&user.id.to_string());
 851                                let audio_tracks =
 852                                    live_kit.room.remote_audio_tracks(&user.id.to_string());
 853                                let publications = live_kit
 854                                    .room
 855                                    .remote_audio_track_publications(&user.id.to_string());
 856
 857                                for track in video_tracks {
 858                                    this.live_kit_room_updated(
 859                                        RoomUpdate::SubscribedToRemoteVideoTrack(track),
 860                                        cx,
 861                                    )
 862                                    .log_err();
 863                                }
 864
 865                                for (track, publication) in
 866                                    audio_tracks.iter().zip(publications.iter())
 867                                {
 868                                    this.live_kit_room_updated(
 869                                        RoomUpdate::SubscribedToRemoteAudioTrack(
 870                                            track.clone(),
 871                                            publication.clone(),
 872                                        ),
 873                                        cx,
 874                                    )
 875                                    .log_err();
 876                                }
 877                            }
 878                        }
 879                    }
 880
 881                    this.remote_participants.retain(|user_id, participant| {
 882                        if this.participant_user_ids.contains(user_id) {
 883                            true
 884                        } else {
 885                            for project in &participant.projects {
 886                                cx.emit(Event::RemoteProjectUnshared {
 887                                    project_id: project.id,
 888                                });
 889                            }
 890                            false
 891                        }
 892                    });
 893                }
 894
 895                if let Some(pending_participants) = pending_participants.log_err() {
 896                    this.pending_participants = pending_participants;
 897                    for participant in &this.pending_participants {
 898                        this.participant_user_ids.insert(participant.id);
 899                    }
 900                }
 901
 902                this.follows_by_leader_id_project_id.clear();
 903                for follower in room.followers {
 904                    let project_id = follower.project_id;
 905                    let (leader, follower) = match (follower.leader_id, follower.follower_id) {
 906                        (Some(leader), Some(follower)) => (leader, follower),
 907
 908                        _ => {
 909                            log::error!("Follower message {follower:?} missing some state");
 910                            continue;
 911                        }
 912                    };
 913
 914                    let list = this
 915                        .follows_by_leader_id_project_id
 916                        .entry((leader, project_id))
 917                        .or_insert(Vec::new());
 918                    if !list.contains(&follower) {
 919                        list.push(follower);
 920                    }
 921                }
 922
 923                this.pending_room_update.take();
 924                if this.should_leave() {
 925                    log::info!("room is empty, leaving");
 926                    let _ = this.leave(cx);
 927                }
 928
 929                this.user_store.update(cx, |user_store, cx| {
 930                    let participant_indices_by_user_id = this
 931                        .remote_participants
 932                        .iter()
 933                        .map(|(user_id, participant)| (*user_id, participant.participant_index))
 934                        .collect();
 935                    user_store.set_participant_indices(participant_indices_by_user_id, cx);
 936                });
 937
 938                this.check_invariants();
 939                this.room_update_completed_tx.try_send(Some(())).ok();
 940                cx.notify();
 941            })
 942            .ok();
 943        }));
 944
 945        cx.notify();
 946        Ok(())
 947    }
 948
 949    pub fn room_update_completed(&mut self) -> impl Future<Output = ()> {
 950        let mut done_rx = self.room_update_completed_rx.clone();
 951        async move {
 952            while let Some(result) = done_rx.next().await {
 953                if result.is_some() {
 954                    break;
 955                }
 956            }
 957        }
 958    }
 959
 960    fn live_kit_room_updated(
 961        &mut self,
 962        update: RoomUpdate,
 963        cx: &mut ModelContext<Self>,
 964    ) -> Result<()> {
 965        match update {
 966            RoomUpdate::SubscribedToRemoteVideoTrack(track) => {
 967                let user_id = track.publisher_id().parse()?;
 968                let track_id = track.sid().to_string();
 969                let participant = self
 970                    .remote_participants
 971                    .get_mut(&user_id)
 972                    .ok_or_else(|| anyhow!("subscribed to track by unknown participant"))?;
 973                participant.video_tracks.insert(track_id.clone(), track);
 974                cx.emit(Event::RemoteVideoTracksChanged {
 975                    participant_id: participant.peer_id,
 976                });
 977            }
 978
 979            RoomUpdate::UnsubscribedFromRemoteVideoTrack {
 980                publisher_id,
 981                track_id,
 982            } => {
 983                let user_id = publisher_id.parse()?;
 984                let participant = self
 985                    .remote_participants
 986                    .get_mut(&user_id)
 987                    .ok_or_else(|| anyhow!("unsubscribed from track by unknown participant"))?;
 988                participant.video_tracks.remove(&track_id);
 989                cx.emit(Event::RemoteVideoTracksChanged {
 990                    participant_id: participant.peer_id,
 991                });
 992            }
 993
 994            RoomUpdate::ActiveSpeakersChanged { speakers } => {
 995                let mut speaker_ids = speakers
 996                    .into_iter()
 997                    .filter_map(|speaker_sid| speaker_sid.parse().ok())
 998                    .collect::<Vec<u64>>();
 999                speaker_ids.sort_unstable();
1000                for (sid, participant) in &mut self.remote_participants {
1001                    if let Ok(_) = speaker_ids.binary_search(sid) {
1002                        participant.speaking = true;
1003                    } else {
1004                        participant.speaking = false;
1005                    }
1006                }
1007                if let Some(id) = self.client.user_id() {
1008                    if let Some(room) = &mut self.live_kit {
1009                        if let Ok(_) = speaker_ids.binary_search(&id) {
1010                            room.speaking = true;
1011                        } else {
1012                            room.speaking = false;
1013                        }
1014                    }
1015                }
1016            }
1017
1018            RoomUpdate::RemoteAudioTrackMuteChanged { track_id, muted } => {
1019                let mut found = false;
1020                for participant in &mut self.remote_participants.values_mut() {
1021                    for track in participant.audio_tracks.values() {
1022                        if track.sid() == track_id {
1023                            found = true;
1024                            break;
1025                        }
1026                    }
1027                    if found {
1028                        participant.muted = muted;
1029                        break;
1030                    }
1031                }
1032            }
1033
1034            RoomUpdate::SubscribedToRemoteAudioTrack(track, publication) => {
1035                let user_id = track.publisher_id().parse()?;
1036                let track_id = track.sid().to_string();
1037                let participant = self
1038                    .remote_participants
1039                    .get_mut(&user_id)
1040                    .ok_or_else(|| anyhow!("subscribed to track by unknown participant"))?;
1041                participant.audio_tracks.insert(track_id.clone(), track);
1042                participant.muted = publication.is_muted();
1043
1044                cx.emit(Event::RemoteAudioTracksChanged {
1045                    participant_id: participant.peer_id,
1046                });
1047            }
1048
1049            RoomUpdate::UnsubscribedFromRemoteAudioTrack {
1050                publisher_id,
1051                track_id,
1052            } => {
1053                let user_id = publisher_id.parse()?;
1054                let participant = self
1055                    .remote_participants
1056                    .get_mut(&user_id)
1057                    .ok_or_else(|| anyhow!("unsubscribed from track by unknown participant"))?;
1058                participant.audio_tracks.remove(&track_id);
1059                cx.emit(Event::RemoteAudioTracksChanged {
1060                    participant_id: participant.peer_id,
1061                });
1062            }
1063
1064            RoomUpdate::LocalAudioTrackUnpublished { publication } => {
1065                log::info!("unpublished audio track {}", publication.sid());
1066                if let Some(room) = &mut self.live_kit {
1067                    room.microphone_track = LocalTrack::None;
1068                }
1069            }
1070
1071            RoomUpdate::LocalVideoTrackUnpublished { publication } => {
1072                log::info!("unpublished video track {}", publication.sid());
1073                if let Some(room) = &mut self.live_kit {
1074                    room.screen_track = LocalTrack::None;
1075                }
1076            }
1077
1078            RoomUpdate::LocalAudioTrackPublished { publication } => {
1079                log::info!("published audio track {}", publication.sid());
1080            }
1081
1082            RoomUpdate::LocalVideoTrackPublished { publication } => {
1083                log::info!("published video track {}", publication.sid());
1084            }
1085        }
1086
1087        cx.notify();
1088        Ok(())
1089    }
1090
1091    fn check_invariants(&self) {
1092        #[cfg(any(test, feature = "test-support"))]
1093        {
1094            for participant in self.remote_participants.values() {
1095                assert!(self.participant_user_ids.contains(&participant.user.id));
1096                assert_ne!(participant.user.id, self.client.user_id().unwrap());
1097            }
1098
1099            for participant in &self.pending_participants {
1100                assert!(self.participant_user_ids.contains(&participant.id));
1101                assert_ne!(participant.id, self.client.user_id().unwrap());
1102            }
1103
1104            assert_eq!(
1105                self.participant_user_ids.len(),
1106                self.remote_participants.len() + self.pending_participants.len()
1107            );
1108        }
1109    }
1110
1111    pub(crate) fn call(
1112        &mut self,
1113        called_user_id: u64,
1114        initial_project_id: Option<u64>,
1115        cx: &mut ModelContext<Self>,
1116    ) -> Task<Result<()>> {
1117        if self.status.is_offline() {
1118            return Task::ready(Err(anyhow!("room is offline")));
1119        }
1120
1121        cx.notify();
1122        let client = self.client.clone();
1123        let room_id = self.id;
1124        self.pending_call_count += 1;
1125        cx.spawn(move |this, mut cx| async move {
1126            let result = client
1127                .request(proto::Call {
1128                    room_id,
1129                    called_user_id,
1130                    initial_project_id,
1131                })
1132                .await;
1133            this.update(&mut cx, |this, cx| {
1134                this.pending_call_count -= 1;
1135                if this.should_leave() {
1136                    this.leave(cx).detach_and_log_err(cx);
1137                }
1138            })?;
1139            result?;
1140            Ok(())
1141        })
1142    }
1143
1144    pub fn join_project(
1145        &mut self,
1146        id: u64,
1147        language_registry: Arc<LanguageRegistry>,
1148        fs: Arc<dyn Fs>,
1149        cx: &mut ModelContext<Self>,
1150    ) -> Task<Result<Model<Project>>> {
1151        let client = self.client.clone();
1152        let user_store = self.user_store.clone();
1153        let role = self.local_participant.role;
1154        cx.emit(Event::RemoteProjectJoined { project_id: id });
1155        cx.spawn(move |this, mut cx| async move {
1156            let project = Project::remote(
1157                id,
1158                client,
1159                user_store,
1160                language_registry,
1161                fs,
1162                role,
1163                cx.clone(),
1164            )
1165            .await?;
1166
1167            this.update(&mut cx, |this, cx| {
1168                this.joined_projects.retain(|project| {
1169                    if let Some(project) = project.upgrade() {
1170                        !project.read(cx).is_disconnected()
1171                    } else {
1172                        false
1173                    }
1174                });
1175                this.joined_projects.insert(project.downgrade());
1176            })?;
1177            Ok(project)
1178        })
1179    }
1180
1181    pub(crate) fn share_project(
1182        &mut self,
1183        project: Model<Project>,
1184        cx: &mut ModelContext<Self>,
1185    ) -> Task<Result<u64>> {
1186        if let Some(project_id) = project.read(cx).remote_id() {
1187            return Task::ready(Ok(project_id));
1188        }
1189
1190        let request = self.client.request(proto::ShareProject {
1191            room_id: self.id(),
1192            worktrees: project.read(cx).worktree_metadata_protos(cx),
1193        });
1194        cx.spawn(|this, mut cx| async move {
1195            let response = request.await?;
1196
1197            project.update(&mut cx, |project, cx| {
1198                project.shared(response.project_id, cx)
1199            })??;
1200
1201            // If the user's location is in this project, it changes from UnsharedProject to SharedProject.
1202            this.update(&mut cx, |this, cx| {
1203                this.shared_projects.insert(project.downgrade());
1204                let active_project = this.local_participant.active_project.as_ref();
1205                if active_project.map_or(false, |location| *location == project) {
1206                    this.set_location(Some(&project), cx)
1207                } else {
1208                    Task::ready(Ok(()))
1209                }
1210            })?
1211            .await?;
1212
1213            Ok(response.project_id)
1214        })
1215    }
1216
1217    pub(crate) fn unshare_project(
1218        &mut self,
1219        project: Model<Project>,
1220        cx: &mut ModelContext<Self>,
1221    ) -> Result<()> {
1222        let project_id = match project.read(cx).remote_id() {
1223            Some(project_id) => project_id,
1224            None => return Ok(()),
1225        };
1226
1227        self.client.send(proto::UnshareProject { project_id })?;
1228        project.update(cx, |this, cx| this.unshare(cx))?;
1229
1230        if self.local_participant.active_project == Some(project.downgrade()) {
1231            self.set_location(Some(&project), cx).detach_and_log_err(cx);
1232        }
1233        Ok(())
1234    }
1235
1236    pub(crate) fn set_location(
1237        &mut self,
1238        project: Option<&Model<Project>>,
1239        cx: &mut ModelContext<Self>,
1240    ) -> Task<Result<()>> {
1241        if self.status.is_offline() {
1242            return Task::ready(Err(anyhow!("room is offline")));
1243        }
1244
1245        let client = self.client.clone();
1246        let room_id = self.id;
1247        let location = if let Some(project) = project {
1248            self.local_participant.active_project = Some(project.downgrade());
1249            if let Some(project_id) = project.read(cx).remote_id() {
1250                proto::participant_location::Variant::SharedProject(
1251                    proto::participant_location::SharedProject { id: project_id },
1252                )
1253            } else {
1254                proto::participant_location::Variant::UnsharedProject(
1255                    proto::participant_location::UnsharedProject {},
1256                )
1257            }
1258        } else {
1259            self.local_participant.active_project = None;
1260            proto::participant_location::Variant::External(proto::participant_location::External {})
1261        };
1262
1263        cx.notify();
1264        cx.background_executor().spawn(async move {
1265            client
1266                .request(proto::UpdateParticipantLocation {
1267                    room_id,
1268                    location: Some(proto::ParticipantLocation {
1269                        variant: Some(location),
1270                    }),
1271                })
1272                .await?;
1273            Ok(())
1274        })
1275    }
1276
1277    pub fn is_screen_sharing(&self) -> bool {
1278        self.live_kit.as_ref().map_or(false, |live_kit| {
1279            !matches!(live_kit.screen_track, LocalTrack::None)
1280        })
1281    }
1282
1283    pub fn is_sharing_mic(&self) -> bool {
1284        self.live_kit.as_ref().map_or(false, |live_kit| {
1285            !matches!(live_kit.microphone_track, LocalTrack::None)
1286        })
1287    }
1288
1289    pub fn is_muted(&self, cx: &AppContext) -> bool {
1290        self.live_kit
1291            .as_ref()
1292            .and_then(|live_kit| match &live_kit.microphone_track {
1293                LocalTrack::None => Some(Self::mute_on_join(cx)),
1294                LocalTrack::Pending { muted, .. } => Some(*muted),
1295                LocalTrack::Published { muted, .. } => Some(*muted),
1296            })
1297            .unwrap_or(false)
1298    }
1299
1300    pub fn read_only(&self) -> bool {
1301        !(self.local_participant().role == proto::ChannelRole::Member
1302            || self.local_participant().role == proto::ChannelRole::Admin)
1303    }
1304
1305    pub fn is_speaking(&self) -> bool {
1306        self.live_kit
1307            .as_ref()
1308            .map_or(false, |live_kit| live_kit.speaking)
1309    }
1310
1311    pub fn is_deafened(&self) -> Option<bool> {
1312        self.live_kit.as_ref().map(|live_kit| live_kit.deafened)
1313    }
1314
1315    #[track_caller]
1316    pub fn share_microphone(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
1317        if self.status.is_offline() {
1318            return Task::ready(Err(anyhow!("room is offline")));
1319        } else if self.is_sharing_mic() {
1320            return Task::ready(Err(anyhow!("microphone was already shared")));
1321        }
1322
1323        let publish_id = if let Some(live_kit) = self.live_kit.as_mut() {
1324            let publish_id = post_inc(&mut live_kit.next_publish_id);
1325            live_kit.microphone_track = LocalTrack::Pending {
1326                publish_id,
1327                muted: false,
1328            };
1329            cx.notify();
1330            publish_id
1331        } else {
1332            return Task::ready(Err(anyhow!("live-kit was not initialized")));
1333        };
1334
1335        cx.spawn(move |this, mut cx| async move {
1336            let publish_track = async {
1337                let track = LocalAudioTrack::create();
1338                this.upgrade()
1339                    .ok_or_else(|| anyhow!("room was dropped"))?
1340                    .update(&mut cx, |this, _| {
1341                        this.live_kit
1342                            .as_ref()
1343                            .map(|live_kit| live_kit.room.publish_audio_track(track))
1344                    })?
1345                    .ok_or_else(|| anyhow!("live-kit was not initialized"))?
1346                    .await
1347            };
1348            let publication = publish_track.await;
1349            this.upgrade()
1350                .ok_or_else(|| anyhow!("room was dropped"))?
1351                .update(&mut cx, |this, cx| {
1352                    let live_kit = this
1353                        .live_kit
1354                        .as_mut()
1355                        .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
1356
1357                    let (canceled, muted) = if let LocalTrack::Pending {
1358                        publish_id: cur_publish_id,
1359                        muted,
1360                    } = &live_kit.microphone_track
1361                    {
1362                        (*cur_publish_id != publish_id, *muted)
1363                    } else {
1364                        (true, false)
1365                    };
1366
1367                    match publication {
1368                        Ok(publication) => {
1369                            if canceled {
1370                                live_kit.room.unpublish_track(publication);
1371                            } else {
1372                                if muted {
1373                                    cx.background_executor()
1374                                        .spawn(publication.set_mute(muted))
1375                                        .detach();
1376                                }
1377                                live_kit.microphone_track = LocalTrack::Published {
1378                                    track_publication: publication,
1379                                    muted,
1380                                };
1381                                cx.notify();
1382                            }
1383                            Ok(())
1384                        }
1385                        Err(error) => {
1386                            if canceled {
1387                                Ok(())
1388                            } else {
1389                                live_kit.microphone_track = LocalTrack::None;
1390                                cx.notify();
1391                                Err(error)
1392                            }
1393                        }
1394                    }
1395                })?
1396        })
1397    }
1398
1399    pub fn share_screen(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
1400        if self.status.is_offline() {
1401            return Task::ready(Err(anyhow!("room is offline")));
1402        } else if self.is_screen_sharing() {
1403            return Task::ready(Err(anyhow!("screen was already shared")));
1404        }
1405
1406        let (displays, publish_id) = if let Some(live_kit) = self.live_kit.as_mut() {
1407            let publish_id = post_inc(&mut live_kit.next_publish_id);
1408            live_kit.screen_track = LocalTrack::Pending {
1409                publish_id,
1410                muted: false,
1411            };
1412            cx.notify();
1413            (live_kit.room.display_sources(), publish_id)
1414        } else {
1415            return Task::ready(Err(anyhow!("live-kit was not initialized")));
1416        };
1417
1418        cx.spawn(move |this, mut cx| async move {
1419            let publish_track = async {
1420                let displays = displays.await?;
1421                let display = displays
1422                    .first()
1423                    .ok_or_else(|| anyhow!("no display found"))?;
1424                let track = LocalVideoTrack::screen_share_for_display(&display);
1425                this.upgrade()
1426                    .ok_or_else(|| anyhow!("room was dropped"))?
1427                    .update(&mut cx, |this, _| {
1428                        this.live_kit
1429                            .as_ref()
1430                            .map(|live_kit| live_kit.room.publish_video_track(track))
1431                    })?
1432                    .ok_or_else(|| anyhow!("live-kit was not initialized"))?
1433                    .await
1434            };
1435
1436            let publication = publish_track.await;
1437            this.upgrade()
1438                .ok_or_else(|| anyhow!("room was dropped"))?
1439                .update(&mut cx, |this, cx| {
1440                    let live_kit = this
1441                        .live_kit
1442                        .as_mut()
1443                        .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
1444
1445                    let (canceled, muted) = if let LocalTrack::Pending {
1446                        publish_id: cur_publish_id,
1447                        muted,
1448                    } = &live_kit.screen_track
1449                    {
1450                        (*cur_publish_id != publish_id, *muted)
1451                    } else {
1452                        (true, false)
1453                    };
1454
1455                    match publication {
1456                        Ok(publication) => {
1457                            if canceled {
1458                                live_kit.room.unpublish_track(publication);
1459                            } else {
1460                                if muted {
1461                                    cx.background_executor()
1462                                        .spawn(publication.set_mute(muted))
1463                                        .detach();
1464                                }
1465                                live_kit.screen_track = LocalTrack::Published {
1466                                    track_publication: publication,
1467                                    muted,
1468                                };
1469                                cx.notify();
1470                            }
1471
1472                            Audio::play_sound(Sound::StartScreenshare, cx);
1473
1474                            Ok(())
1475                        }
1476                        Err(error) => {
1477                            if canceled {
1478                                Ok(())
1479                            } else {
1480                                live_kit.screen_track = LocalTrack::None;
1481                                cx.notify();
1482                                Err(error)
1483                            }
1484                        }
1485                    }
1486                })?
1487        })
1488    }
1489
1490    pub fn toggle_mute(&mut self, cx: &mut ModelContext<Self>) -> Result<Task<Result<()>>> {
1491        let should_mute = !self.is_muted(cx);
1492        if let Some(live_kit) = self.live_kit.as_mut() {
1493            if matches!(live_kit.microphone_track, LocalTrack::None) {
1494                return Ok(self.share_microphone(cx));
1495            }
1496
1497            let (ret_task, old_muted) = live_kit.set_mute(should_mute, cx)?;
1498            live_kit.muted_by_user = should_mute;
1499
1500            if old_muted == true && live_kit.deafened == true {
1501                if let Some(task) = self.toggle_deafen(cx).ok() {
1502                    task.detach();
1503                }
1504            }
1505
1506            Ok(ret_task)
1507        } else {
1508            Err(anyhow!("LiveKit not started"))
1509        }
1510    }
1511
1512    pub fn toggle_deafen(&mut self, cx: &mut ModelContext<Self>) -> Result<Task<Result<()>>> {
1513        if let Some(live_kit) = self.live_kit.as_mut() {
1514            (*live_kit).deafened = !live_kit.deafened;
1515
1516            let mut tasks = Vec::with_capacity(self.remote_participants.len());
1517            // Context notification is sent within set_mute itself.
1518            let mut mute_task = None;
1519            // When deafening, mute user's mic as well.
1520            // When undeafening, unmute user's mic unless it was manually muted prior to deafening.
1521            if live_kit.deafened || !live_kit.muted_by_user {
1522                mute_task = Some(live_kit.set_mute(live_kit.deafened, cx)?.0);
1523            };
1524            for participant in self.remote_participants.values() {
1525                for track in live_kit
1526                    .room
1527                    .remote_audio_track_publications(&participant.user.id.to_string())
1528                {
1529                    let deafened = live_kit.deafened;
1530                    tasks.push(cx.foreground_executor().spawn(track.set_enabled(!deafened)));
1531                }
1532            }
1533
1534            Ok(cx.foreground_executor().spawn(async move {
1535                if let Some(mute_task) = mute_task {
1536                    mute_task.await?;
1537                }
1538                for task in tasks {
1539                    task.await?;
1540                }
1541                Ok(())
1542            }))
1543        } else {
1544            Err(anyhow!("LiveKit not started"))
1545        }
1546    }
1547
1548    pub fn unshare_screen(&mut self, cx: &mut ModelContext<Self>) -> Result<()> {
1549        if self.status.is_offline() {
1550            return Err(anyhow!("room is offline"));
1551        }
1552
1553        let live_kit = self
1554            .live_kit
1555            .as_mut()
1556            .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
1557        match mem::take(&mut live_kit.screen_track) {
1558            LocalTrack::None => Err(anyhow!("screen was not shared")),
1559            LocalTrack::Pending { .. } => {
1560                cx.notify();
1561                Ok(())
1562            }
1563            LocalTrack::Published {
1564                track_publication, ..
1565            } => {
1566                live_kit.room.unpublish_track(track_publication);
1567                cx.notify();
1568
1569                Audio::play_sound(Sound::StopScreenshare, cx);
1570                Ok(())
1571            }
1572        }
1573    }
1574
1575    #[cfg(any(test, feature = "test-support"))]
1576    pub fn set_display_sources(&self, sources: Vec<live_kit_client::MacOSDisplay>) {
1577        self.live_kit
1578            .as_ref()
1579            .unwrap()
1580            .room
1581            .set_display_sources(sources);
1582    }
1583}
1584
1585struct LiveKitRoom {
1586    room: Arc<live_kit_client::Room>,
1587    screen_track: LocalTrack,
1588    microphone_track: LocalTrack,
1589    /// Tracks whether we're currently in a muted state due to auto-mute from deafening or manual mute performed by user.
1590    muted_by_user: bool,
1591    deafened: bool,
1592    speaking: bool,
1593    next_publish_id: usize,
1594    _maintain_room: Task<()>,
1595    _handle_updates: Task<()>,
1596}
1597
1598impl LiveKitRoom {
1599    fn set_mute(
1600        self: &mut LiveKitRoom,
1601        should_mute: bool,
1602        cx: &mut ModelContext<Room>,
1603    ) -> Result<(Task<Result<()>>, bool)> {
1604        if !should_mute {
1605            // clear user muting state.
1606            self.muted_by_user = false;
1607        }
1608
1609        let (result, old_muted) = match &mut self.microphone_track {
1610            LocalTrack::None => Err(anyhow!("microphone was not shared")),
1611            LocalTrack::Pending { muted, .. } => {
1612                let old_muted = *muted;
1613                *muted = should_mute;
1614                cx.notify();
1615                Ok((Task::Ready(Some(Ok(()))), old_muted))
1616            }
1617            LocalTrack::Published {
1618                track_publication,
1619                muted,
1620            } => {
1621                let old_muted = *muted;
1622                *muted = should_mute;
1623                cx.notify();
1624                Ok((
1625                    cx.background_executor()
1626                        .spawn(track_publication.set_mute(*muted)),
1627                    old_muted,
1628                ))
1629            }
1630        }?;
1631
1632        if old_muted != should_mute {
1633            if should_mute {
1634                Audio::play_sound(Sound::Mute, cx);
1635            } else {
1636                Audio::play_sound(Sound::Unmute, cx);
1637            }
1638        }
1639
1640        Ok((result, old_muted))
1641    }
1642
1643    fn stop_publishing(&mut self, cx: &mut ModelContext<Room>) {
1644        if let LocalTrack::Published {
1645            track_publication, ..
1646        } = mem::replace(&mut self.microphone_track, LocalTrack::None)
1647        {
1648            self.room.unpublish_track(track_publication);
1649            cx.notify();
1650        }
1651
1652        if let LocalTrack::Published {
1653            track_publication, ..
1654        } = mem::replace(&mut self.screen_track, LocalTrack::None)
1655        {
1656            self.room.unpublish_track(track_publication);
1657            cx.notify();
1658        }
1659    }
1660}
1661
1662enum LocalTrack {
1663    None,
1664    Pending {
1665        publish_id: usize,
1666        muted: bool,
1667    },
1668    Published {
1669        track_publication: LocalTrackPublication,
1670        muted: bool,
1671    },
1672}
1673
1674impl Default for LocalTrack {
1675    fn default() -> Self {
1676        Self::None
1677    }
1678}
1679
/// Connection status of a room.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum RoomStatus {
    /// The status for which `is_online` returns `true`.
    Online,
    /// Neither `is_online` nor `is_offline` returns `true` for this status.
    Rejoining,
    /// The status for which `is_offline` returns `true`.
    Offline,
}
1686
1687impl RoomStatus {
1688    pub fn is_offline(&self) -> bool {
1689        matches!(self, RoomStatus::Offline)
1690    }
1691
1692    pub fn is_online(&self) -> bool {
1693        matches!(self, RoomStatus::Online)
1694    }
1695}