1use crate::{
2 call_settings::CallSettings,
3 participant::{LocalParticipant, RemoteParticipant},
4};
5use anyhow::{Context as _, Result, anyhow};
6use audio::{Audio, Sound};
7use client::{
8 ChannelId, Client, ParticipantIndex, TypedEnvelope, User, UserStore,
9 proto::{self, PeerId},
10};
11use collections::{BTreeMap, HashMap, HashSet};
12use feature_flags::FeatureFlagAppExt;
13use fs::Fs;
14use futures::StreamExt;
15use gpui::{
16 App, AppContext as _, AsyncApp, Context, Entity, EventEmitter, FutureExt as _,
17 ScreenCaptureSource, ScreenCaptureStream, Task, Timeout, WeakEntity,
18};
19use gpui_tokio::Tokio;
20use language::LanguageRegistry;
21use livekit::{LocalTrackPublication, ParticipantIdentity, RoomEvent};
22use livekit_client::{self as livekit, AudioStream, TrackSid};
23use postage::{sink::Sink, stream::Stream, watch};
24use project::{CURRENT_PROJECT_FEATURES, Project};
25use settings::Settings as _;
26use std::sync::atomic::AtomicU64;
27use std::{future::Future, mem, rc::Rc, sync::Arc, time::Duration, time::Instant};
28
29use super::diagnostics::CallDiagnostics;
30use util::{ResultExt, TryFutureExt, paths::PathStyle, post_inc};
31use workspace::ParticipantLocation;
32
33pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(30);
34
35#[derive(Clone, Debug, PartialEq, Eq)]
36pub enum Event {
37 RoomJoined {
38 channel_id: Option<ChannelId>,
39 },
40 ParticipantLocationChanged {
41 participant_id: proto::PeerId,
42 },
43 RemoteVideoTracksChanged {
44 participant_id: proto::PeerId,
45 },
46 RemoteVideoTrackUnsubscribed {
47 sid: TrackSid,
48 },
49 RemoteAudioTracksChanged {
50 participant_id: proto::PeerId,
51 },
52 RemoteProjectShared {
53 owner: Arc<User>,
54 project_id: u64,
55 worktree_root_names: Vec<String>,
56 },
57 RemoteProjectUnshared {
58 project_id: u64,
59 },
60 RemoteProjectJoined {
61 project_id: u64,
62 },
63 RemoteProjectInvitationDiscarded {
64 project_id: u64,
65 },
66 RoomLeft {
67 channel_id: Option<ChannelId>,
68 },
69}
70
71pub struct Room {
72 id: u64,
73 channel_id: Option<ChannelId>,
74 live_kit: Option<LiveKitRoom>,
75 diagnostics: Option<Entity<CallDiagnostics>>,
76 status: RoomStatus,
77 shared_projects: HashSet<WeakEntity<Project>>,
78 joined_projects: HashSet<WeakEntity<Project>>,
79 local_participant: LocalParticipant,
80 remote_participants: BTreeMap<u64, RemoteParticipant>,
81 pending_participants: Vec<Arc<User>>,
82 participant_user_ids: HashSet<u64>,
83 pending_call_count: usize,
84 leave_when_empty: bool,
85 client: Arc<Client>,
86 user_store: Entity<UserStore>,
87 follows_by_leader_id_project_id: HashMap<(PeerId, u64), Vec<PeerId>>,
88 client_subscriptions: Vec<client::Subscription>,
89 _subscriptions: Vec<gpui::Subscription>,
90 room_update_completed_tx: watch::Sender<Option<()>>,
91 room_update_completed_rx: watch::Receiver<Option<()>>,
92 pending_room_update: Option<Task<()>>,
93 maintain_connection: Option<Task<Option<()>>>,
94 created: Instant,
95}
96
97impl EventEmitter<Event> for Room {}
98
99impl Room {
100 pub fn channel_id(&self) -> Option<ChannelId> {
101 self.channel_id
102 }
103
104 pub fn is_sharing_project(&self) -> bool {
105 !self.shared_projects.is_empty()
106 }
107
108 pub fn is_connected(&self, _: &App) -> bool {
109 if let Some(live_kit) = self.live_kit.as_ref() {
110 live_kit.room.connection_state() == livekit::ConnectionState::Connected
111 } else {
112 false
113 }
114 }
115
116 fn new(
117 id: u64,
118 channel_id: Option<ChannelId>,
119 livekit_connection_info: Option<proto::LiveKitConnectionInfo>,
120 client: Arc<Client>,
121 user_store: Entity<UserStore>,
122 cx: &mut Context<Self>,
123 ) -> Self {
124 spawn_room_connection(livekit_connection_info, cx);
125
126 let maintain_connection = cx.spawn({
127 let client = client.clone();
128 async move |this, cx| {
129 Self::maintain_connection(this, client.clone(), cx)
130 .log_err()
131 .await
132 }
133 });
134
135 Audio::play_sound(Sound::Joined, cx);
136
137 let (room_update_completed_tx, room_update_completed_rx) = watch::channel();
138
139 Self {
140 id,
141 channel_id,
142 live_kit: None,
143 diagnostics: None,
144 status: RoomStatus::Online,
145 shared_projects: Default::default(),
146 joined_projects: Default::default(),
147 participant_user_ids: Default::default(),
148 local_participant: Default::default(),
149 remote_participants: Default::default(),
150 pending_participants: Default::default(),
151 pending_call_count: 0,
152 client_subscriptions: vec![
153 client.add_message_handler(cx.weak_entity(), Self::handle_room_updated),
154 ],
155 _subscriptions: vec![
156 cx.on_release(Self::released),
157 cx.on_app_quit(Self::app_will_quit),
158 ],
159 leave_when_empty: false,
160 pending_room_update: None,
161 client,
162 user_store,
163 follows_by_leader_id_project_id: Default::default(),
164 maintain_connection: Some(maintain_connection),
165 room_update_completed_tx,
166 room_update_completed_rx,
167 created: cx.background_executor().now(),
168 }
169 }
170
171 pub(crate) fn create(
172 called_user_id: u64,
173 initial_project: Option<Entity<Project>>,
174 client: Arc<Client>,
175 user_store: Entity<UserStore>,
176 cx: &mut App,
177 ) -> Task<Result<Entity<Self>>> {
178 cx.spawn(async move |cx| {
179 let response = client.request(proto::CreateRoom {}).await?;
180 let room_proto = response.room.context("invalid room")?;
181 let room = cx.new(|cx| {
182 let mut room = Self::new(
183 room_proto.id,
184 None,
185 response.live_kit_connection_info,
186 client,
187 user_store,
188 cx,
189 );
190 if let Some(participant) = room_proto.participants.first() {
191 room.local_participant.role = participant.role()
192 }
193 room
194 });
195
196 let initial_project_id = if let Some(initial_project) = initial_project {
197 let initial_project_id = room
198 .update(cx, |room, cx| {
199 room.share_project(initial_project.clone(), cx)
200 })
201 .await?;
202 Some(initial_project_id)
203 } else {
204 None
205 };
206
207 let did_join = room
208 .update(cx, |room, cx| {
209 room.leave_when_empty = true;
210 room.call(called_user_id, initial_project_id, cx)
211 })
212 .await;
213 match did_join {
214 Ok(()) => Ok(room),
215 Err(error) => Err(error.context("room creation failed")),
216 }
217 })
218 }
219
220 pub(crate) async fn join_channel(
221 channel_id: ChannelId,
222 client: Arc<Client>,
223 user_store: Entity<UserStore>,
224 cx: AsyncApp,
225 ) -> Result<Entity<Self>> {
226 Self::from_join_response(
227 client
228 .request(proto::JoinChannel {
229 channel_id: channel_id.0,
230 })
231 .await?,
232 client,
233 user_store,
234 cx,
235 )
236 }
237
238 pub(crate) async fn join(
239 room_id: u64,
240 client: Arc<Client>,
241 user_store: Entity<UserStore>,
242 cx: AsyncApp,
243 ) -> Result<Entity<Self>> {
244 Self::from_join_response(
245 client.request(proto::JoinRoom { id: room_id }).await?,
246 client,
247 user_store,
248 cx,
249 )
250 }
251
252 fn released(&mut self, cx: &mut App) {
253 if self.status.is_online() {
254 self.leave_internal(cx).detach_and_log_err(cx);
255 }
256 }
257
258 fn app_will_quit(&mut self, cx: &mut Context<Self>) -> impl Future<Output = ()> + use<> {
259 let task = if self.status.is_online() {
260 let leave = self.leave_internal(cx);
261 Some(cx.background_spawn(async move {
262 leave.await.log_err();
263 }))
264 } else {
265 None
266 };
267
268 async move {
269 if let Some(task) = task {
270 task.await;
271 }
272 }
273 }
274
275 pub fn mute_on_join(cx: &App) -> bool {
276 CallSettings::get_global(cx).mute_on_join || client::IMPERSONATE_LOGIN.is_some()
277 }
278
279 fn from_join_response(
280 response: proto::JoinRoomResponse,
281 client: Arc<Client>,
282 user_store: Entity<UserStore>,
283 mut cx: AsyncApp,
284 ) -> Result<Entity<Self>> {
285 let room_proto = response.room.context("invalid room")?;
286 let room = cx.new(|cx| {
287 Self::new(
288 room_proto.id,
289 response.channel_id.map(ChannelId),
290 response.live_kit_connection_info,
291 client,
292 user_store,
293 cx,
294 )
295 });
296 room.update(&mut cx, |room, cx| {
297 room.leave_when_empty = room.channel_id.is_none();
298 room.apply_room_update(room_proto, cx)?;
299 anyhow::Ok(())
300 })?;
301 Ok(room)
302 }
303
304 fn should_leave(&self) -> bool {
305 self.leave_when_empty
306 && self.pending_room_update.is_none()
307 && self.pending_participants.is_empty()
308 && self.remote_participants.is_empty()
309 && self.pending_call_count == 0
310 }
311
312 pub(crate) fn leave(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
313 cx.notify();
314 self.emit_video_track_unsubscribed_events(cx);
315 self.leave_internal(cx)
316 }
317
318 fn leave_internal(&mut self, cx: &mut App) -> Task<Result<()>> {
319 if self.status.is_offline() {
320 return Task::ready(Err(anyhow!("room is offline")));
321 }
322
323 log::info!("leaving room");
324 Audio::play_sound(Sound::Leave, cx);
325
326 self.clear_state(cx);
327
328 let leave_room = self.client.request(proto::LeaveRoom {});
329 cx.background_spawn(async move {
330 leave_room.await?;
331 anyhow::Ok(())
332 })
333 }
334
335 pub(crate) fn clear_state(&mut self, cx: &mut App) {
336 for project in self.shared_projects.drain() {
337 if let Some(project) = project.upgrade() {
338 project.update(cx, |project, cx| {
339 project.unshare(cx).log_err();
340 });
341 }
342 }
343 for project in self.joined_projects.drain() {
344 if let Some(project) = project.upgrade() {
345 project.update(cx, |project, cx| {
346 project.disconnected_from_host(cx);
347 project.close(cx);
348 });
349 }
350 }
351
352 self.status = RoomStatus::Offline;
353 self.remote_participants.clear();
354 self.pending_participants.clear();
355 self.participant_user_ids.clear();
356 self.client_subscriptions.clear();
357 self.live_kit.take();
358 self.diagnostics.take();
359 self.pending_room_update.take();
360 self.maintain_connection.take();
361 }
362
363 fn emit_video_track_unsubscribed_events(&self, cx: &mut Context<Self>) {
364 for participant in self.remote_participants.values() {
365 for sid in participant.video_tracks.keys() {
366 cx.emit(Event::RemoteVideoTrackUnsubscribed { sid: sid.clone() });
367 }
368 }
369 }
370
371 async fn maintain_connection(
372 this: WeakEntity<Self>,
373 client: Arc<Client>,
374 cx: &mut AsyncApp,
375 ) -> Result<()> {
376 let mut client_status = client.status();
377 loop {
378 let _ = client_status.try_recv();
379 let is_connected = client_status.borrow().is_connected();
380 // Even if we're initially connected, any future change of the status means we momentarily disconnected.
381 if !is_connected || client_status.next().await.is_some() {
382 log::info!("detected client disconnection");
383
384 this.upgrade()
385 .context("room was dropped")?
386 .update(cx, |this, cx| {
387 this.status = RoomStatus::Rejoining;
388 cx.notify();
389 });
390
391 // Wait for client to re-establish a connection to the server.
392 let executor = cx.background_executor().clone();
393 let client_reconnection = async {
394 let mut remaining_attempts = 3;
395 while remaining_attempts > 0 {
396 if client_status.borrow().is_connected() {
397 log::info!("client reconnected, attempting to rejoin room");
398
399 let Some(this) = this.upgrade() else { break };
400 let task = this.update(cx, |this, cx| this.rejoin(cx));
401 if task.await.log_err().is_some() {
402 return true;
403 } else {
404 remaining_attempts -= 1;
405 }
406 } else if client_status.borrow().is_signed_out() {
407 return false;
408 }
409
410 log::info!(
411 "waiting for client status change, remaining attempts {}",
412 remaining_attempts
413 );
414 client_status.next().await;
415 }
416 false
417 };
418
419 match client_reconnection
420 .with_timeout(RECONNECT_TIMEOUT, &executor)
421 .await
422 {
423 Ok(true) => {
424 log::info!("successfully reconnected to room");
425 // If we successfully joined the room, go back around the loop
426 // waiting for future connection status changes.
427 continue;
428 }
429 Ok(false) => break,
430 Err(Timeout) => {
431 log::info!("room reconnection timeout expired");
432 break;
433 }
434 }
435 }
436 }
437
438 // The client failed to re-establish a connection to the server
439 // or an error occurred while trying to re-join the room. Either way
440 // we leave the room and return an error.
441 if let Some(this) = this.upgrade() {
442 log::info!("reconnection failed, leaving room");
443 this.update(cx, |this, cx| this.leave(cx)).await?;
444 }
445 anyhow::bail!("can't reconnect to room: client failed to re-establish connection");
446 }
447
448 fn rejoin(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
449 let mut projects = HashMap::default();
450 let mut reshared_projects = Vec::new();
451 let mut rejoined_projects = Vec::new();
452 self.shared_projects.retain(|project| {
453 if let Some(handle) = project.upgrade() {
454 let project = handle.read(cx);
455 if let Some(project_id) = project.remote_id() {
456 projects.insert(project_id, handle.clone());
457 reshared_projects.push(proto::UpdateProject {
458 project_id,
459 worktrees: project.worktree_metadata_protos(cx),
460 });
461 return true;
462 }
463 }
464 false
465 });
466 self.joined_projects.retain(|project| {
467 if let Some(handle) = project.upgrade() {
468 let project = handle.read(cx);
469 if let Some(project_id) = project.remote_id() {
470 projects.insert(project_id, handle.clone());
471 let mut worktrees = Vec::new();
472 let mut repositories = Vec::new();
473 for worktree in project.worktrees(cx) {
474 let worktree = worktree.read(cx);
475 worktrees.push(proto::RejoinWorktree {
476 id: worktree.id().to_proto(),
477 scan_id: worktree.completed_scan_id() as u64,
478 });
479 }
480 for (entry_id, repository) in project.repositories(cx) {
481 let repository = repository.read(cx);
482 repositories.push(proto::RejoinRepository {
483 id: entry_id.to_proto(),
484 scan_id: repository.scan_id,
485 });
486 }
487
488 rejoined_projects.push(proto::RejoinProject {
489 id: project_id,
490 worktrees,
491 repositories,
492 });
493 }
494 return true;
495 }
496 false
497 });
498
499 let response = self.client.request_envelope(proto::RejoinRoom {
500 id: self.id,
501 reshared_projects,
502 rejoined_projects,
503 });
504
505 cx.spawn(async move |this, cx| {
506 let response = response.await?;
507 let message_id = response.message_id;
508 let response = response.payload;
509 let room_proto = response.room.context("invalid room")?;
510 this.update(cx, |this, cx| {
511 this.status = RoomStatus::Online;
512 this.apply_room_update(room_proto, cx)?;
513
514 for reshared_project in response.reshared_projects {
515 if let Some(project) = projects.get(&reshared_project.id) {
516 project.update(cx, |project, cx| {
517 project.reshared(reshared_project, cx).log_err();
518 });
519 }
520 }
521
522 for rejoined_project in response.rejoined_projects {
523 if let Some(project) = projects.get(&rejoined_project.id) {
524 project.update(cx, |project, cx| {
525 project.rejoined(rejoined_project, message_id, cx).log_err();
526 });
527 }
528 }
529
530 anyhow::Ok(())
531 })?
532 })
533 }
534
535 pub fn id(&self) -> u64 {
536 self.id
537 }
538
539 pub fn room_id(&self) -> impl Future<Output = Option<String>> + 'static {
540 let room = self.live_kit.as_ref().map(|lk| lk.room.clone());
541 async move {
542 let room = room?;
543 let sid = room.sid().await;
544 let name = room.name();
545 Some(format!("{} (sid: {sid})", name))
546 }
547 }
548
549 pub fn get_stats(&self, cx: &App) -> Task<Option<livekit::SessionStats>> {
550 match self.live_kit.as_ref() {
551 Some(lk) => {
552 let task = lk.room.stats_task(cx);
553 cx.background_executor()
554 .spawn(async move { task.await.ok() })
555 }
556 None => Task::ready(None),
557 }
558 }
559
560 pub fn input_lag(&self) -> Option<Duration> {
561 let us = self
562 .live_kit
563 .as_ref()?
564 .input_lag_us
565 .as_ref()?
566 .load(std::sync::atomic::Ordering::Relaxed);
567 if us > 0 {
568 Some(Duration::from_micros(us))
569 } else {
570 None
571 }
572 }
573
574 pub fn diagnostics(&self) -> Option<&Entity<CallDiagnostics>> {
575 self.diagnostics.as_ref()
576 }
577
578 pub fn connection_quality(&self) -> livekit::ConnectionQuality {
579 self.live_kit
580 .as_ref()
581 .map(|lk| lk.room.local_participant().connection_quality())
582 .unwrap_or(livekit::ConnectionQuality::Lost)
583 }
584
585 pub fn status(&self) -> RoomStatus {
586 self.status
587 }
588
589 pub fn local_participant(&self) -> &LocalParticipant {
590 &self.local_participant
591 }
592
593 pub fn local_participant_user(&self, cx: &App) -> Option<Arc<User>> {
594 self.user_store.read(cx).current_user()
595 }
596
597 pub fn remote_participants(&self) -> &BTreeMap<u64, RemoteParticipant> {
598 &self.remote_participants
599 }
600
601 pub fn remote_participant_for_peer_id(&self, peer_id: PeerId) -> Option<&RemoteParticipant> {
602 self.remote_participants
603 .values()
604 .find(|p| p.peer_id == peer_id)
605 }
606
607 pub fn role_for_user(&self, user_id: u64) -> Option<proto::ChannelRole> {
608 self.remote_participants
609 .get(&user_id)
610 .map(|participant| participant.role)
611 }
612
613 pub fn contains_guests(&self) -> bool {
614 self.local_participant.role == proto::ChannelRole::Guest
615 || self
616 .remote_participants
617 .values()
618 .any(|p| p.role == proto::ChannelRole::Guest)
619 }
620
621 pub fn local_participant_is_admin(&self) -> bool {
622 self.local_participant.role == proto::ChannelRole::Admin
623 }
624
625 pub fn local_participant_is_guest(&self) -> bool {
626 self.local_participant.role == proto::ChannelRole::Guest
627 }
628
629 pub fn set_participant_role(
630 &mut self,
631 user_id: u64,
632 role: proto::ChannelRole,
633 cx: &Context<Self>,
634 ) -> Task<Result<()>> {
635 let client = self.client.clone();
636 let room_id = self.id;
637 let role = role.into();
638 cx.spawn(async move |_, _| {
639 client
640 .request(proto::SetRoomParticipantRole {
641 room_id,
642 user_id,
643 role,
644 })
645 .await
646 .map(|_| ())
647 })
648 }
649
650 pub fn pending_participants(&self) -> &[Arc<User>] {
651 &self.pending_participants
652 }
653
654 pub fn contains_participant(&self, user_id: u64) -> bool {
655 self.participant_user_ids.contains(&user_id)
656 }
657
658 pub fn followers_for(&self, leader_id: PeerId, project_id: u64) -> &[PeerId] {
659 self.follows_by_leader_id_project_id
660 .get(&(leader_id, project_id))
661 .map_or(&[], |v| v.as_slice())
662 }
663
664 /// Returns the most 'active' projects, defined as most people in the project
665 pub fn most_active_project(&self, cx: &App) -> Option<(u64, u64)> {
666 let mut project_hosts_and_guest_counts = HashMap::<u64, (Option<u64>, u32)>::default();
667 for participant in self.remote_participants.values() {
668 match participant.location {
669 ParticipantLocation::SharedProject { project_id } => {
670 project_hosts_and_guest_counts
671 .entry(project_id)
672 .or_default()
673 .1 += 1;
674 }
675 ParticipantLocation::External | ParticipantLocation::UnsharedProject => {}
676 }
677 for project in &participant.projects {
678 project_hosts_and_guest_counts
679 .entry(project.id)
680 .or_default()
681 .0 = Some(participant.user.id);
682 }
683 }
684
685 if let Some(user) = self.user_store.read(cx).current_user() {
686 for project in &self.local_participant.projects {
687 project_hosts_and_guest_counts
688 .entry(project.id)
689 .or_default()
690 .0 = Some(user.id);
691 }
692 }
693
694 project_hosts_and_guest_counts
695 .into_iter()
696 .filter_map(|(id, (host, guest_count))| Some((id, host?, guest_count)))
697 .max_by_key(|(_, _, guest_count)| *guest_count)
698 .map(|(id, host, _)| (id, host))
699 }
700
701 async fn handle_room_updated(
702 this: Entity<Self>,
703 envelope: TypedEnvelope<proto::RoomUpdated>,
704 mut cx: AsyncApp,
705 ) -> Result<()> {
706 let room = envelope.payload.room.context("invalid room")?;
707 this.update(&mut cx, |this, cx| this.apply_room_update(room, cx))
708 }
709
710 fn apply_room_update(&mut self, room: proto::Room, cx: &mut Context<Self>) -> Result<()> {
711 log::trace!(
712 "client {:?}. room update: {:?}",
713 self.client.user_id(),
714 &room
715 );
716
717 self.pending_room_update = Some(self.start_room_connection(room, cx));
718
719 cx.notify();
720 Ok(())
721 }
722
723 pub fn room_update_completed(&mut self) -> impl Future<Output = ()> + use<> {
724 let mut done_rx = self.room_update_completed_rx.clone();
725 async move {
726 while let Some(result) = done_rx.next().await {
727 if result.is_some() {
728 break;
729 }
730 }
731 }
732 }
733
734 fn start_room_connection(&self, mut room: proto::Room, cx: &mut Context<Self>) -> Task<()> {
735 // Filter ourselves out from the room's participants.
736 let local_participant_ix = room
737 .participants
738 .iter()
739 .position(|participant| Some(participant.user_id) == self.client.user_id());
740 let local_participant = local_participant_ix.map(|ix| room.participants.swap_remove(ix));
741
742 let pending_participant_user_ids = room
743 .pending_participants
744 .iter()
745 .map(|p| p.user_id)
746 .collect::<Vec<_>>();
747
748 let remote_participant_user_ids = room
749 .participants
750 .iter()
751 .map(|p| p.user_id)
752 .collect::<Vec<_>>();
753
754 let (remote_participants, pending_participants) =
755 self.user_store.update(cx, move |user_store, cx| {
756 (
757 user_store.get_users(remote_participant_user_ids, cx),
758 user_store.get_users(pending_participant_user_ids, cx),
759 )
760 });
761 cx.spawn(async move |this, cx| {
762 let (remote_participants, pending_participants) =
763 futures::join!(remote_participants, pending_participants);
764
765 this.update(cx, |this, cx| {
766 this.participant_user_ids.clear();
767
768 if let Some(participant) = local_participant {
769 let role = participant.role();
770 this.local_participant.projects = participant.projects;
771 if this.local_participant.role != role {
772 this.local_participant.role = role;
773
774 if role == proto::ChannelRole::Guest {
775 for project in mem::take(&mut this.shared_projects) {
776 if let Some(project) = project.upgrade() {
777 this.unshare_project(project, cx).log_err();
778 }
779 }
780 this.local_participant.projects.clear();
781 if let Some(livekit_room) = &mut this.live_kit {
782 livekit_room.stop_publishing(cx);
783 }
784 }
785
786 this.joined_projects.retain(|project| {
787 if let Some(project) = project.upgrade() {
788 project.update(cx, |project, cx| project.set_role(role, cx));
789 true
790 } else {
791 false
792 }
793 });
794 }
795 } else {
796 this.local_participant.projects.clear();
797 }
798
799 let livekit_participants = this
800 .live_kit
801 .as_ref()
802 .map(|live_kit| live_kit.room.remote_participants());
803
804 if let Some(participants) = remote_participants.log_err() {
805 for (participant, user) in room.participants.into_iter().zip(participants) {
806 let Some(peer_id) = participant.peer_id else {
807 continue;
808 };
809 let participant_index = ParticipantIndex(participant.participant_index);
810 this.participant_user_ids.insert(participant.user_id);
811
812 let old_projects = this
813 .remote_participants
814 .get(&participant.user_id)
815 .into_iter()
816 .flat_map(|existing| &existing.projects)
817 .map(|project| project.id)
818 .collect::<HashSet<_>>();
819 let new_projects = participant
820 .projects
821 .iter()
822 .map(|project| project.id)
823 .collect::<HashSet<_>>();
824
825 for project in &participant.projects {
826 if !old_projects.contains(&project.id) {
827 cx.emit(Event::RemoteProjectShared {
828 owner: user.clone(),
829 project_id: project.id,
830 worktree_root_names: project.worktree_root_names.clone(),
831 });
832 }
833 }
834
835 for unshared_project_id in old_projects.difference(&new_projects) {
836 this.joined_projects.retain(|project| {
837 if let Some(project) = project.upgrade() {
838 project.update(cx, |project, cx| {
839 if project.remote_id() == Some(*unshared_project_id) {
840 project.disconnected_from_host(cx);
841 false
842 } else {
843 true
844 }
845 })
846 } else {
847 false
848 }
849 });
850 cx.emit(Event::RemoteProjectUnshared {
851 project_id: *unshared_project_id,
852 });
853 }
854
855 let role = participant.role();
856 let location = ParticipantLocation::from_proto(participant.location)
857 .unwrap_or(ParticipantLocation::External);
858 if let Some(remote_participant) =
859 this.remote_participants.get_mut(&participant.user_id)
860 {
861 remote_participant.peer_id = peer_id;
862 remote_participant.projects = participant.projects;
863 remote_participant.participant_index = participant_index;
864 if location != remote_participant.location
865 || role != remote_participant.role
866 {
867 remote_participant.location = location;
868 remote_participant.role = role;
869 cx.emit(Event::ParticipantLocationChanged {
870 participant_id: peer_id,
871 });
872 }
873 } else {
874 this.remote_participants.insert(
875 participant.user_id,
876 RemoteParticipant {
877 user: user.clone(),
878 participant_index,
879 peer_id,
880 projects: participant.projects,
881 location,
882 role,
883 muted: true,
884 speaking: false,
885 video_tracks: Default::default(),
886 audio_tracks: Default::default(),
887 },
888 );
889
890 // When joining a room start_room_connection gets
891 // called but we have already played the join sound.
892 // Dont play extra sounds over that.
893 if this.created.elapsed() > Duration::from_millis(100) {
894 if let proto::ChannelRole::Guest = role {
895 Audio::play_sound(Sound::GuestJoined, cx);
896 } else {
897 Audio::play_sound(Sound::Joined, cx);
898 }
899 }
900
901 if let Some(livekit_participants) = &livekit_participants
902 && let Some(livekit_participant) = livekit_participants
903 .get(&ParticipantIdentity(user.id.to_string()))
904 {
905 for publication in
906 livekit_participant.track_publications().into_values()
907 {
908 if let Some(track) = publication.track() {
909 this.livekit_room_updated(
910 RoomEvent::TrackSubscribed {
911 track,
912 publication,
913 participant: livekit_participant.clone(),
914 },
915 cx,
916 )
917 .warn_on_err();
918 }
919 }
920 }
921 }
922 }
923
924 this.remote_participants.retain(|user_id, participant| {
925 if this.participant_user_ids.contains(user_id) {
926 true
927 } else {
928 for project in &participant.projects {
929 cx.emit(Event::RemoteProjectUnshared {
930 project_id: project.id,
931 });
932 }
933 for sid in participant.video_tracks.keys() {
934 cx.emit(Event::RemoteVideoTrackUnsubscribed { sid: sid.clone() });
935 }
936 false
937 }
938 });
939 }
940
941 if let Some(pending_participants) = pending_participants.log_err() {
942 this.pending_participants = pending_participants;
943 for participant in &this.pending_participants {
944 this.participant_user_ids.insert(participant.id);
945 }
946 }
947
948 this.follows_by_leader_id_project_id.clear();
949 for follower in room.followers {
950 let project_id = follower.project_id;
951 let (leader, follower) = match (follower.leader_id, follower.follower_id) {
952 (Some(leader), Some(follower)) => (leader, follower),
953
954 _ => {
955 log::error!("Follower message {follower:?} missing some state");
956 continue;
957 }
958 };
959
960 let list = this
961 .follows_by_leader_id_project_id
962 .entry((leader, project_id))
963 .or_default();
964 if !list.contains(&follower) {
965 list.push(follower);
966 }
967 }
968
969 this.pending_room_update.take();
970 if this.should_leave() {
971 log::info!("room is empty, leaving");
972 this.leave(cx).detach();
973 }
974
975 this.user_store.update(cx, |user_store, cx| {
976 let participant_indices_by_user_id = this
977 .remote_participants
978 .iter()
979 .map(|(user_id, participant)| (*user_id, participant.participant_index))
980 .collect();
981 user_store.set_participant_indices(participant_indices_by_user_id, cx);
982 });
983
984 this.check_invariants();
985 this.room_update_completed_tx.try_send(Some(())).ok();
986 cx.notify();
987 })
988 .ok();
989 })
990 }
991
992 fn livekit_room_updated(&mut self, event: RoomEvent, cx: &mut Context<Self>) -> Result<()> {
993 log::trace!(
994 "client {:?}. livekit event: {:?}",
995 self.client.user_id(),
996 &event
997 );
998
999 match event {
1000 RoomEvent::TrackSubscribed {
1001 track,
1002 participant,
1003 publication,
1004 } => {
1005 let user_id = participant.identity().0.parse()?;
1006 let track_id = track.sid();
1007 let participant =
1008 self.remote_participants
1009 .get_mut(&user_id)
1010 .with_context(|| {
1011 format!(
1012 "{:?} subscribed to track by unknown participant {user_id}",
1013 self.client.user_id()
1014 )
1015 })?;
1016 if self.live_kit.as_ref().is_none_or(|kit| kit.deafened) && publication.is_audio() {
1017 publication.set_enabled(false, cx);
1018 }
1019 match track {
1020 livekit_client::RemoteTrack::Audio(track) => {
1021 cx.emit(Event::RemoteAudioTracksChanged {
1022 participant_id: participant.peer_id,
1023 });
1024 if let Some(live_kit) = self.live_kit.as_ref() {
1025 let stream = live_kit.room.play_remote_audio_track(&track, cx)?;
1026 participant.audio_tracks.insert(track_id, (track, stream));
1027 participant.muted = publication.is_muted();
1028 }
1029 }
1030 livekit_client::RemoteTrack::Video(track) => {
1031 cx.emit(Event::RemoteVideoTracksChanged {
1032 participant_id: participant.peer_id,
1033 });
1034 participant.video_tracks.insert(track_id, track);
1035 }
1036 }
1037 }
1038
1039 RoomEvent::TrackUnsubscribed {
1040 track, participant, ..
1041 } => {
1042 let user_id = participant.identity().0.parse()?;
1043 let participant =
1044 self.remote_participants
1045 .get_mut(&user_id)
1046 .with_context(|| {
1047 format!(
1048 "{:?}, unsubscribed from track by unknown participant {user_id}",
1049 self.client.user_id()
1050 )
1051 })?;
1052 match track {
1053 livekit_client::RemoteTrack::Audio(track) => {
1054 participant.audio_tracks.remove(&track.sid());
1055 participant.muted = true;
1056 cx.emit(Event::RemoteAudioTracksChanged {
1057 participant_id: participant.peer_id,
1058 });
1059 }
1060 livekit_client::RemoteTrack::Video(track) => {
1061 participant.video_tracks.remove(&track.sid());
1062 cx.emit(Event::RemoteVideoTracksChanged {
1063 participant_id: participant.peer_id,
1064 });
1065 cx.emit(Event::RemoteVideoTrackUnsubscribed { sid: track.sid() });
1066 }
1067 }
1068 }
1069
1070 RoomEvent::ActiveSpeakersChanged { speakers } => {
1071 let mut speaker_ids = speakers
1072 .into_iter()
1073 .filter_map(|speaker| speaker.identity().0.parse().ok())
1074 .collect::<Vec<u64>>();
1075 speaker_ids.sort_unstable();
1076 for (sid, participant) in &mut self.remote_participants {
1077 participant.speaking = speaker_ids.binary_search(sid).is_ok();
1078 }
1079 if let Some(id) = self.client.user_id()
1080 && let Some(room) = &mut self.live_kit
1081 {
1082 room.speaking = speaker_ids.binary_search(&id).is_ok();
1083 }
1084 }
1085
1086 RoomEvent::TrackMuted {
1087 participant,
1088 publication,
1089 }
1090 | RoomEvent::TrackUnmuted {
1091 participant,
1092 publication,
1093 } => {
1094 let mut found = false;
1095 let user_id = participant.identity().0.parse()?;
1096 let track_id = publication.sid();
1097 if let Some(participant) = self.remote_participants.get_mut(&user_id) {
1098 for (track, _) in participant.audio_tracks.values() {
1099 if track.sid() == track_id {
1100 found = true;
1101 break;
1102 }
1103 }
1104 if found {
1105 participant.muted = publication.is_muted();
1106 }
1107 }
1108 }
1109
1110 RoomEvent::LocalTrackUnpublished { publication, .. } => {
1111 log::info!("unpublished track {}", publication.sid());
1112 if let Some(room) = &mut self.live_kit {
1113 if let LocalTrack::Published {
1114 track_publication, ..
1115 } = &room.microphone_track
1116 && track_publication.sid() == publication.sid()
1117 {
1118 room.microphone_track = LocalTrack::None;
1119 }
1120 if let LocalTrack::Published {
1121 track_publication, ..
1122 } = &room.screen_track
1123 && track_publication.sid() == publication.sid()
1124 {
1125 room.screen_track = LocalTrack::None;
1126 }
1127 }
1128 }
1129
1130 RoomEvent::LocalTrackPublished { publication, .. } => {
1131 log::info!("published track {:?}", publication.sid());
1132 }
1133
1134 RoomEvent::Disconnected { reason } => {
1135 log::info!("disconnected from room: {reason:?}");
1136 self.leave(cx).detach_and_log_err(cx);
1137 }
1138 _ => {}
1139 }
1140
1141 cx.notify();
1142 Ok(())
1143 }
1144
1145 fn check_invariants(&self) {
1146 #[cfg(any(test, feature = "test-support"))]
1147 {
1148 for participant in self.remote_participants.values() {
1149 assert!(self.participant_user_ids.contains(&participant.user.id));
1150 assert_ne!(participant.user.id, self.client.user_id().unwrap());
1151 }
1152
1153 for participant in &self.pending_participants {
1154 assert!(self.participant_user_ids.contains(&participant.id));
1155 assert_ne!(participant.id, self.client.user_id().unwrap());
1156 }
1157
1158 assert_eq!(
1159 self.participant_user_ids.len(),
1160 self.remote_participants.len() + self.pending_participants.len()
1161 );
1162 }
1163 }
1164
1165 pub(crate) fn call(
1166 &mut self,
1167 called_user_id: u64,
1168 initial_project_id: Option<u64>,
1169 cx: &mut Context<Self>,
1170 ) -> Task<Result<()>> {
1171 if self.status.is_offline() {
1172 return Task::ready(Err(anyhow!("room is offline")));
1173 }
1174
1175 cx.notify();
1176 let client = self.client.clone();
1177 let room_id = self.id;
1178 self.pending_call_count += 1;
1179 cx.spawn(async move |this, cx| {
1180 let result = client
1181 .request(proto::Call {
1182 room_id,
1183 called_user_id,
1184 initial_project_id,
1185 })
1186 .await;
1187 this.update(cx, |this, cx| {
1188 this.pending_call_count -= 1;
1189 if this.should_leave() {
1190 this.leave(cx).detach_and_log_err(cx);
1191 }
1192 })?;
1193 result?;
1194 Ok(())
1195 })
1196 }
1197
1198 pub fn join_project(
1199 &mut self,
1200 id: u64,
1201 language_registry: Arc<LanguageRegistry>,
1202 fs: Arc<dyn Fs>,
1203 cx: &mut Context<Self>,
1204 ) -> Task<Result<Entity<Project>>> {
1205 let client = self.client.clone();
1206 let user_store = self.user_store.clone();
1207 cx.emit(Event::RemoteProjectJoined { project_id: id });
1208 cx.spawn(async move |this, cx| {
1209 let project =
1210 Project::in_room(id, client, user_store, language_registry, fs, cx.clone()).await?;
1211
1212 this.update(cx, |this, cx| {
1213 this.joined_projects.retain(|project| {
1214 if let Some(project) = project.upgrade() {
1215 !project.read(cx).is_disconnected(cx)
1216 } else {
1217 false
1218 }
1219 });
1220 this.joined_projects.insert(project.downgrade());
1221 })?;
1222 Ok(project)
1223 })
1224 }
1225
1226 pub fn share_project(
1227 &mut self,
1228 project: Entity<Project>,
1229 cx: &mut Context<Self>,
1230 ) -> Task<Result<u64>> {
1231 if let Some(project_id) = project.read(cx).remote_id() {
1232 return Task::ready(Ok(project_id));
1233 }
1234
1235 let request = self.client.request(proto::ShareProject {
1236 room_id: self.id(),
1237 worktrees: project.read(cx).worktree_metadata_protos(cx),
1238 is_ssh_project: project.read(cx).is_via_remote_server(),
1239 windows_paths: Some(project.read(cx).path_style(cx) == PathStyle::Windows),
1240 features: CURRENT_PROJECT_FEATURES
1241 .iter()
1242 .map(|s| s.to_string())
1243 .collect(),
1244 });
1245
1246 cx.spawn(async move |this, cx| {
1247 let response = request.await?;
1248
1249 project.update(cx, |project, cx| project.shared(response.project_id, cx))?;
1250
1251 // If the user's location is in this project, it changes from UnsharedProject to SharedProject.
1252 this.update(cx, |this, cx| {
1253 this.shared_projects.insert(project.downgrade());
1254 let active_project = this.local_participant.active_project.as_ref();
1255 if active_project.is_some_and(|location| *location == project) {
1256 this.set_location(Some(&project), cx)
1257 } else {
1258 Task::ready(Ok(()))
1259 }
1260 })?
1261 .await?;
1262
1263 Ok(response.project_id)
1264 })
1265 }
1266
1267 pub(crate) fn unshare_project(
1268 &mut self,
1269 project: Entity<Project>,
1270 cx: &mut Context<Self>,
1271 ) -> Result<()> {
1272 let project_id = match project.read(cx).remote_id() {
1273 Some(project_id) => project_id,
1274 None => return Ok(()),
1275 };
1276
1277 self.client.send(proto::UnshareProject { project_id })?;
1278 project.update(cx, |this, cx| this.unshare(cx))?;
1279
1280 if self.local_participant.active_project == Some(project.downgrade()) {
1281 self.set_location(Some(&project), cx).detach_and_log_err(cx);
1282 }
1283 Ok(())
1284 }
1285
1286 pub(crate) fn set_location(
1287 &mut self,
1288 project: Option<&Entity<Project>>,
1289 cx: &mut Context<Self>,
1290 ) -> Task<Result<()>> {
1291 if self.status.is_offline() {
1292 return Task::ready(Err(anyhow!("room is offline")));
1293 }
1294
1295 let client = self.client.clone();
1296 let room_id = self.id;
1297 let location = if let Some(project) = project {
1298 self.local_participant.active_project = Some(project.downgrade());
1299 if let Some(project_id) = project.read(cx).remote_id() {
1300 proto::participant_location::Variant::SharedProject(
1301 proto::participant_location::SharedProject { id: project_id },
1302 )
1303 } else {
1304 proto::participant_location::Variant::UnsharedProject(
1305 proto::participant_location::UnsharedProject {},
1306 )
1307 }
1308 } else {
1309 self.local_participant.active_project = None;
1310 proto::participant_location::Variant::External(proto::participant_location::External {})
1311 };
1312
1313 cx.notify();
1314 cx.background_spawn(async move {
1315 client
1316 .request(proto::UpdateParticipantLocation {
1317 room_id,
1318 location: Some(proto::ParticipantLocation {
1319 variant: Some(location),
1320 }),
1321 })
1322 .await?;
1323 Ok(())
1324 })
1325 }
1326
1327 pub fn is_sharing_screen(&self) -> bool {
1328 self.live_kit
1329 .as_ref()
1330 .is_some_and(|live_kit| !matches!(live_kit.screen_track, LocalTrack::None))
1331 }
1332
1333 pub fn shared_screen_id(&self) -> Option<u64> {
1334 self.live_kit.as_ref().and_then(|lk| match lk.screen_track {
1335 LocalTrack::Published { ref _stream, .. } => {
1336 _stream.metadata().ok().map(|meta| meta.id)
1337 }
1338 _ => None,
1339 })
1340 }
1341
1342 pub fn is_sharing_mic(&self) -> bool {
1343 self.live_kit
1344 .as_ref()
1345 .is_some_and(|live_kit| !matches!(live_kit.microphone_track, LocalTrack::None))
1346 }
1347
1348 pub fn is_muted(&self) -> bool {
1349 self.live_kit.as_ref().is_some_and(|live_kit| {
1350 matches!(live_kit.microphone_track, LocalTrack::None)
1351 || live_kit.muted_by_user
1352 || live_kit.deafened
1353 })
1354 }
1355
1356 pub fn muted_by_user(&self) -> bool {
1357 self.live_kit
1358 .as_ref()
1359 .is_some_and(|live_kit| live_kit.muted_by_user)
1360 }
1361
1362 pub fn is_speaking(&self) -> bool {
1363 self.live_kit
1364 .as_ref()
1365 .is_some_and(|live_kit| live_kit.speaking)
1366 }
1367
1368 pub fn is_deafened(&self) -> Option<bool> {
1369 self.live_kit.as_ref().map(|live_kit| live_kit.deafened)
1370 }
1371
1372 pub fn can_use_microphone(&self) -> bool {
1373 use proto::ChannelRole::*;
1374
1375 match self.local_participant.role {
1376 Admin | Member | Talker => true,
1377 Guest | Banned => false,
1378 }
1379 }
1380
1381 pub fn can_share_projects(&self) -> bool {
1382 use proto::ChannelRole::*;
1383 match self.local_participant.role {
1384 Admin | Member => true,
1385 Guest | Banned | Talker => false,
1386 }
1387 }
1388
1389 #[track_caller]
1390 pub fn share_microphone(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
1391 if self.status.is_offline() {
1392 return Task::ready(Err(anyhow!("room is offline")));
1393 }
1394
1395 let (room, publish_id) = if let Some(live_kit) = self.live_kit.as_mut() {
1396 let publish_id = post_inc(&mut live_kit.next_publish_id);
1397 live_kit.microphone_track = LocalTrack::Pending { publish_id };
1398 cx.notify();
1399 (live_kit.room.clone(), publish_id)
1400 } else {
1401 return Task::ready(Err(anyhow!("live-kit was not initialized")));
1402 };
1403
1404 let is_staff = cx.is_staff();
1405 let user_name = self
1406 .user_store
1407 .read(cx)
1408 .current_user()
1409 .and_then(|user| user.name.clone())
1410 .unwrap_or_else(|| "unknown".to_string());
1411
1412 cx.spawn(async move |this, cx| {
1413 let publication = room
1414 .publish_local_microphone_track(user_name, is_staff, cx)
1415 .await;
1416 this.update(cx, |this, cx| {
1417 let live_kit = this
1418 .live_kit
1419 .as_mut()
1420 .context("live-kit was not initialized")?;
1421
1422 let canceled = if let LocalTrack::Pending {
1423 publish_id: cur_publish_id,
1424 } = &live_kit.microphone_track
1425 {
1426 *cur_publish_id != publish_id
1427 } else {
1428 true
1429 };
1430
1431 match publication {
1432 Ok((publication, stream, input_lag_us)) => {
1433 if canceled {
1434 cx.spawn(async move |_, cx| {
1435 room.unpublish_local_track(publication.sid(), cx).await
1436 })
1437 .detach_and_log_err(cx)
1438 } else {
1439 if live_kit.muted_by_user || live_kit.deafened {
1440 publication.mute(cx);
1441 }
1442 live_kit.input_lag_us = Some(input_lag_us);
1443 live_kit.microphone_track = LocalTrack::Published {
1444 track_publication: publication,
1445 _stream: Box::new(stream),
1446 };
1447 cx.notify();
1448 }
1449 Ok(())
1450 }
1451 Err(error) => {
1452 if canceled {
1453 Ok(())
1454 } else {
1455 live_kit.microphone_track = LocalTrack::None;
1456 cx.notify();
1457 Err(error)
1458 }
1459 }
1460 }
1461 })?
1462 })
1463 }
1464
1465 pub fn share_screen(
1466 &mut self,
1467 source: Rc<dyn ScreenCaptureSource>,
1468 cx: &mut Context<Self>,
1469 ) -> Task<Result<()>> {
1470 if self.status.is_offline() {
1471 return Task::ready(Err(anyhow!("room is offline")));
1472 }
1473 if self.is_sharing_screen() {
1474 return Task::ready(Err(anyhow!("screen was already shared")));
1475 }
1476
1477 let (participant, publish_id) = if let Some(live_kit) = self.live_kit.as_mut() {
1478 let publish_id = post_inc(&mut live_kit.next_publish_id);
1479 live_kit.screen_track = LocalTrack::Pending { publish_id };
1480 cx.notify();
1481 (live_kit.room.local_participant(), publish_id)
1482 } else {
1483 return Task::ready(Err(anyhow!("live-kit was not initialized")));
1484 };
1485
1486 cx.spawn(async move |this, cx| {
1487 let publication = participant.publish_screenshare_track(&*source, cx).await;
1488
1489 this.update(cx, |this, cx| {
1490 let live_kit = this
1491 .live_kit
1492 .as_mut()
1493 .context("live-kit was not initialized")?;
1494
1495 let canceled = if let LocalTrack::Pending {
1496 publish_id: cur_publish_id,
1497 } = &live_kit.screen_track
1498 {
1499 *cur_publish_id != publish_id
1500 } else {
1501 true
1502 };
1503
1504 match publication {
1505 Ok((publication, stream)) => {
1506 if canceled {
1507 cx.spawn(async move |_, cx| {
1508 participant.unpublish_track(publication.sid(), cx).await
1509 })
1510 .detach()
1511 } else {
1512 live_kit.screen_track = LocalTrack::Published {
1513 track_publication: publication,
1514 _stream: stream,
1515 };
1516 cx.notify();
1517 }
1518
1519 Audio::play_sound(Sound::StartScreenshare, cx);
1520 Ok(())
1521 }
1522 Err(error) => {
1523 if canceled {
1524 Ok(())
1525 } else {
1526 live_kit.screen_track = LocalTrack::None;
1527 cx.notify();
1528 Err(error)
1529 }
1530 }
1531 }
1532 })?
1533 })
1534 }
1535
1536 #[cfg(target_os = "linux")]
1537 pub fn share_screen_wayland(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
1538 log::info!("will screenshare on wayland");
1539 if self.status.is_offline() {
1540 return Task::ready(Err(anyhow!("room is offline")));
1541 }
1542 if self.is_sharing_screen() {
1543 return Task::ready(Err(anyhow!("screen was already shared")));
1544 }
1545
1546 let (participant, publish_id) = if let Some(live_kit) = self.live_kit.as_mut() {
1547 let publish_id = post_inc(&mut live_kit.next_publish_id);
1548 live_kit.screen_track = LocalTrack::Pending { publish_id };
1549 cx.notify();
1550 (live_kit.room.local_participant(), publish_id)
1551 } else {
1552 return Task::ready(Err(anyhow!("live-kit was not initialized")));
1553 };
1554
1555 cx.spawn(async move |this, cx| {
1556 let publication = participant.publish_screenshare_track_wayland(cx).await;
1557
1558 this.update(cx, |this, cx| {
1559 let live_kit = this
1560 .live_kit
1561 .as_mut()
1562 .context("live-kit was not initialized")?;
1563
1564 let canceled = if let LocalTrack::Pending {
1565 publish_id: cur_publish_id,
1566 } = &live_kit.screen_track
1567 {
1568 *cur_publish_id != publish_id
1569 } else {
1570 true
1571 };
1572
1573 match publication {
1574 Ok((publication, stream, failure_rx)) => {
1575 if canceled {
1576 cx.spawn(async move |_, cx| {
1577 participant.unpublish_track(publication.sid(), cx).await
1578 })
1579 .detach()
1580 } else {
1581 cx.spawn(async move |this, cx| {
1582 if failure_rx.await.is_ok() {
1583 log::warn!("Wayland capture died, auto-unsharing screen");
1584 let _ =
1585 this.update(cx, |this, cx| this.unshare_screen(false, cx));
1586 }
1587 })
1588 .detach();
1589
1590 live_kit.screen_track = LocalTrack::Published {
1591 track_publication: publication,
1592 _stream: stream,
1593 };
1594 cx.notify();
1595 }
1596
1597 Audio::play_sound(Sound::StartScreenshare, cx);
1598 Ok(())
1599 }
1600 Err(error) => {
1601 if canceled {
1602 Ok(())
1603 } else {
1604 live_kit.screen_track = LocalTrack::None;
1605 cx.notify();
1606 Err(error)
1607 }
1608 }
1609 }
1610 })?
1611 })
1612 }
1613
1614 pub fn toggle_mute(&mut self, cx: &mut Context<Self>) {
1615 if let Some(live_kit) = self.live_kit.as_mut() {
1616 // When unmuting, undeafen if the user was deafened before.
1617 let was_deafened = live_kit.deafened;
1618 if live_kit.muted_by_user
1619 || live_kit.deafened
1620 || matches!(live_kit.microphone_track, LocalTrack::None)
1621 {
1622 live_kit.muted_by_user = false;
1623 live_kit.deafened = false;
1624 } else {
1625 live_kit.muted_by_user = true;
1626 }
1627 let muted = live_kit.muted_by_user;
1628 let should_undeafen = was_deafened && !live_kit.deafened;
1629
1630 if let Some(task) = self.set_mute(muted, cx) {
1631 task.detach_and_log_err(cx);
1632 }
1633
1634 if should_undeafen {
1635 self.set_deafened(false, cx);
1636 }
1637 }
1638 }
1639
1640 pub fn toggle_deafen(&mut self, cx: &mut Context<Self>) {
1641 if let Some(live_kit) = self.live_kit.as_mut() {
1642 // When deafening, mute the microphone if it was not already muted.
1643 // When un-deafening, unmute the microphone, unless it was explicitly muted.
1644 let deafened = !live_kit.deafened;
1645 live_kit.deafened = deafened;
1646 let should_change_mute = !live_kit.muted_by_user;
1647
1648 self.set_deafened(deafened, cx);
1649
1650 if should_change_mute && let Some(task) = self.set_mute(deafened, cx) {
1651 task.detach_and_log_err(cx);
1652 }
1653 }
1654 }
1655
1656 pub fn unshare_screen(&mut self, play_sound: bool, cx: &mut Context<Self>) -> Result<()> {
1657 anyhow::ensure!(!self.status.is_offline(), "room is offline");
1658
1659 let live_kit = self
1660 .live_kit
1661 .as_mut()
1662 .context("live-kit was not initialized")?;
1663 match mem::take(&mut live_kit.screen_track) {
1664 LocalTrack::None => anyhow::bail!("screen was not shared"),
1665 LocalTrack::Pending { .. } => {
1666 cx.notify();
1667 Ok(())
1668 }
1669 LocalTrack::Published {
1670 track_publication, ..
1671 } => {
1672 {
1673 let local_participant = live_kit.room.local_participant();
1674 let sid = track_publication.sid();
1675 cx.spawn(async move |_, cx| local_participant.unpublish_track(sid, cx).await)
1676 .detach_and_log_err(cx);
1677 cx.notify();
1678 }
1679
1680 if play_sound {
1681 Audio::play_sound(Sound::StopScreenshare, cx);
1682 }
1683
1684 Ok(())
1685 }
1686 }
1687 }
1688
1689 fn set_deafened(&mut self, deafened: bool, cx: &mut Context<Self>) -> Option<()> {
1690 {
1691 let live_kit = self.live_kit.as_mut()?;
1692 cx.notify();
1693 for (_, participant) in live_kit.room.remote_participants() {
1694 for (_, publication) in participant.track_publications() {
1695 if publication.is_audio() {
1696 publication.set_enabled(!deafened, cx);
1697 }
1698 }
1699 }
1700 }
1701
1702 None
1703 }
1704
1705 fn set_mute(&mut self, should_mute: bool, cx: &mut Context<Room>) -> Option<Task<Result<()>>> {
1706 let live_kit = self.live_kit.as_mut()?;
1707 cx.notify();
1708
1709 if should_mute {
1710 Audio::play_sound(Sound::Mute, cx);
1711 } else {
1712 Audio::play_sound(Sound::Unmute, cx);
1713 }
1714
1715 match &mut live_kit.microphone_track {
1716 LocalTrack::None => {
1717 if should_mute {
1718 None
1719 } else {
1720 Some(self.share_microphone(cx))
1721 }
1722 }
1723 LocalTrack::Pending { .. } => None,
1724 LocalTrack::Published {
1725 track_publication, ..
1726 } => {
1727 let guard = Tokio::handle(cx);
1728 if should_mute {
1729 track_publication.mute(cx)
1730 } else {
1731 track_publication.unmute(cx)
1732 }
1733 drop(guard);
1734
1735 None
1736 }
1737 }
1738 }
1739}
1740
1741fn spawn_room_connection(
1742 livekit_connection_info: Option<proto::LiveKitConnectionInfo>,
1743 cx: &mut Context<Room>,
1744) {
1745 if let Some(connection_info) = livekit_connection_info {
1746 cx.spawn(async move |this, cx| {
1747 let (room, mut events) =
1748 livekit::Room::connect(connection_info.server_url, connection_info.token, cx)
1749 .await?;
1750
1751 let weak_room = this.clone();
1752 this.update(cx, |this, cx| {
1753 let _handle_updates = cx.spawn(async move |this, cx| {
1754 while let Some(event) = events.next().await {
1755 if this
1756 .update(cx, |this, cx| {
1757 this.livekit_room_updated(event, cx).warn_on_err();
1758 })
1759 .is_err()
1760 {
1761 break;
1762 }
1763 }
1764 });
1765
1766 let muted_by_user = Room::mute_on_join(cx);
1767 this.live_kit = Some(LiveKitRoom {
1768 room: Rc::new(room),
1769 screen_track: LocalTrack::None,
1770 microphone_track: LocalTrack::None,
1771 input_lag_us: None,
1772 next_publish_id: 0,
1773 muted_by_user,
1774 deafened: false,
1775 speaking: false,
1776 _handle_updates,
1777 });
1778 this.diagnostics = Some(cx.new(|cx| CallDiagnostics::new(weak_room, cx)));
1779
1780 // Always open the microphone track on join, even when
1781 // `muted_by_user` is set. Note that the microphone will still
1782 // be muted, as it is still gated in `share_microphone` by
1783 // `muted_by_user`. For users that have `mute_on_join` enabled,
1784 // this moves the Bluetooth profile switch (A2DP -> HFP) (which
1785 // can cause 1-2 seconds of audio silence on some Bluetooth
1786 // headphones) from first unmute to channel join, where
1787 // instability is expected.
1788 if this.can_use_microphone() {
1789 this.share_microphone(cx)
1790 } else {
1791 Task::ready(Ok(()))
1792 }
1793 })?
1794 .await
1795 })
1796 .detach_and_log_err(cx);
1797 }
1798}
1799
1800struct LiveKitRoom {
1801 room: Rc<livekit::Room>,
1802 screen_track: LocalTrack<dyn ScreenCaptureStream>,
1803 microphone_track: LocalTrack<AudioStream>,
1804 /// Shared atomic storing the most recent input lag measurement in microseconds.
1805 /// Written by the audio capture/transmit pipeline, read here for diagnostics.
1806 input_lag_us: Option<Arc<AtomicU64>>,
1807 /// Tracks whether we're currently in a muted state due to auto-mute from deafening or manual mute performed by user.
1808 muted_by_user: bool,
1809 deafened: bool,
1810 speaking: bool,
1811 next_publish_id: usize,
1812 _handle_updates: Task<()>,
1813}
1814
1815impl LiveKitRoom {
1816 fn stop_publishing(&mut self, cx: &mut Context<Room>) {
1817 let mut tracks_to_unpublish = Vec::new();
1818 if let LocalTrack::Published {
1819 track_publication, ..
1820 } = mem::replace(&mut self.microphone_track, LocalTrack::None)
1821 {
1822 tracks_to_unpublish.push(track_publication.sid());
1823 self.input_lag_us = None;
1824 cx.notify();
1825 }
1826
1827 if let LocalTrack::Published {
1828 track_publication, ..
1829 } = mem::replace(&mut self.screen_track, LocalTrack::None)
1830 {
1831 tracks_to_unpublish.push(track_publication.sid());
1832 cx.notify();
1833 }
1834
1835 let participant = self.room.local_participant();
1836 cx.spawn(async move |_, cx| {
1837 for sid in tracks_to_unpublish {
1838 participant.unpublish_track(sid, cx).await.log_err();
1839 }
1840 })
1841 .detach();
1842 }
1843}
1844
1845#[derive(Default)]
1846enum LocalTrack<Stream: ?Sized> {
1847 #[default]
1848 None,
1849 Pending {
1850 publish_id: usize,
1851 },
1852 Published {
1853 track_publication: LocalTrackPublication,
1854 _stream: Box<Stream>,
1855 },
1856}
1857
1858#[derive(Copy, Clone, PartialEq, Eq)]
1859pub enum RoomStatus {
1860 Online,
1861 Rejoining,
1862 Offline,
1863}
1864
1865impl RoomStatus {
1866 pub fn is_offline(&self) -> bool {
1867 matches!(self, RoomStatus::Offline)
1868 }
1869
1870 pub fn is_online(&self) -> bool {
1871 matches!(self, RoomStatus::Online)
1872 }
1873}