room.rs

   1use crate::{
   2    call_settings::CallSettings,
   3    participant::{LocalParticipant, ParticipantLocation, RemoteParticipant},
   4};
   5use anyhow::{anyhow, Result};
   6use audio::{Audio, Sound};
   7use client::{
   8    proto::{self, PeerId},
   9    Client, ParticipantIndex, TypedEnvelope, User, UserStore,
  10};
  11use collections::{BTreeMap, HashMap, HashSet};
  12use fs::Fs;
  13use futures::{FutureExt, StreamExt};
  14use gpui::{
  15    AppContext, AsyncAppContext, Context, EventEmitter, Model, ModelContext, Task, WeakModel,
  16};
  17use language::LanguageRegistry;
  18use live_kit_client::{LocalAudioTrack, LocalTrackPublication, LocalVideoTrack, RoomUpdate};
  19use postage::{sink::Sink, stream::Stream, watch};
  20use project::Project;
  21use settings::Settings as _;
  22use std::{future::Future, mem, sync::Arc, time::Duration};
  23use util::{post_inc, ResultExt, TryFutureExt};
  24
    /// Upper bound on how long `Room::maintain_connection` waits for the client
    /// to reconnect and rejoin the room before giving up and leaving it.
  25pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(30);
  26
    /// Events emitted by a [`Room`] (see the `EventEmitter<Event>` impl).
  27#[derive(Clone, Debug, PartialEq, Eq)]
  28pub enum Event {
        /// Emitted by `apply_room_update` when an already-known remote
        /// participant's location or role differs from the stored one.
  29    ParticipantLocationChanged {
  30        participant_id: proto::PeerId,
  31    },
        /// NOTE(review): emit site is outside this part of the file; presumably
        /// signals a change in the participant's video tracks — confirm.
  32    RemoteVideoTracksChanged {
  33        participant_id: proto::PeerId,
  34    },
        /// NOTE(review): emit site is outside this part of the file; presumably
        /// signals a change in the participant's audio tracks — confirm.
  35    RemoteAudioTracksChanged {
  36        participant_id: proto::PeerId,
  37    },
        /// Emitted by `apply_room_update` for each project in a remote
        /// participant's list that was not in that participant's previous list.
  38    RemoteProjectShared {
  39        owner: Arc<User>,
  40        project_id: u64,
  41        worktree_root_names: Vec<String>,
  42    },
        /// Emitted by `apply_room_update` for each project that disappeared
        /// from a remote participant's list.
  43    RemoteProjectUnshared {
  44        project_id: u64,
  45    },
        /// NOTE(review): emit site is outside this part of the file.
  46    RemoteProjectJoined {
  47        project_id: u64,
  48    },
        /// NOTE(review): emit site is outside this part of the file.
  49    RemoteProjectInvitationDiscarded {
  50        project_id: u64,
  51    },
        /// Emitted by `Room::leave` just before the room is torn down.
  52    Left,
  53}
  54
    /// Client-side state of a call room: its participants, the projects shared
    /// into or joined through it, and the background tasks that keep it in sync
    /// with the server.
  55pub struct Room {
        // Server-assigned room id; sent back in `RejoinRoom` and
        // `SetRoomParticipantRole` requests.
  56    id: u64,
        // Taken from the join response; `None` for rooms built by `create`.
  57    channel_id: Option<u64>,
        // Present only when the server supplied LiveKit connection info.
        // Dropped (set to `None`) by `clear_state`.
  58    live_kit: Option<LiveKitRoom>,
  59    status: RoomStatus,
        // Weak handles to projects shared into the room; they are unshared
        // in `clear_state` and re-announced in `rejoin`.
  60    shared_projects: HashSet<WeakModel<Project>>,
        // Weak handles to projects joined through the room; they are
        // disconnected from their host and closed in `clear_state`.
  61    joined_projects: HashSet<WeakModel<Project>>,
  62    local_participant: LocalParticipant,
        // Keyed by user id (see `role_for_user`).
  63    remote_participants: BTreeMap<u64, RemoteParticipant>,
  64    pending_participants: Vec<Arc<User>>,
        // User ids of participants; rebuilt by `apply_room_update`.
  65    participant_user_ids: HashSet<u64>,
        // Must be zero for `should_leave` to return true.
  66    pending_call_count: usize,
        // Set to `true` by `create`; `from_join_response` sets it to `true`
        // only for rooms without a channel.
  67    leave_when_empty: bool,
  68    client: Arc<Client>,
  69    user_store: Model<UserStore>,
        // Followers for each (leader peer id, project id) pair; read by `followers_for`.
  70    follows_by_leader_id_project_id: HashMap<(PeerId, u64), Vec<PeerId>>,
        // Holds the handler registered for room-update messages; cleared in `clear_state`.
  71    client_subscriptions: Vec<client::Subscription>,
        // Holds the release and app-quit hooks registered in `new`.
  72    _subscriptions: Vec<gpui::Subscription>,
        // Both halves of one watch channel created in `new`.
        // NOTE(review): their use is outside this part of the file.
  73    room_update_completed_tx: watch::Sender<Option<()>>,
  74    room_update_completed_rx: watch::Receiver<Option<()>>,
        // Task spawned by the most recent `apply_room_update`; while it is
        // `Some`, `should_leave` returns false.
  75    pending_room_update: Option<Task<()>>,
        // Task running `Self::maintain_connection`; dropped by `clear_state`.
  76    maintain_connection: Option<Task<Option<()>>>,
  77}
  78
    // Lets `Room` emit `Event` values through `cx.emit(...)`.
  79impl EventEmitter<Event> for Room {}
  80
  81impl Room {
        /// Returns the channel id stored for this room, or `None` when the room
        /// was not constructed with one (e.g. rooms built by `create`).
  82    pub fn channel_id(&self) -> Option<u64> {
  83        self.channel_id
  84    }
  85
        /// Returns `true` while the room tracks at least one shared project.
  86    pub fn is_sharing_project(&self) -> bool {
  87        !self.shared_projects.is_empty()
  88    }
  89
  90    #[cfg(any(test, feature = "test-support"))]
  91    pub fn is_connected(&self) -> bool {
  92        if let Some(live_kit) = self.live_kit.as_ref() {
  93            matches!(
  94                *live_kit.room.status().borrow(),
  95                live_kit_client::ConnectionState::Connected { .. }
  96            )
  97        } else {
  98            false
  99        }
 100    }
 101
        /// Builds the in-memory state for a room the server has already created
        /// or admitted us to.
        ///
        /// When `live_kit_connection_info` is present this also creates the
        /// LiveKit room and spawns three tasks: one that leaves the room when
        /// LiveKit reports `Disconnected`, one that forwards LiveKit updates to
        /// `live_kit_room_updated`, and a detached one that connects and then
        /// shares the microphone unless mute-on-join applies or the room is
        /// read-only.
        ///
        /// Regardless of LiveKit, it spawns `maintain_connection`, plays the
        /// "joined" sound, registers the room-update message handler, and
        /// installs the release and app-quit hooks.
 102    fn new(
 103        id: u64,
 104        channel_id: Option<u64>,
 105        live_kit_connection_info: Option<proto::LiveKitConnectionInfo>,
 106        client: Arc<Client>,
 107        user_store: Model<UserStore>,
 108        cx: &mut ModelContext<Self>,
 109    ) -> Self {
 110        let live_kit_room = if let Some(connection_info) = live_kit_connection_info {
 111            let room = live_kit_client::Room::new();
 112            let mut status = room.status();
 113            // Consume the initial status of the room.
 114            let _ = status.try_recv();
                // Watches LiveKit's connection state; the task ends when the
                // room model is gone or after a `Disconnected` state triggers `leave`.
 115            let _maintain_room = cx.spawn(|this, mut cx| async move {
 116                while let Some(status) = status.next().await {
 117                    let this = if let Some(this) = this.upgrade() {
 118                        this
 119                    } else {
 120                        break;
 121                    };
 122
 123                    if status == live_kit_client::ConnectionState::Disconnected {
 124                        this.update(&mut cx, |this, cx| this.leave(cx).log_err())
 125                            .ok();
 126                        break;
 127                    }
 128                }
 129            });
 130
                // Forwards every LiveKit room update to `live_kit_room_updated`
                // until the update stream ends or the room model is dropped.
 131            let _handle_updates = cx.spawn({
 132                let room = room.clone();
 133                move |this, mut cx| async move {
 134                    let mut updates = room.updates();
 135                    while let Some(update) = updates.next().await {
 136                        let this = if let Some(this) = this.upgrade() {
 137                            this
 138                        } else {
 139                            break;
 140                        };
 141
 142                        this.update(&mut cx, |this, cx| {
 143                            this.live_kit_room_updated(update, cx).log_err()
 144                        })
 145                        .ok();
 146                    }
 147                }
 148            });
 149
                // The connect future is created here, but awaited inside the
                // detached task below.
 150            let connect = room.connect(&connection_info.server_url, &connection_info.token);
 151            cx.spawn(|this, mut cx| async move {
 152                connect.await?;
 153
                    // If the room can no longer be updated, fall back to
                    // treating it as read-only so the microphone is not shared.
 154                let is_read_only = this
 155                    .update(&mut cx, |room, _| room.read_only())
 156                    .unwrap_or(true);
 157
 158                if !cx.update(|cx| Self::mute_on_join(cx))? && !is_read_only {
 159                    this.update(&mut cx, |this, cx| this.share_microphone(cx))?
 160                        .await?;
 161                }
 162
 163                anyhow::Ok(())
 164            })
 165            .detach_and_log_err(cx);
 166
 167            Some(LiveKitRoom {
 168                room,
 169                screen_track: LocalTrack::None,
 170                microphone_track: LocalTrack::None,
 171                next_publish_id: 0,
 172                muted_by_user: false,
 173                deafened: false,
 174                speaking: false,
 175                _maintain_room,
 176                _handle_updates,
 177            })
 178        } else {
 179            None
 180        };
 181
            // `log_err()` turns the task's `Result<()>` into `Option<()>`,
            // matching the type of the `maintain_connection` field.
 182        let maintain_connection = cx.spawn({
 183            let client = client.clone();
 184            move |this, cx| Self::maintain_connection(this, client.clone(), cx).log_err()
 185        });
 186
 187        Audio::play_sound(Sound::Joined, cx);
 188
 189        let (room_update_completed_tx, room_update_completed_rx) = watch::channel();
 190
 191        Self {
 192            id,
 193            channel_id,
 194            live_kit: live_kit_room,
 195            status: RoomStatus::Online,
 196            shared_projects: Default::default(),
 197            joined_projects: Default::default(),
 198            participant_user_ids: Default::default(),
 199            local_participant: Default::default(),
 200            remote_participants: Default::default(),
 201            pending_participants: Default::default(),
 202            pending_call_count: 0,
 203            client_subscriptions: vec![
 204                client.add_message_handler(cx.weak_model(), Self::handle_room_updated)
 205            ],
 206            _subscriptions: vec![
 207                cx.on_release(Self::released),
 208                cx.on_app_quit(Self::app_will_quit),
 209            ],
 210            leave_when_empty: false,
 211            pending_room_update: None,
 212            client,
 213            user_store,
 214            follows_by_leader_id_project_id: Default::default(),
 215            maintain_connection: Some(maintain_connection),
 216            room_update_completed_tx,
 217            room_update_completed_rx,
 218        }
 219    }
 220
 221    pub(crate) fn create(
 222        called_user_id: u64,
 223        initial_project: Option<Model<Project>>,
 224        client: Arc<Client>,
 225        user_store: Model<UserStore>,
 226        cx: &mut AppContext,
 227    ) -> Task<Result<Model<Self>>> {
 228        cx.spawn(move |mut cx| async move {
 229            let response = client.request(proto::CreateRoom {}).await?;
 230            let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
 231            let room = cx.new_model(|cx| {
 232                let mut room = Self::new(
 233                    room_proto.id,
 234                    None,
 235                    response.live_kit_connection_info,
 236                    client,
 237                    user_store,
 238                    cx,
 239                );
 240                if let Some(participant) = room_proto.participants.first() {
 241                    room.local_participant.role = participant.role()
 242                }
 243                room
 244            })?;
 245
 246            let initial_project_id = if let Some(initial_project) = initial_project {
 247                let initial_project_id = room
 248                    .update(&mut cx, |room, cx| {
 249                        room.share_project(initial_project.clone(), cx)
 250                    })?
 251                    .await?;
 252                Some(initial_project_id)
 253            } else {
 254                None
 255            };
 256
 257            match room
 258                .update(&mut cx, |room, cx| {
 259                    room.leave_when_empty = true;
 260                    room.call(called_user_id, initial_project_id, cx)
 261                })?
 262                .await
 263            {
 264                Ok(()) => Ok(room),
 265                Err(error) => Err(anyhow!("room creation failed: {:?}", error)),
 266            }
 267        })
 268    }
 269
 270    pub(crate) async fn join_channel(
 271        channel_id: u64,
 272        client: Arc<Client>,
 273        user_store: Model<UserStore>,
 274        cx: AsyncAppContext,
 275    ) -> Result<Model<Self>> {
 276        Self::from_join_response(
 277            client.request(proto::JoinChannel { channel_id }).await?,
 278            client,
 279            user_store,
 280            cx,
 281        )
 282    }
 283
 284    pub(crate) async fn join(
 285        room_id: u64,
 286        client: Arc<Client>,
 287        user_store: Model<UserStore>,
 288        cx: AsyncAppContext,
 289    ) -> Result<Model<Self>> {
 290        Self::from_join_response(
 291            client.request(proto::JoinRoom { id: room_id }).await?,
 292            client,
 293            user_store,
 294            cx,
 295        )
 296    }
 297
 298    fn released(&mut self, cx: &mut AppContext) {
 299        if self.status.is_online() {
 300            self.leave_internal(cx).detach_and_log_err(cx);
 301        }
 302    }
 303
 304    fn app_will_quit(&mut self, cx: &mut ModelContext<Self>) -> impl Future<Output = ()> {
 305        let task = if self.status.is_online() {
 306            let leave = self.leave_internal(cx);
 307            Some(cx.background_executor().spawn(async move {
 308                leave.await.log_err();
 309            }))
 310        } else {
 311            None
 312        };
 313
 314        async move {
 315            if let Some(task) = task {
 316                task.await;
 317            }
 318        }
 319    }
 320
 321    pub fn mute_on_join(cx: &AppContext) -> bool {
 322        CallSettings::get_global(cx).mute_on_join || client::IMPERSONATE_LOGIN.is_some()
 323    }
 324
 325    fn from_join_response(
 326        response: proto::JoinRoomResponse,
 327        client: Arc<Client>,
 328        user_store: Model<UserStore>,
 329        mut cx: AsyncAppContext,
 330    ) -> Result<Model<Self>> {
 331        let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
 332        let room = cx.new_model(|cx| {
 333            Self::new(
 334                room_proto.id,
 335                response.channel_id,
 336                response.live_kit_connection_info,
 337                client,
 338                user_store,
 339                cx,
 340            )
 341        })?;
 342        room.update(&mut cx, |room, cx| {
 343            room.leave_when_empty = room.channel_id.is_none();
 344            room.apply_room_update(room_proto, cx)?;
 345            anyhow::Ok(())
 346        })??;
 347        Ok(room)
 348    }
 349
 350    fn should_leave(&self) -> bool {
 351        self.leave_when_empty
 352            && self.pending_room_update.is_none()
 353            && self.pending_participants.is_empty()
 354            && self.remote_participants.is_empty()
 355            && self.pending_call_count == 0
 356    }
 357
        /// Notifies observers, emits `Event::Left`, and then performs the
        /// actual leave via `leave_internal`, returning its task.
        ///
        /// The notification and event are sent before `leave_internal` runs,
        /// so they go out even when it reports "room is offline".
 358    pub(crate) fn leave(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
 359        cx.notify();
 360        cx.emit(Event::Left);
 361        self.leave_internal(cx)
 362    }
 363
 364    fn leave_internal(&mut self, cx: &mut AppContext) -> Task<Result<()>> {
 365        if self.status.is_offline() {
 366            return Task::ready(Err(anyhow!("room is offline")));
 367        }
 368
 369        log::info!("leaving room");
 370        Audio::play_sound(Sound::Leave, cx);
 371
 372        self.clear_state(cx);
 373
 374        let leave_room = self.client.request(proto::LeaveRoom {});
 375        cx.background_executor().spawn(async move {
 376            leave_room.await?;
 377            anyhow::Ok(())
 378        })
 379    }
 380
 381    pub(crate) fn clear_state(&mut self, cx: &mut AppContext) {
 382        for project in self.shared_projects.drain() {
 383            if let Some(project) = project.upgrade() {
 384                project.update(cx, |project, cx| {
 385                    project.unshare(cx).log_err();
 386                });
 387            }
 388        }
 389        for project in self.joined_projects.drain() {
 390            if let Some(project) = project.upgrade() {
 391                project.update(cx, |project, cx| {
 392                    project.disconnected_from_host(cx);
 393                    project.close(cx);
 394                });
 395            }
 396        }
 397
 398        self.status = RoomStatus::Offline;
 399        self.remote_participants.clear();
 400        self.pending_participants.clear();
 401        self.participant_user_ids.clear();
 402        self.client_subscriptions.clear();
 403        self.live_kit.take();
 404        self.pending_room_update.take();
 405        self.maintain_connection.take();
 406    }
 407
        /// Long-running task that watches the client's connection status and
        /// tries to rejoin the room after a disconnection.
        ///
        /// On every detected disconnection the room is marked `Rejoining`, then
        /// up to three rejoin attempts are made while racing against
        /// `RECONNECT_TIMEOUT`. A successful rejoin returns to watching. A
        /// sign-out, exhausted attempts, or the timeout ends the loop, after
        /// which the room is left.
        ///
        /// This function never returns `Ok`: it ends with an error either
        /// because the room model was dropped or because reconnection failed.
 408    async fn maintain_connection(
 409        this: WeakModel<Self>,
 410        client: Arc<Client>,
 411        mut cx: AsyncAppContext,
 412    ) -> Result<()> {
 413        let mut client_status = client.status();
 414        loop {
                // NOTE(review): presumably drains an already-queued status so that
                // `next()` below only resolves on a later change — confirm
                // against the watch channel's semantics.
 415            let _ = client_status.try_recv();
 416            let is_connected = client_status.borrow().is_connected();
 417            // Even if we're initially connected, any future change of the status means we momentarily disconnected.
 418            if !is_connected || client_status.next().await.is_some() {
 419                log::info!("detected client disconnection");
 420
 421                this.upgrade()
 422                    .ok_or_else(|| anyhow!("room was dropped"))?
 423                    .update(&mut cx, |this, cx| {
 424                        this.status = RoomStatus::Rejoining;
 425                        cx.notify();
 426                    })?;
 427
 428                // Wait for client to re-establish a connection to the server.
 429                {
 430                    let mut reconnection_timeout =
 431                        cx.background_executor().timer(RECONNECT_TIMEOUT).fuse();
                        // Resolves to `true` once a rejoin succeeds, and to `false`
                        // when the client signed out, the app or room is gone,
                        // or all attempts were used up.
 432                    let client_reconnection = async {
 433                        let mut remaining_attempts = 3;
 434                        while remaining_attempts > 0 {
 435                            if client_status.borrow().is_connected() {
 436                                log::info!("client reconnected, attempting to rejoin room");
 437
 438                                let Some(this) = this.upgrade() else { break };
 439                                match this.update(&mut cx, |this, cx| this.rejoin(cx)) {
 440                                    Ok(task) => {
 441                                        if task.await.log_err().is_some() {
 442                                            return true;
 443                                        } else {
                                                // Only a failed rejoin consumes an attempt;
                                                // status changes while disconnected do not.
 444                                            remaining_attempts -= 1;
 445                                        }
 446                                    }
 447                                    Err(_app_dropped) => return false,
 448                                }
 449                            } else if client_status.borrow().is_signed_out() {
 450                                return false;
 451                            }
 452
 453                            log::info!(
 454                                "waiting for client status change, remaining attempts {}",
 455                                remaining_attempts
 456                            );
 457                            client_status.next().await;
 458                        }
 459                        false
 460                    }
 461                    .fuse();
 462                    futures::pin_mut!(client_reconnection);
 463
                        // The reconnection branch is listed first, so it is
                        // preferred when both futures are ready.
 464                    futures::select_biased! {
 465                        reconnected = client_reconnection => {
 466                            if reconnected {
 467                                log::info!("successfully reconnected to room");
 468                                // If we successfully joined the room, go back around the loop
 469                                // waiting for future connection status changes.
 470                                continue;
 471                            }
 472                        }
 473                        _ = reconnection_timeout => {
 474                            log::info!("room reconnection timeout expired");
 475                        }
 476                    }
 477                }
 478
                    // Reached only when reconnection failed or timed out.
 479                break;
 480            }
 481        }
 482
 483        // The client failed to re-establish a connection to the server
 484        // or an error occurred while trying to re-join the room. Either way
 485        // we leave the room and return an error.
 486        if let Some(this) = this.upgrade() {
 487            log::info!("reconnection failed, leaving room");
                // The task returned by `leave` is dropped here rather than awaited.
 488            let _ = this.update(&mut cx, |this, cx| this.leave(cx))?;
 489        }
 490        Err(anyhow!(
 491            "can't reconnect to room: client failed to re-establish connection"
 492        ))
 493    }
 494
        /// Sends a `RejoinRoom` request describing the projects we still share
        /// and the projects we had joined, then applies the server's answer.
        ///
        /// On success the room status becomes `Online`, the returned room state
        /// is applied, and each reshared or rejoined project named in the
        /// response is notified. Errors from those per-project notifications
        /// are logged rather than propagated.
 495    fn rejoin(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
            // Strong handles by remote project id, used to route the response
            // back to the right project.
 496        let mut projects = HashMap::default();
 497        let mut reshared_projects = Vec::new();
 498        let mut rejoined_projects = Vec::new();
            // Shared projects that were dropped or have no remote id are
            // removed from the set.
 499        self.shared_projects.retain(|project| {
 500            if let Some(handle) = project.upgrade() {
 501                let project = handle.read(cx);
 502                if let Some(project_id) = project.remote_id() {
 503                    projects.insert(project_id, handle.clone());
 504                    reshared_projects.push(proto::UpdateProject {
 505                        project_id,
 506                        worktrees: project.worktree_metadata_protos(cx),
 507                    });
 508                    return true;
 509                }
 510            }
 511            false
 512        });
            // Unlike shared projects, a live joined project is kept in the set
            // even when it has no remote id; only dropped ones are removed.
 513        self.joined_projects.retain(|project| {
 514            if let Some(handle) = project.upgrade() {
 515                let project = handle.read(cx);
 516                if let Some(project_id) = project.remote_id() {
 517                    projects.insert(project_id, handle.clone());
 518                    rejoined_projects.push(proto::RejoinProject {
 519                        id: project_id,
 520                        worktrees: project
 521                            .worktrees()
 522                            .map(|worktree| {
 523                                let worktree = worktree.read(cx);
 524                                proto::RejoinWorktree {
 525                                    id: worktree.id().to_proto(),
 526                                    scan_id: worktree.completed_scan_id() as u64,
 527                                }
 528                            })
 529                            .collect(),
 530                    });
 531                }
 532                return true;
 533            }
 534            false
 535        });
 536
            // The envelope form is requested because the message id is passed
            // on to `Project::rejoined` below.
 537        let response = self.client.request_envelope(proto::RejoinRoom {
 538            id: self.id,
 539            reshared_projects,
 540            rejoined_projects,
 541        });
 542
 543        cx.spawn(|this, mut cx| async move {
 544            let response = response.await?;
 545            let message_id = response.message_id;
 546            let response = response.payload;
 547            let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
 548            this.update(&mut cx, |this, cx| {
 549                this.status = RoomStatus::Online;
 550                this.apply_room_update(room_proto, cx)?;
 551
 552                for reshared_project in response.reshared_projects {
 553                    if let Some(project) = projects.get(&reshared_project.id) {
 554                        project.update(cx, |project, cx| {
 555                            project.reshared(reshared_project, cx).log_err();
 556                        });
 557                    }
 558                }
 559
 560                for rejoined_project in response.rejoined_projects {
 561                    if let Some(project) = projects.get(&rejoined_project.id) {
 562                        project.update(cx, |project, cx| {
 563                            project.rejoined(rejoined_project, message_id, cx).log_err();
 564                        });
 565                    }
 566                }
 567
 568                anyhow::Ok(())
                // `?` unwraps a failed model update; the closure's own result
                // becomes the task's result.
 569            })?
 570        })
 571    }
 572
        /// Returns the room's id.
 573    pub fn id(&self) -> u64 {
 574        self.id
 575    }
 576
        /// Returns the room's current status.
 577    pub fn status(&self) -> RoomStatus {
 578        self.status
 579    }
 580
        /// Returns the state tracked for the local participant.
 581    pub fn local_participant(&self) -> &LocalParticipant {
 582        &self.local_participant
 583    }
 584
        /// Returns the remote participants, keyed by user id.
 585    pub fn remote_participants(&self) -> &BTreeMap<u64, RemoteParticipant> {
 586        &self.remote_participants
 587    }
 588
 589    pub fn remote_participant_for_peer_id(&self, peer_id: PeerId) -> Option<&RemoteParticipant> {
 590        self.remote_participants
 591            .values()
 592            .find(|p| p.peer_id == peer_id)
 593    }
 594
 595    pub fn role_for_user(&self, user_id: u64) -> Option<proto::ChannelRole> {
 596        self.remote_participants
 597            .get(&user_id)
 598            .map(|participant| participant.role)
 599    }
 600
 601    pub fn local_participant_is_admin(&self) -> bool {
 602        self.local_participant.role == proto::ChannelRole::Admin
 603    }
 604
 605    pub fn set_participant_role(
 606        &mut self,
 607        user_id: u64,
 608        role: proto::ChannelRole,
 609        cx: &ModelContext<Self>,
 610    ) -> Task<Result<()>> {
 611        let client = self.client.clone();
 612        let room_id = self.id;
 613        let role = role.into();
 614        cx.spawn(|_, _| async move {
 615            client
 616                .request(proto::SetRoomParticipantRole {
 617                    room_id,
 618                    user_id,
 619                    role,
 620                })
 621                .await
 622                .map(|_| ())
 623        })
 624    }
 625
        /// Returns the users currently listed as pending participants.
 626    pub fn pending_participants(&self) -> &[Arc<User>] {
 627        &self.pending_participants
 628    }
 629
        /// Returns `true` if `user_id` is in the set of participant user ids
        /// maintained by `apply_room_update`.
 630    pub fn contains_participant(&self, user_id: u64) -> bool {
 631        self.participant_user_ids.contains(&user_id)
 632    }
 633
 634    pub fn followers_for(&self, leader_id: PeerId, project_id: u64) -> &[PeerId] {
 635        self.follows_by_leader_id_project_id
 636            .get(&(leader_id, project_id))
 637            .map_or(&[], |v| v.as_slice())
 638    }
 639
 640    /// Returns the most 'active' projects, defined as most people in the project
 641    pub fn most_active_project(&self, cx: &AppContext) -> Option<(u64, u64)> {
 642        let mut project_hosts_and_guest_counts = HashMap::<u64, (Option<u64>, u32)>::default();
 643        for participant in self.remote_participants.values() {
 644            match participant.location {
 645                ParticipantLocation::SharedProject { project_id } => {
 646                    project_hosts_and_guest_counts
 647                        .entry(project_id)
 648                        .or_default()
 649                        .1 += 1;
 650                }
 651                ParticipantLocation::External | ParticipantLocation::UnsharedProject => {}
 652            }
 653            for project in &participant.projects {
 654                project_hosts_and_guest_counts
 655                    .entry(project.id)
 656                    .or_default()
 657                    .0 = Some(participant.user.id);
 658            }
 659        }
 660
 661        if let Some(user) = self.user_store.read(cx).current_user() {
 662            for project in &self.local_participant.projects {
 663                project_hosts_and_guest_counts
 664                    .entry(project.id)
 665                    .or_default()
 666                    .0 = Some(user.id);
 667            }
 668        }
 669
 670        project_hosts_and_guest_counts
 671            .into_iter()
 672            .filter_map(|(id, (host, guest_count))| Some((id, host?, guest_count)))
 673            .max_by_key(|(_, _, guest_count)| *guest_count)
 674            .map(|(id, host, _)| (id, host))
 675    }
 676
 677    async fn handle_room_updated(
 678        this: Model<Self>,
 679        envelope: TypedEnvelope<proto::RoomUpdated>,
 680        _: Arc<Client>,
 681        mut cx: AsyncAppContext,
 682    ) -> Result<()> {
 683        let room = envelope
 684            .payload
 685            .room
 686            .ok_or_else(|| anyhow!("invalid room"))?;
 687        this.update(&mut cx, |this, cx| this.apply_room_update(room, cx))?
 688    }
 689
 690    fn apply_room_update(
 691        &mut self,
 692        mut room: proto::Room,
 693        cx: &mut ModelContext<Self>,
 694    ) -> Result<()> {
 695        // Filter ourselves out from the room's participants.
 696        let local_participant_ix = room
 697            .participants
 698            .iter()
 699            .position(|participant| Some(participant.user_id) == self.client.user_id());
 700        let local_participant = local_participant_ix.map(|ix| room.participants.swap_remove(ix));
 701
 702        let pending_participant_user_ids = room
 703            .pending_participants
 704            .iter()
 705            .map(|p| p.user_id)
 706            .collect::<Vec<_>>();
 707
 708        let remote_participant_user_ids = room
 709            .participants
 710            .iter()
 711            .map(|p| p.user_id)
 712            .collect::<Vec<_>>();
 713
 714        let (remote_participants, pending_participants) =
 715            self.user_store.update(cx, move |user_store, cx| {
 716                (
 717                    user_store.get_users(remote_participant_user_ids, cx),
 718                    user_store.get_users(pending_participant_user_ids, cx),
 719                )
 720            });
 721
 722        self.pending_room_update = Some(cx.spawn(|this, mut cx| async move {
 723            let (remote_participants, pending_participants) =
 724                futures::join!(remote_participants, pending_participants);
 725
 726            this.update(&mut cx, |this, cx| {
 727                this.participant_user_ids.clear();
 728
 729                if let Some(participant) = local_participant {
 730                    let role = participant.role();
 731                    this.local_participant.projects = participant.projects;
 732                    if this.local_participant.role != role {
 733                        this.local_participant.role = role;
 734
 735                        if role == proto::ChannelRole::Guest {
 736                            for project in mem::take(&mut this.shared_projects) {
 737                                if let Some(project) = project.upgrade() {
 738                                    this.unshare_project(project, cx).log_err();
 739                                }
 740                            }
 741                            this.local_participant.projects.clear();
 742                            if let Some(live_kit_room) = &mut this.live_kit {
 743                                live_kit_room.stop_publishing(cx);
 744                            }
 745                        }
 746
 747                        this.joined_projects.retain(|project| {
 748                            if let Some(project) = project.upgrade() {
 749                                project.update(cx, |project, cx| project.set_role(role, cx));
 750                                true
 751                            } else {
 752                                false
 753                            }
 754                        });
 755                    }
 756                } else {
 757                    this.local_participant.projects.clear();
 758                }
 759
 760                if let Some(participants) = remote_participants.log_err() {
 761                    for (participant, user) in room.participants.into_iter().zip(participants) {
 762                        let Some(peer_id) = participant.peer_id else {
 763                            continue;
 764                        };
 765                        let participant_index = ParticipantIndex(participant.participant_index);
 766                        this.participant_user_ids.insert(participant.user_id);
 767
 768                        let old_projects = this
 769                            .remote_participants
 770                            .get(&participant.user_id)
 771                            .into_iter()
 772                            .flat_map(|existing| &existing.projects)
 773                            .map(|project| project.id)
 774                            .collect::<HashSet<_>>();
 775                        let new_projects = participant
 776                            .projects
 777                            .iter()
 778                            .map(|project| project.id)
 779                            .collect::<HashSet<_>>();
 780
 781                        for project in &participant.projects {
 782                            if !old_projects.contains(&project.id) {
 783                                cx.emit(Event::RemoteProjectShared {
 784                                    owner: user.clone(),
 785                                    project_id: project.id,
 786                                    worktree_root_names: project.worktree_root_names.clone(),
 787                                });
 788                            }
 789                        }
 790
 791                        for unshared_project_id in old_projects.difference(&new_projects) {
 792                            this.joined_projects.retain(|project| {
 793                                if let Some(project) = project.upgrade() {
 794                                    project.update(cx, |project, cx| {
 795                                        if project.remote_id() == Some(*unshared_project_id) {
 796                                            project.disconnected_from_host(cx);
 797                                            false
 798                                        } else {
 799                                            true
 800                                        }
 801                                    })
 802                                } else {
 803                                    false
 804                                }
 805                            });
 806                            cx.emit(Event::RemoteProjectUnshared {
 807                                project_id: *unshared_project_id,
 808                            });
 809                        }
 810
 811                        let role = participant.role();
 812                        let location = ParticipantLocation::from_proto(participant.location)
 813                            .unwrap_or(ParticipantLocation::External);
 814                        if let Some(remote_participant) =
 815                            this.remote_participants.get_mut(&participant.user_id)
 816                        {
 817                            remote_participant.peer_id = peer_id;
 818                            remote_participant.projects = participant.projects;
 819                            remote_participant.participant_index = participant_index;
 820                            if location != remote_participant.location
 821                                || role != remote_participant.role
 822                            {
 823                                remote_participant.location = location;
 824                                remote_participant.role = role;
 825                                cx.emit(Event::ParticipantLocationChanged {
 826                                    participant_id: peer_id,
 827                                });
 828                            }
 829                        } else {
 830                            this.remote_participants.insert(
 831                                participant.user_id,
 832                                RemoteParticipant {
 833                                    user: user.clone(),
 834                                    participant_index,
 835                                    peer_id,
 836                                    projects: participant.projects,
 837                                    location,
 838                                    role,
 839                                    muted: true,
 840                                    speaking: false,
 841                                    video_tracks: Default::default(),
 842                                    audio_tracks: Default::default(),
 843                                },
 844                            );
 845
 846                            Audio::play_sound(Sound::Joined, cx);
 847
 848                            if let Some(live_kit) = this.live_kit.as_ref() {
 849                                let video_tracks =
 850                                    live_kit.room.remote_video_tracks(&user.id.to_string());
 851                                let audio_tracks =
 852                                    live_kit.room.remote_audio_tracks(&user.id.to_string());
 853                                let publications = live_kit
 854                                    .room
 855                                    .remote_audio_track_publications(&user.id.to_string());
 856
 857                                for track in video_tracks {
 858                                    this.live_kit_room_updated(
 859                                        RoomUpdate::SubscribedToRemoteVideoTrack(track),
 860                                        cx,
 861                                    )
 862                                    .log_err();
 863                                }
 864
 865                                for (track, publication) in
 866                                    audio_tracks.iter().zip(publications.iter())
 867                                {
 868                                    this.live_kit_room_updated(
 869                                        RoomUpdate::SubscribedToRemoteAudioTrack(
 870                                            track.clone(),
 871                                            publication.clone(),
 872                                        ),
 873                                        cx,
 874                                    )
 875                                    .log_err();
 876                                }
 877                            }
 878                        }
 879                    }
 880
 881                    this.remote_participants.retain(|user_id, participant| {
 882                        if this.participant_user_ids.contains(user_id) {
 883                            true
 884                        } else {
 885                            for project in &participant.projects {
 886                                cx.emit(Event::RemoteProjectUnshared {
 887                                    project_id: project.id,
 888                                });
 889                            }
 890                            false
 891                        }
 892                    });
 893                }
 894
 895                if let Some(pending_participants) = pending_participants.log_err() {
 896                    this.pending_participants = pending_participants;
 897                    for participant in &this.pending_participants {
 898                        this.participant_user_ids.insert(participant.id);
 899                    }
 900                }
 901
 902                this.follows_by_leader_id_project_id.clear();
 903                for follower in room.followers {
 904                    let project_id = follower.project_id;
 905                    let (leader, follower) = match (follower.leader_id, follower.follower_id) {
 906                        (Some(leader), Some(follower)) => (leader, follower),
 907
 908                        _ => {
 909                            log::error!("Follower message {follower:?} missing some state");
 910                            continue;
 911                        }
 912                    };
 913
 914                    let list = this
 915                        .follows_by_leader_id_project_id
 916                        .entry((leader, project_id))
 917                        .or_insert(Vec::new());
 918                    if !list.contains(&follower) {
 919                        list.push(follower);
 920                    }
 921                }
 922
 923                this.pending_room_update.take();
 924                if this.should_leave() {
 925                    log::info!("room is empty, leaving");
 926                    let _ = this.leave(cx);
 927                }
 928
 929                this.user_store.update(cx, |user_store, cx| {
 930                    let participant_indices_by_user_id = this
 931                        .remote_participants
 932                        .iter()
 933                        .map(|(user_id, participant)| (*user_id, participant.participant_index))
 934                        .collect();
 935                    user_store.set_participant_indices(participant_indices_by_user_id, cx);
 936                });
 937
 938                this.check_invariants();
 939                this.room_update_completed_tx.try_send(Some(())).ok();
 940                cx.notify();
 941            })
 942            .ok();
 943        }));
 944
 945        cx.notify();
 946        Ok(())
 947    }
 948
 949    pub fn room_update_completed(&mut self) -> impl Future<Output = ()> {
 950        let mut done_rx = self.room_update_completed_rx.clone();
 951        async move {
 952            while let Some(result) = done_rx.next().await {
 953                if result.is_some() {
 954                    break;
 955                }
 956            }
 957        }
 958    }
 959
 960    fn live_kit_room_updated(
 961        &mut self,
 962        update: RoomUpdate,
 963        cx: &mut ModelContext<Self>,
 964    ) -> Result<()> {
 965        match update {
 966            RoomUpdate::SubscribedToRemoteVideoTrack(track) => {
 967                let user_id = track.publisher_id().parse()?;
 968                let track_id = track.sid().to_string();
 969                let participant = self
 970                    .remote_participants
 971                    .get_mut(&user_id)
 972                    .ok_or_else(|| anyhow!("subscribed to track by unknown participant"))?;
 973                participant.video_tracks.insert(track_id.clone(), track);
 974                cx.emit(Event::RemoteVideoTracksChanged {
 975                    participant_id: participant.peer_id,
 976                });
 977            }
 978
 979            RoomUpdate::UnsubscribedFromRemoteVideoTrack {
 980                publisher_id,
 981                track_id,
 982            } => {
 983                let user_id = publisher_id.parse()?;
 984                let participant = self
 985                    .remote_participants
 986                    .get_mut(&user_id)
 987                    .ok_or_else(|| anyhow!("unsubscribed from track by unknown participant"))?;
 988                participant.video_tracks.remove(&track_id);
 989                cx.emit(Event::RemoteVideoTracksChanged {
 990                    participant_id: participant.peer_id,
 991                });
 992            }
 993
 994            RoomUpdate::ActiveSpeakersChanged { speakers } => {
 995                let mut speaker_ids = speakers
 996                    .into_iter()
 997                    .filter_map(|speaker_sid| speaker_sid.parse().ok())
 998                    .collect::<Vec<u64>>();
 999                speaker_ids.sort_unstable();
1000                for (sid, participant) in &mut self.remote_participants {
1001                    if let Ok(_) = speaker_ids.binary_search(sid) {
1002                        participant.speaking = true;
1003                    } else {
1004                        participant.speaking = false;
1005                    }
1006                }
1007                if let Some(id) = self.client.user_id() {
1008                    if let Some(room) = &mut self.live_kit {
1009                        if let Ok(_) = speaker_ids.binary_search(&id) {
1010                            room.speaking = true;
1011                        } else {
1012                            room.speaking = false;
1013                        }
1014                    }
1015                }
1016            }
1017
1018            RoomUpdate::RemoteAudioTrackMuteChanged { track_id, muted } => {
1019                let mut found = false;
1020                for participant in &mut self.remote_participants.values_mut() {
1021                    for track in participant.audio_tracks.values() {
1022                        if track.sid() == track_id {
1023                            found = true;
1024                            break;
1025                        }
1026                    }
1027                    if found {
1028                        participant.muted = muted;
1029                        break;
1030                    }
1031                }
1032            }
1033
1034            RoomUpdate::SubscribedToRemoteAudioTrack(track, publication) => {
1035                let user_id = track.publisher_id().parse()?;
1036                let track_id = track.sid().to_string();
1037                let participant = self
1038                    .remote_participants
1039                    .get_mut(&user_id)
1040                    .ok_or_else(|| anyhow!("subscribed to track by unknown participant"))?;
1041                participant.audio_tracks.insert(track_id.clone(), track);
1042                participant.muted = publication.is_muted();
1043
1044                cx.emit(Event::RemoteAudioTracksChanged {
1045                    participant_id: participant.peer_id,
1046                });
1047            }
1048
1049            RoomUpdate::UnsubscribedFromRemoteAudioTrack {
1050                publisher_id,
1051                track_id,
1052            } => {
1053                let user_id = publisher_id.parse()?;
1054                let participant = self
1055                    .remote_participants
1056                    .get_mut(&user_id)
1057                    .ok_or_else(|| anyhow!("unsubscribed from track by unknown participant"))?;
1058                participant.audio_tracks.remove(&track_id);
1059                cx.emit(Event::RemoteAudioTracksChanged {
1060                    participant_id: participant.peer_id,
1061                });
1062            }
1063
1064            RoomUpdate::LocalAudioTrackUnpublished { publication } => {
1065                log::info!("unpublished audio track {}", publication.sid());
1066                if let Some(room) = &mut self.live_kit {
1067                    room.microphone_track = LocalTrack::None;
1068                }
1069            }
1070
1071            RoomUpdate::LocalVideoTrackUnpublished { publication } => {
1072                log::info!("unpublished video track {}", publication.sid());
1073                if let Some(room) = &mut self.live_kit {
1074                    room.screen_track = LocalTrack::None;
1075                }
1076            }
1077
1078            RoomUpdate::LocalAudioTrackPublished { publication } => {
1079                log::info!("published audio track {}", publication.sid());
1080            }
1081
1082            RoomUpdate::LocalVideoTrackPublished { publication } => {
1083                log::info!("published video track {}", publication.sid());
1084            }
1085        }
1086
1087        cx.notify();
1088        Ok(())
1089    }
1090
1091    fn check_invariants(&self) {
1092        #[cfg(any(test, feature = "test-support"))]
1093        {
1094            for participant in self.remote_participants.values() {
1095                assert!(self.participant_user_ids.contains(&participant.user.id));
1096                assert_ne!(participant.user.id, self.client.user_id().unwrap());
1097            }
1098
1099            for participant in &self.pending_participants {
1100                assert!(self.participant_user_ids.contains(&participant.id));
1101                assert_ne!(participant.id, self.client.user_id().unwrap());
1102            }
1103
1104            assert_eq!(
1105                self.participant_user_ids.len(),
1106                self.remote_participants.len() + self.pending_participants.len()
1107            );
1108        }
1109    }
1110
1111    pub(crate) fn call(
1112        &mut self,
1113        called_user_id: u64,
1114        initial_project_id: Option<u64>,
1115        cx: &mut ModelContext<Self>,
1116    ) -> Task<Result<()>> {
1117        if self.status.is_offline() {
1118            return Task::ready(Err(anyhow!("room is offline")));
1119        }
1120
1121        cx.notify();
1122        let client = self.client.clone();
1123        let room_id = self.id;
1124        self.pending_call_count += 1;
1125        cx.spawn(move |this, mut cx| async move {
1126            let result = client
1127                .request(proto::Call {
1128                    room_id,
1129                    called_user_id,
1130                    initial_project_id,
1131                })
1132                .await;
1133            this.update(&mut cx, |this, cx| {
1134                this.pending_call_count -= 1;
1135                if this.should_leave() {
1136                    this.leave(cx).detach_and_log_err(cx);
1137                }
1138            })?;
1139            result?;
1140            Ok(())
1141        })
1142    }
1143
1144    pub fn join_project(
1145        &mut self,
1146        id: u64,
1147        language_registry: Arc<LanguageRegistry>,
1148        fs: Arc<dyn Fs>,
1149        cx: &mut ModelContext<Self>,
1150    ) -> Task<Result<Model<Project>>> {
1151        let client = self.client.clone();
1152        let user_store = self.user_store.clone();
1153        let role = self.local_participant.role;
1154        cx.emit(Event::RemoteProjectJoined { project_id: id });
1155        cx.spawn(move |this, mut cx| async move {
1156            let project = Project::remote(
1157                id,
1158                client,
1159                user_store,
1160                language_registry,
1161                fs,
1162                role,
1163                cx.clone(),
1164            )
1165            .await?;
1166
1167            this.update(&mut cx, |this, cx| {
1168                this.joined_projects.retain(|project| {
1169                    if let Some(project) = project.upgrade() {
1170                        !project.read(cx).is_disconnected()
1171                    } else {
1172                        false
1173                    }
1174                });
1175                this.joined_projects.insert(project.downgrade());
1176            })?;
1177            Ok(project)
1178        })
1179    }
1180
1181    pub(crate) fn share_project(
1182        &mut self,
1183        project: Model<Project>,
1184        cx: &mut ModelContext<Self>,
1185    ) -> Task<Result<u64>> {
1186        if let Some(project_id) = project.read(cx).remote_id() {
1187            return Task::ready(Ok(project_id));
1188        }
1189
1190        let request = self.client.request(proto::ShareProject {
1191            room_id: self.id(),
1192            worktrees: project.read(cx).worktree_metadata_protos(cx),
1193        });
1194        cx.spawn(|this, mut cx| async move {
1195            let response = request.await?;
1196
1197            project.update(&mut cx, |project, cx| {
1198                project.shared(response.project_id, cx)
1199            })??;
1200
1201            // If the user's location is in this project, it changes from UnsharedProject to SharedProject.
1202            this.update(&mut cx, |this, cx| {
1203                this.shared_projects.insert(project.downgrade());
1204                let active_project = this.local_participant.active_project.as_ref();
1205                if active_project.map_or(false, |location| *location == project) {
1206                    this.set_location(Some(&project), cx)
1207                } else {
1208                    Task::ready(Ok(()))
1209                }
1210            })?
1211            .await?;
1212
1213            Ok(response.project_id)
1214        })
1215    }
1216
1217    pub(crate) fn unshare_project(
1218        &mut self,
1219        project: Model<Project>,
1220        cx: &mut ModelContext<Self>,
1221    ) -> Result<()> {
1222        let project_id = match project.read(cx).remote_id() {
1223            Some(project_id) => project_id,
1224            None => return Ok(()),
1225        };
1226
1227        self.client.send(proto::UnshareProject { project_id })?;
1228        project.update(cx, |this, cx| this.unshare(cx))
1229    }
1230
1231    pub(crate) fn set_location(
1232        &mut self,
1233        project: Option<&Model<Project>>,
1234        cx: &mut ModelContext<Self>,
1235    ) -> Task<Result<()>> {
1236        if self.status.is_offline() {
1237            return Task::ready(Err(anyhow!("room is offline")));
1238        }
1239
1240        let client = self.client.clone();
1241        let room_id = self.id;
1242        let location = if let Some(project) = project {
1243            self.local_participant.active_project = Some(project.downgrade());
1244            if let Some(project_id) = project.read(cx).remote_id() {
1245                proto::participant_location::Variant::SharedProject(
1246                    proto::participant_location::SharedProject { id: project_id },
1247                )
1248            } else {
1249                proto::participant_location::Variant::UnsharedProject(
1250                    proto::participant_location::UnsharedProject {},
1251                )
1252            }
1253        } else {
1254            self.local_participant.active_project = None;
1255            proto::participant_location::Variant::External(proto::participant_location::External {})
1256        };
1257
1258        cx.notify();
1259        cx.background_executor().spawn(async move {
1260            client
1261                .request(proto::UpdateParticipantLocation {
1262                    room_id,
1263                    location: Some(proto::ParticipantLocation {
1264                        variant: Some(location),
1265                    }),
1266                })
1267                .await?;
1268            Ok(())
1269        })
1270    }
1271
1272    pub fn is_screen_sharing(&self) -> bool {
1273        self.live_kit.as_ref().map_or(false, |live_kit| {
1274            !matches!(live_kit.screen_track, LocalTrack::None)
1275        })
1276    }
1277
1278    pub fn is_sharing_mic(&self) -> bool {
1279        self.live_kit.as_ref().map_or(false, |live_kit| {
1280            !matches!(live_kit.microphone_track, LocalTrack::None)
1281        })
1282    }
1283
1284    pub fn is_muted(&self, cx: &AppContext) -> bool {
1285        self.live_kit
1286            .as_ref()
1287            .and_then(|live_kit| match &live_kit.microphone_track {
1288                LocalTrack::None => Some(Self::mute_on_join(cx)),
1289                LocalTrack::Pending { muted, .. } => Some(*muted),
1290                LocalTrack::Published { muted, .. } => Some(*muted),
1291            })
1292            .unwrap_or(false)
1293    }
1294
1295    pub fn read_only(&self) -> bool {
1296        !(self.local_participant().role == proto::ChannelRole::Member
1297            || self.local_participant().role == proto::ChannelRole::Admin)
1298    }
1299
1300    pub fn is_speaking(&self) -> bool {
1301        self.live_kit
1302            .as_ref()
1303            .map_or(false, |live_kit| live_kit.speaking)
1304    }
1305
1306    pub fn is_deafened(&self) -> Option<bool> {
1307        self.live_kit.as_ref().map(|live_kit| live_kit.deafened)
1308    }
1309
    /// Starts publishing the local microphone to the LiveKit room.
    ///
    /// The microphone track is first marked `LocalTrack::Pending` with a fresh
    /// publish id, then an audio track is created and published in a spawned
    /// task. When publishing finishes, the result is applied only if the
    /// microphone track is still the same pending publish; otherwise the
    /// attempt is treated as canceled.
    ///
    /// # Errors
    ///
    /// The returned task fails when the room is offline, the microphone is
    /// already shared, LiveKit is not initialized, the room was dropped, or
    /// publishing fails without having been canceled.
    #[track_caller]
    pub fn share_microphone(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
        if self.status.is_offline() {
            return Task::ready(Err(anyhow!("room is offline")));
        } else if self.is_sharing_mic() {
            return Task::ready(Err(anyhow!("microphone was already shared")));
        }

        // Reserve a publish id and mark the track as pending so that the
        // completion handler below can detect whether this attempt is still
        // the current one.
        let publish_id = if let Some(live_kit) = self.live_kit.as_mut() {
            let publish_id = post_inc(&mut live_kit.next_publish_id);
            live_kit.microphone_track = LocalTrack::Pending {
                publish_id,
                muted: false,
            };
            cx.notify();
            publish_id
        } else {
            return Task::ready(Err(anyhow!("live-kit was not initialized")));
        };

        cx.spawn(move |this, mut cx| async move {
            // Publishing is wrapped in its own future so that its error is
            // captured in `publication` instead of returning early.
            let publish_track = async {
                let track = LocalAudioTrack::create();
                this.upgrade()
                    .ok_or_else(|| anyhow!("room was dropped"))?
                    .update(&mut cx, |this, _| {
                        this.live_kit
                            .as_ref()
                            .map(|live_kit| live_kit.room.publish_audio_track(track))
                    })?
                    .ok_or_else(|| anyhow!("live-kit was not initialized"))?
                    .await
            };
            let publication = publish_track.await;
            this.upgrade()
                .ok_or_else(|| anyhow!("room was dropped"))?
                .update(&mut cx, |this, cx| {
                    let live_kit = this
                        .live_kit
                        .as_mut()
                        .ok_or_else(|| anyhow!("live-kit was not initialized"))?;

                    // The attempt counts as canceled when the track is no
                    // longer pending, or is pending under a different id.
                    let (canceled, muted) = if let LocalTrack::Pending {
                        publish_id: cur_publish_id,
                        muted,
                    } = &live_kit.microphone_track
                    {
                        (*cur_publish_id != publish_id, *muted)
                    } else {
                        (true, false)
                    };

                    match publication {
                        Ok(publication) => {
                            if canceled {
                                // Nobody wants this track anymore: withdraw it.
                                live_kit.room.unpublish_track(publication);
                            } else {
                                // Carry over a mute requested while pending.
                                if muted {
                                    cx.background_executor()
                                        .spawn(publication.set_mute(muted))
                                        .detach();
                                }
                                live_kit.microphone_track = LocalTrack::Published {
                                    track_publication: publication,
                                    muted,
                                };
                                cx.notify();
                            }
                            Ok(())
                        }
                        Err(error) => {
                            if canceled {
                                // A failure of a canceled attempt is ignored.
                                Ok(())
                            } else {
                                live_kit.microphone_track = LocalTrack::None;
                                cx.notify();
                                Err(error)
                            }
                        }
                    }
                })?
        })
    }
1393
1394    pub fn share_screen(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
1395        if self.status.is_offline() {
1396            return Task::ready(Err(anyhow!("room is offline")));
1397        } else if self.is_screen_sharing() {
1398            return Task::ready(Err(anyhow!("screen was already shared")));
1399        }
1400
1401        let (displays, publish_id) = if let Some(live_kit) = self.live_kit.as_mut() {
1402            let publish_id = post_inc(&mut live_kit.next_publish_id);
1403            live_kit.screen_track = LocalTrack::Pending {
1404                publish_id,
1405                muted: false,
1406            };
1407            cx.notify();
1408            (live_kit.room.display_sources(), publish_id)
1409        } else {
1410            return Task::ready(Err(anyhow!("live-kit was not initialized")));
1411        };
1412
1413        cx.spawn(move |this, mut cx| async move {
1414            let publish_track = async {
1415                let displays = displays.await?;
1416                let display = displays
1417                    .first()
1418                    .ok_or_else(|| anyhow!("no display found"))?;
1419                let track = LocalVideoTrack::screen_share_for_display(&display);
1420                this.upgrade()
1421                    .ok_or_else(|| anyhow!("room was dropped"))?
1422                    .update(&mut cx, |this, _| {
1423                        this.live_kit
1424                            .as_ref()
1425                            .map(|live_kit| live_kit.room.publish_video_track(track))
1426                    })?
1427                    .ok_or_else(|| anyhow!("live-kit was not initialized"))?
1428                    .await
1429            };
1430
1431            let publication = publish_track.await;
1432            this.upgrade()
1433                .ok_or_else(|| anyhow!("room was dropped"))?
1434                .update(&mut cx, |this, cx| {
1435                    let live_kit = this
1436                        .live_kit
1437                        .as_mut()
1438                        .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
1439
1440                    let (canceled, muted) = if let LocalTrack::Pending {
1441                        publish_id: cur_publish_id,
1442                        muted,
1443                    } = &live_kit.screen_track
1444                    {
1445                        (*cur_publish_id != publish_id, *muted)
1446                    } else {
1447                        (true, false)
1448                    };
1449
1450                    match publication {
1451                        Ok(publication) => {
1452                            if canceled {
1453                                live_kit.room.unpublish_track(publication);
1454                            } else {
1455                                if muted {
1456                                    cx.background_executor()
1457                                        .spawn(publication.set_mute(muted))
1458                                        .detach();
1459                                }
1460                                live_kit.screen_track = LocalTrack::Published {
1461                                    track_publication: publication,
1462                                    muted,
1463                                };
1464                                cx.notify();
1465                            }
1466
1467                            Audio::play_sound(Sound::StartScreenshare, cx);
1468
1469                            Ok(())
1470                        }
1471                        Err(error) => {
1472                            if canceled {
1473                                Ok(())
1474                            } else {
1475                                live_kit.screen_track = LocalTrack::None;
1476                                cx.notify();
1477                                Err(error)
1478                            }
1479                        }
1480                    }
1481                })?
1482        })
1483    }
1484
1485    pub fn toggle_mute(&mut self, cx: &mut ModelContext<Self>) -> Result<Task<Result<()>>> {
1486        let should_mute = !self.is_muted(cx);
1487        if let Some(live_kit) = self.live_kit.as_mut() {
1488            if matches!(live_kit.microphone_track, LocalTrack::None) {
1489                return Ok(self.share_microphone(cx));
1490            }
1491
1492            let (ret_task, old_muted) = live_kit.set_mute(should_mute, cx)?;
1493            live_kit.muted_by_user = should_mute;
1494
1495            if old_muted == true && live_kit.deafened == true {
1496                if let Some(task) = self.toggle_deafen(cx).ok() {
1497                    task.detach();
1498                }
1499            }
1500
1501            Ok(ret_task)
1502        } else {
1503            Err(anyhow!("LiveKit not started"))
1504        }
1505    }
1506
1507    pub fn toggle_deafen(&mut self, cx: &mut ModelContext<Self>) -> Result<Task<Result<()>>> {
1508        if let Some(live_kit) = self.live_kit.as_mut() {
1509            (*live_kit).deafened = !live_kit.deafened;
1510
1511            let mut tasks = Vec::with_capacity(self.remote_participants.len());
1512            // Context notification is sent within set_mute itself.
1513            let mut mute_task = None;
1514            // When deafening, mute user's mic as well.
1515            // When undeafening, unmute user's mic unless it was manually muted prior to deafening.
1516            if live_kit.deafened || !live_kit.muted_by_user {
1517                mute_task = Some(live_kit.set_mute(live_kit.deafened, cx)?.0);
1518            };
1519            for participant in self.remote_participants.values() {
1520                for track in live_kit
1521                    .room
1522                    .remote_audio_track_publications(&participant.user.id.to_string())
1523                {
1524                    let deafened = live_kit.deafened;
1525                    tasks.push(cx.foreground_executor().spawn(track.set_enabled(!deafened)));
1526                }
1527            }
1528
1529            Ok(cx.foreground_executor().spawn(async move {
1530                if let Some(mute_task) = mute_task {
1531                    mute_task.await?;
1532                }
1533                for task in tasks {
1534                    task.await?;
1535                }
1536                Ok(())
1537            }))
1538        } else {
1539            Err(anyhow!("LiveKit not started"))
1540        }
1541    }
1542
1543    pub fn unshare_screen(&mut self, cx: &mut ModelContext<Self>) -> Result<()> {
1544        if self.status.is_offline() {
1545            return Err(anyhow!("room is offline"));
1546        }
1547
1548        let live_kit = self
1549            .live_kit
1550            .as_mut()
1551            .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
1552        match mem::take(&mut live_kit.screen_track) {
1553            LocalTrack::None => Err(anyhow!("screen was not shared")),
1554            LocalTrack::Pending { .. } => {
1555                cx.notify();
1556                Ok(())
1557            }
1558            LocalTrack::Published {
1559                track_publication, ..
1560            } => {
1561                live_kit.room.unpublish_track(track_publication);
1562                cx.notify();
1563
1564                Audio::play_sound(Sound::StopScreenshare, cx);
1565                Ok(())
1566            }
1567        }
1568    }
1569
1570    #[cfg(any(test, feature = "test-support"))]
1571    pub fn set_display_sources(&self, sources: Vec<live_kit_client::MacOSDisplay>) {
1572        self.live_kit
1573            .as_ref()
1574            .unwrap()
1575            .room
1576            .set_display_sources(sources);
1577    }
1578}
1579
1580struct LiveKitRoom {
1581    room: Arc<live_kit_client::Room>,
1582    screen_track: LocalTrack,
1583    microphone_track: LocalTrack,
1584    /// Tracks whether we're currently in a muted state due to auto-mute from deafening or manual mute performed by user.
1585    muted_by_user: bool,
1586    deafened: bool,
1587    speaking: bool,
1588    next_publish_id: usize,
1589    _maintain_room: Task<()>,
1590    _handle_updates: Task<()>,
1591}
1592
1593impl LiveKitRoom {
1594    fn set_mute(
1595        self: &mut LiveKitRoom,
1596        should_mute: bool,
1597        cx: &mut ModelContext<Room>,
1598    ) -> Result<(Task<Result<()>>, bool)> {
1599        if !should_mute {
1600            // clear user muting state.
1601            self.muted_by_user = false;
1602        }
1603
1604        let (result, old_muted) = match &mut self.microphone_track {
1605            LocalTrack::None => Err(anyhow!("microphone was not shared")),
1606            LocalTrack::Pending { muted, .. } => {
1607                let old_muted = *muted;
1608                *muted = should_mute;
1609                cx.notify();
1610                Ok((Task::Ready(Some(Ok(()))), old_muted))
1611            }
1612            LocalTrack::Published {
1613                track_publication,
1614                muted,
1615            } => {
1616                let old_muted = *muted;
1617                *muted = should_mute;
1618                cx.notify();
1619                Ok((
1620                    cx.background_executor()
1621                        .spawn(track_publication.set_mute(*muted)),
1622                    old_muted,
1623                ))
1624            }
1625        }?;
1626
1627        if old_muted != should_mute {
1628            if should_mute {
1629                Audio::play_sound(Sound::Mute, cx);
1630            } else {
1631                Audio::play_sound(Sound::Unmute, cx);
1632            }
1633        }
1634
1635        Ok((result, old_muted))
1636    }
1637
1638    fn stop_publishing(&mut self, cx: &mut ModelContext<Room>) {
1639        if let LocalTrack::Published {
1640            track_publication, ..
1641        } = mem::replace(&mut self.microphone_track, LocalTrack::None)
1642        {
1643            self.room.unpublish_track(track_publication);
1644            cx.notify();
1645        }
1646
1647        if let LocalTrack::Published {
1648            track_publication, ..
1649        } = mem::replace(&mut self.screen_track, LocalTrack::None)
1650        {
1651            self.room.unpublish_track(track_publication);
1652            cx.notify();
1653        }
1654    }
1655}
1656
1657enum LocalTrack {
1658    None,
1659    Pending {
1660        publish_id: usize,
1661        muted: bool,
1662    },
1663    Published {
1664        track_publication: LocalTrackPublication,
1665        muted: bool,
1666    },
1667}
1668
1669impl Default for LocalTrack {
1670    fn default() -> Self {
1671        Self::None
1672    }
1673}
1674
/// Connection status of a room.
#[derive(Copy, Clone, PartialEq, Eq)]
pub enum RoomStatus {
    Online,
    Rejoining,
    Offline,
}

impl RoomStatus {
    /// Returns `true` only for [`RoomStatus::Offline`].
    pub fn is_offline(&self) -> bool {
        *self == Self::Offline
    }

    /// Returns `true` only for [`RoomStatus::Online`]; `Rejoining` does not count.
    pub fn is_online(&self) -> bool {
        *self == Self::Online
    }
}