1use crate::{
2 call_settings::CallSettings,
3 participant::{LocalParticipant, RemoteParticipant},
4};
5use anyhow::{Context as _, Result, anyhow};
6use audio::{Audio, Sound};
7use client::{
8 ChannelId, Client, ParticipantIndex, TypedEnvelope, User, UserStore,
9 proto::{self, PeerId},
10};
11use collections::{BTreeMap, HashMap, HashSet};
12use feature_flags::FeatureFlagAppExt;
13use fs::Fs;
14use futures::StreamExt;
15use gpui::{
16 App, AppContext as _, AsyncApp, Context, Entity, EventEmitter, FutureExt as _,
17 ScreenCaptureSource, ScreenCaptureStream, Task, Timeout, WeakEntity,
18};
19use gpui_tokio::Tokio;
20use language::LanguageRegistry;
21use livekit::{LocalTrackPublication, ParticipantIdentity, RoomEvent};
22use livekit_client::{self as livekit, AudioStream, TrackSid};
23use postage::{sink::Sink, stream::Stream, watch};
24use project::Project;
25use settings::Settings as _;
26use std::sync::atomic::AtomicU64;
27use std::{future::Future, mem, rc::Rc, sync::Arc, time::Duration, time::Instant};
28
29use super::diagnostics::CallDiagnostics;
30use util::{ResultExt, TryFutureExt, paths::PathStyle, post_inc};
31use workspace::ParticipantLocation;
32
/// Upper bound on how long `Room::maintain_connection` waits for the client to
/// reconnect and rejoin the room before it gives up and leaves the room.
pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(30);
34
/// Events emitted by a [`Room`] to its observers.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum Event {
    /// NOTE(review): not emitted in the visible part of this file; presumably
    /// signals that the local user joined a room — confirm against the emitter.
    RoomJoined {
        channel_id: Option<ChannelId>,
    },
    /// An already-known remote participant's location or role changed.
    ParticipantLocationChanged {
        participant_id: proto::PeerId,
    },
    /// A remote participant's video track was subscribed or unsubscribed.
    RemoteVideoTracksChanged {
        participant_id: proto::PeerId,
    },
    /// A specific remote video track went away: it was unsubscribed, its
    /// participant left the room, or the local user left the room.
    RemoteVideoTrackUnsubscribed {
        sid: TrackSid,
    },
    /// A remote participant's audio track was subscribed or unsubscribed.
    RemoteAudioTracksChanged {
        participant_id: proto::PeerId,
    },
    /// A room update listed a project for a remote participant that was not
    /// in that participant's previously known project list.
    RemoteProjectShared {
        owner: Arc<User>,
        project_id: u64,
        worktree_root_names: Vec<String>,
    },
    /// A previously known remote project is no longer listed by its
    /// participant, or that participant is no longer in the room.
    RemoteProjectUnshared {
        project_id: u64,
    },
    /// Emitted by `Room::join_project` when joining a remote project is started.
    RemoteProjectJoined {
        project_id: u64,
    },
    /// NOTE(review): not emitted in the visible part of this file — confirm
    /// the exact trigger against the emitter.
    RemoteProjectInvitationDiscarded {
        project_id: u64,
    },
    /// NOTE(review): not emitted in the visible part of this file; presumably
    /// signals that the local user left the room — confirm against the emitter.
    RoomLeft {
        channel_id: Option<ChannelId>,
    },
}
70
/// Client-side state of a call room: its participants, the projects shared
/// in it, and the tasks that keep it in sync with the server.
pub struct Room {
    /// Room id taken from the room proto returned by the server.
    id: u64,
    /// Channel this room belongs to, if the join response carried one.
    channel_id: Option<ChannelId>,
    /// LiveKit state; starts as `None` and is dropped again by `clear_state`.
    live_kit: Option<LiveKitRoom>,
    /// Call diagnostics entity, if any; dropped by `clear_state`.
    diagnostics: Option<Entity<CallDiagnostics>>,
    /// Current connection status (online, rejoining, offline).
    status: RoomStatus,
    /// Projects the local user shares into this room (unshared on leave).
    shared_projects: HashSet<WeakEntity<Project>>,
    /// Remote projects the local user joined (disconnected and closed on leave).
    joined_projects: HashSet<WeakEntity<Project>>,
    /// Role and projects of the local user in this room.
    local_participant: LocalParticipant,
    /// Other participants in the room, keyed by user id.
    remote_participants: BTreeMap<u64, RemoteParticipant>,
    /// Users listed as pending participants by the latest room update.
    pending_participants: Vec<Arc<User>>,
    /// User ids of all remote and pending participants.
    participant_user_ids: HashSet<u64>,
    /// Number of `Call` requests that have been sent but not yet answered.
    pending_call_count: usize,
    /// Whether the room should be left automatically once `should_leave` holds.
    leave_when_empty: bool,
    client: Arc<Client>,
    user_store: Entity<UserStore>,
    /// Followers of each `(leader peer id, project id)` pair.
    follows_by_leader_id_project_id: HashMap<(PeerId, u64), Vec<PeerId>>,
    /// Message-handler subscriptions on `client`; cleared by `clear_state`.
    client_subscriptions: Vec<client::Subscription>,
    /// Release and app-quit callbacks; held only to keep them registered.
    _subscriptions: Vec<gpui::Subscription>,
    /// Sender half of a watch channel that receives `Some(())` each time a
    /// room update has been fully applied.
    room_update_completed_tx: watch::Sender<Option<()>>,
    /// Receiver half, cloned by `room_update_completed`.
    room_update_completed_rx: watch::Receiver<Option<()>>,
    /// Task applying the most recent room update, if one is still running.
    pending_room_update: Option<Task<()>>,
    /// Task running `maintain_connection`; dropped by `clear_state`.
    maintain_connection: Option<Task<Option<()>>>,
    /// Executor time at construction; used to suppress join sounds for
    /// participants discovered within the first 100ms.
    created: Instant,
}
96
// Allows `Room` to emit `Event` values through `cx.emit`.
impl EventEmitter<Event> for Room {}
98
99impl Room {
/// Returns the id of the channel this room belongs to, if any.
pub fn channel_id(&self) -> Option<ChannelId> {
    self.channel_id
}
103
/// Returns `true` if at least one project is tracked in `shared_projects`.
pub fn is_sharing_project(&self) -> bool {
    !self.shared_projects.is_empty()
}
107
108 pub fn is_connected(&self, _: &App) -> bool {
109 if let Some(live_kit) = self.live_kit.as_ref() {
110 live_kit.room.connection_state() == livekit::ConnectionState::Connected
111 } else {
112 false
113 }
114 }
115
116 fn new(
117 id: u64,
118 channel_id: Option<ChannelId>,
119 livekit_connection_info: Option<proto::LiveKitConnectionInfo>,
120 client: Arc<Client>,
121 user_store: Entity<UserStore>,
122 cx: &mut Context<Self>,
123 ) -> Self {
124 spawn_room_connection(livekit_connection_info, cx);
125
126 let maintain_connection = cx.spawn({
127 let client = client.clone();
128 async move |this, cx| {
129 Self::maintain_connection(this, client.clone(), cx)
130 .log_err()
131 .await
132 }
133 });
134
135 Audio::play_sound(Sound::Joined, cx);
136
137 let (room_update_completed_tx, room_update_completed_rx) = watch::channel();
138
139 Self {
140 id,
141 channel_id,
142 live_kit: None,
143 diagnostics: None,
144 status: RoomStatus::Online,
145 shared_projects: Default::default(),
146 joined_projects: Default::default(),
147 participant_user_ids: Default::default(),
148 local_participant: Default::default(),
149 remote_participants: Default::default(),
150 pending_participants: Default::default(),
151 pending_call_count: 0,
152 client_subscriptions: vec![
153 client.add_message_handler(cx.weak_entity(), Self::handle_room_updated),
154 ],
155 _subscriptions: vec![
156 cx.on_release(Self::released),
157 cx.on_app_quit(Self::app_will_quit),
158 ],
159 leave_when_empty: false,
160 pending_room_update: None,
161 client,
162 user_store,
163 follows_by_leader_id_project_id: Default::default(),
164 maintain_connection: Some(maintain_connection),
165 room_update_completed_tx,
166 room_update_completed_rx,
167 created: cx.background_executor().now(),
168 }
169 }
170
171 pub(crate) fn create(
172 called_user_id: u64,
173 initial_project: Option<Entity<Project>>,
174 client: Arc<Client>,
175 user_store: Entity<UserStore>,
176 cx: &mut App,
177 ) -> Task<Result<Entity<Self>>> {
178 cx.spawn(async move |cx| {
179 let response = client.request(proto::CreateRoom {}).await?;
180 let room_proto = response.room.context("invalid room")?;
181 let room = cx.new(|cx| {
182 let mut room = Self::new(
183 room_proto.id,
184 None,
185 response.live_kit_connection_info,
186 client,
187 user_store,
188 cx,
189 );
190 if let Some(participant) = room_proto.participants.first() {
191 room.local_participant.role = participant.role()
192 }
193 room
194 });
195
196 let initial_project_id = if let Some(initial_project) = initial_project {
197 let initial_project_id = room
198 .update(cx, |room, cx| {
199 room.share_project(initial_project.clone(), cx)
200 })
201 .await?;
202 Some(initial_project_id)
203 } else {
204 None
205 };
206
207 let did_join = room
208 .update(cx, |room, cx| {
209 room.leave_when_empty = true;
210 room.call(called_user_id, initial_project_id, cx)
211 })
212 .await;
213 match did_join {
214 Ok(()) => Ok(room),
215 Err(error) => Err(error.context("room creation failed")),
216 }
217 })
218 }
219
220 pub(crate) async fn join_channel(
221 channel_id: ChannelId,
222 client: Arc<Client>,
223 user_store: Entity<UserStore>,
224 cx: AsyncApp,
225 ) -> Result<Entity<Self>> {
226 Self::from_join_response(
227 client
228 .request(proto::JoinChannel {
229 channel_id: channel_id.0,
230 })
231 .await?,
232 client,
233 user_store,
234 cx,
235 )
236 }
237
238 pub(crate) async fn join(
239 room_id: u64,
240 client: Arc<Client>,
241 user_store: Entity<UserStore>,
242 cx: AsyncApp,
243 ) -> Result<Entity<Self>> {
244 Self::from_join_response(
245 client.request(proto::JoinRoom { id: room_id }).await?,
246 client,
247 user_store,
248 cx,
249 )
250 }
251
252 fn released(&mut self, cx: &mut App) {
253 if self.status.is_online() {
254 self.leave_internal(cx).detach_and_log_err(cx);
255 }
256 }
257
258 fn app_will_quit(&mut self, cx: &mut Context<Self>) -> impl Future<Output = ()> + use<> {
259 let task = if self.status.is_online() {
260 let leave = self.leave_internal(cx);
261 Some(cx.background_spawn(async move {
262 leave.await.log_err();
263 }))
264 } else {
265 None
266 };
267
268 async move {
269 if let Some(task) = task {
270 task.await;
271 }
272 }
273 }
274
275 pub fn mute_on_join(cx: &App) -> bool {
276 CallSettings::get_global(cx).mute_on_join || client::IMPERSONATE_LOGIN.is_some()
277 }
278
279 fn from_join_response(
280 response: proto::JoinRoomResponse,
281 client: Arc<Client>,
282 user_store: Entity<UserStore>,
283 mut cx: AsyncApp,
284 ) -> Result<Entity<Self>> {
285 let room_proto = response.room.context("invalid room")?;
286 let room = cx.new(|cx| {
287 Self::new(
288 room_proto.id,
289 response.channel_id.map(ChannelId),
290 response.live_kit_connection_info,
291 client,
292 user_store,
293 cx,
294 )
295 });
296 room.update(&mut cx, |room, cx| {
297 room.leave_when_empty = room.channel_id.is_none();
298 room.apply_room_update(room_proto, cx)?;
299 anyhow::Ok(())
300 })?;
301 Ok(room)
302 }
303
304 fn should_leave(&self) -> bool {
305 self.leave_when_empty
306 && self.pending_room_update.is_none()
307 && self.pending_participants.is_empty()
308 && self.remote_participants.is_empty()
309 && self.pending_call_count == 0
310 }
311
/// Leaves the room: notifies observers, emits a
/// `RemoteVideoTrackUnsubscribed` event for every known remote video track,
/// then delegates to `leave_internal`.
pub(crate) fn leave(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
    cx.notify();
    // Must run before `leave_internal`, which clears `remote_participants`.
    self.emit_video_track_unsubscribed_events(cx);
    self.leave_internal(cx)
}
317
318 fn leave_internal(&mut self, cx: &mut App) -> Task<Result<()>> {
319 if self.status.is_offline() {
320 return Task::ready(Err(anyhow!("room is offline")));
321 }
322
323 log::info!("leaving room");
324 Audio::play_sound(Sound::Leave, cx);
325
326 self.clear_state(cx);
327
328 let leave_room = self.client.request(proto::LeaveRoom {});
329 cx.background_spawn(async move {
330 leave_room.await?;
331 anyhow::Ok(())
332 })
333 }
334
335 pub(crate) fn clear_state(&mut self, cx: &mut App) {
336 for project in self.shared_projects.drain() {
337 if let Some(project) = project.upgrade() {
338 project.update(cx, |project, cx| {
339 project.unshare(cx).log_err();
340 });
341 }
342 }
343 for project in self.joined_projects.drain() {
344 if let Some(project) = project.upgrade() {
345 project.update(cx, |project, cx| {
346 project.disconnected_from_host(cx);
347 project.close(cx);
348 });
349 }
350 }
351
352 self.status = RoomStatus::Offline;
353 self.remote_participants.clear();
354 self.pending_participants.clear();
355 self.participant_user_ids.clear();
356 self.client_subscriptions.clear();
357 self.live_kit.take();
358 self.diagnostics.take();
359 self.pending_room_update.take();
360 self.maintain_connection.take();
361 }
362
363 fn emit_video_track_unsubscribed_events(&self, cx: &mut Context<Self>) {
364 for participant in self.remote_participants.values() {
365 for sid in participant.video_tracks.keys() {
366 cx.emit(Event::RemoteVideoTrackUnsubscribed { sid: sid.clone() });
367 }
368 }
369 }
370
/// Watches the client's connection status and tries to rejoin the room
/// after a disconnection.
///
/// Whenever the client is not connected, or its status changes at all, the
/// room is marked `RoomStatus::Rejoining` and rejoin attempts are made
/// (giving up after three failed attempts), all bounded by
/// `RECONNECT_TIMEOUT`. After a successful rejoin the loop goes back to
/// watching the status.
///
/// This function only returns with an error: either the room entity was
/// dropped when a disconnection was detected, or reconnecting failed
/// (signed out, attempts exhausted, timeout), in which case the room is
/// left first if it is still alive.
async fn maintain_connection(
    this: WeakEntity<Self>,
    client: Arc<Client>,
    cx: &mut AsyncApp,
) -> Result<()> {
    let mut client_status = client.status();
    loop {
        // Result deliberately ignored; the status is read via `borrow()` below.
        let _ = client_status.try_recv();
        let is_connected = client_status.borrow().is_connected();
        // Even if we're initially connected, any future change of the status means we momentarily disconnected.
        if !is_connected || client_status.next().await.is_some() {
            log::info!("detected client disconnection");

            this.upgrade()
                .context("room was dropped")?
                .update(cx, |this, cx| {
                    this.status = RoomStatus::Rejoining;
                    cx.notify();
                });

            // Wait for client to re-establish a connection to the server.
            let executor = cx.background_executor().clone();
            let client_reconnection = async {
                // Only failed rejoin attempts count against this budget.
                let mut remaining_attempts = 3;
                while remaining_attempts > 0 {
                    if client_status.borrow().is_connected() {
                        log::info!("client reconnected, attempting to rejoin room");

                        let Some(this) = this.upgrade() else { break };
                        let task = this.update(cx, |this, cx| this.rejoin(cx));
                        if task.await.log_err().is_some() {
                            return true;
                        } else {
                            remaining_attempts -= 1;
                        }
                    } else if client_status.borrow().is_signed_out() {
                        // A signed-out client will not reconnect on its own.
                        return false;
                    }

                    log::info!(
                        "waiting for client status change, remaining attempts {}",
                        remaining_attempts
                    );
                    client_status.next().await;
                }
                false
            };

            match client_reconnection
                .with_timeout(RECONNECT_TIMEOUT, &executor)
                .await
            {
                Ok(true) => {
                    log::info!("successfully reconnected to room");
                    // If we successfully joined the room, go back around the loop
                    // waiting for future connection status changes.
                    continue;
                }
                Ok(false) => break,
                Err(Timeout) => {
                    log::info!("room reconnection timeout expired");
                    break;
                }
            }
        }
    }

    // The client failed to re-establish a connection to the server
    // or an error occurred while trying to re-join the room. Either way
    // we leave the room and return an error.
    if let Some(this) = this.upgrade() {
        log::info!("reconnection failed, leaving room");
        this.update(cx, |this, cx| this.leave(cx)).await?;
    }
    anyhow::bail!("can't reconnect to room: client failed to re-establish connection");
}
447
/// Sends a `RejoinRoom` request for this room and applies the response.
///
/// Shared projects that are still alive and have a remote id are included
/// as `UpdateProject` entries; all other shared projects are dropped from
/// `shared_projects`. Joined projects that are still alive are kept, and
/// those with a remote id are included with the scan ids of their
/// worktrees and repositories.
///
/// On success the room status becomes `Online`, the returned room state is
/// applied, and each reshared/rejoined project is handed its part of the
/// response (errors from those calls are logged, not returned).
fn rejoin(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
    // Live project handles keyed by remote id, for matching up the response.
    let mut projects = HashMap::default();
    let mut reshared_projects = Vec::new();
    let mut rejoined_projects = Vec::new();
    self.shared_projects.retain(|project| {
        if let Some(handle) = project.upgrade() {
            let project = handle.read(cx);
            if let Some(project_id) = project.remote_id() {
                projects.insert(project_id, handle.clone());
                reshared_projects.push(proto::UpdateProject {
                    project_id,
                    worktrees: project.worktree_metadata_protos(cx),
                });
                return true;
            }
        }
        false
    });
    self.joined_projects.retain(|project| {
        if let Some(handle) = project.upgrade() {
            let project = handle.read(cx);
            if let Some(project_id) = project.remote_id() {
                projects.insert(project_id, handle.clone());
                let mut worktrees = Vec::new();
                let mut repositories = Vec::new();
                for worktree in project.worktrees(cx) {
                    let worktree = worktree.read(cx);
                    worktrees.push(proto::RejoinWorktree {
                        id: worktree.id().to_proto(),
                        scan_id: worktree.completed_scan_id() as u64,
                    });
                }
                for (entry_id, repository) in project.repositories(cx) {
                    let repository = repository.read(cx);
                    repositories.push(proto::RejoinRepository {
                        id: entry_id.to_proto(),
                        scan_id: repository.scan_id,
                    });
                }

                rejoined_projects.push(proto::RejoinProject {
                    id: project_id,
                    worktrees,
                    repositories,
                });
            }
            // Unlike shared projects, a live joined project is kept even
            // when it has no remote id.
            return true;
        }
        false
    });

    let response = self.client.request_envelope(proto::RejoinRoom {
        id: self.id,
        reshared_projects,
        rejoined_projects,
    });

    cx.spawn(async move |this, cx| {
        let response = response.await?;
        // Passed on to `Project::rejoined` below.
        let message_id = response.message_id;
        let response = response.payload;
        let room_proto = response.room.context("invalid room")?;
        this.update(cx, |this, cx| {
            this.status = RoomStatus::Online;
            this.apply_room_update(room_proto, cx)?;

            for reshared_project in response.reshared_projects {
                if let Some(project) = projects.get(&reshared_project.id) {
                    project.update(cx, |project, cx| {
                        project.reshared(reshared_project, cx).log_err();
                    });
                }
            }

            for rejoined_project in response.rejoined_projects {
                if let Some(project) = projects.get(&rejoined_project.id) {
                    project.update(cx, |project, cx| {
                        project.rejoined(rejoined_project, message_id, cx).log_err();
                    });
                }
            }

            anyhow::Ok(())
        })?
    })
}
534
/// Returns this room's id.
pub fn id(&self) -> u64 {
    self.id
}
538
539 pub fn room_id(&self) -> impl Future<Output = Option<String>> + 'static {
540 let room = self.live_kit.as_ref().map(|lk| lk.room.clone());
541 async move {
542 let room = room?;
543 let sid = room.sid().await;
544 let name = room.name();
545 Some(format!("{} (sid: {sid})", name))
546 }
547 }
548
549 pub fn get_stats(&self, cx: &App) -> Task<Option<livekit::SessionStats>> {
550 match self.live_kit.as_ref() {
551 Some(lk) => {
552 let task = lk.room.stats_task(cx);
553 cx.background_executor()
554 .spawn(async move { task.await.ok() })
555 }
556 None => Task::ready(None),
557 }
558 }
559
560 pub fn input_lag(&self) -> Option<Duration> {
561 let us = self
562 .live_kit
563 .as_ref()?
564 .input_lag_us
565 .as_ref()?
566 .load(std::sync::atomic::Ordering::Relaxed);
567 if us > 0 {
568 Some(Duration::from_micros(us))
569 } else {
570 None
571 }
572 }
573
/// Returns the call diagnostics entity, if one is set.
pub fn diagnostics(&self) -> Option<&Entity<CallDiagnostics>> {
    self.diagnostics.as_ref()
}
577
578 pub fn connection_quality(&self) -> livekit::ConnectionQuality {
579 self.live_kit
580 .as_ref()
581 .map(|lk| lk.room.local_participant().connection_quality())
582 .unwrap_or(livekit::ConnectionQuality::Lost)
583 }
584
/// Returns the room's current status.
pub fn status(&self) -> RoomStatus {
    self.status
}
588
/// Returns the local user's participant state for this room.
pub fn local_participant(&self) -> &LocalParticipant {
    &self.local_participant
}
592
/// Returns the user store's current user, if there is one.
pub fn local_participant_user(&self, cx: &App) -> Option<Arc<User>> {
    self.user_store.read(cx).current_user()
}
596
/// Returns the remote participants, keyed by user id.
pub fn remote_participants(&self) -> &BTreeMap<u64, RemoteParticipant> {
    &self.remote_participants
}
600
601 pub fn remote_participant_for_peer_id(&self, peer_id: PeerId) -> Option<&RemoteParticipant> {
602 self.remote_participants
603 .values()
604 .find(|p| p.peer_id == peer_id)
605 }
606
607 pub fn role_for_user(&self, user_id: u64) -> Option<proto::ChannelRole> {
608 self.remote_participants
609 .get(&user_id)
610 .map(|participant| participant.role)
611 }
612
613 pub fn contains_guests(&self) -> bool {
614 self.local_participant.role == proto::ChannelRole::Guest
615 || self
616 .remote_participants
617 .values()
618 .any(|p| p.role == proto::ChannelRole::Guest)
619 }
620
621 pub fn local_participant_is_admin(&self) -> bool {
622 self.local_participant.role == proto::ChannelRole::Admin
623 }
624
625 pub fn local_participant_is_guest(&self) -> bool {
626 self.local_participant.role == proto::ChannelRole::Guest
627 }
628
629 pub fn set_participant_role(
630 &mut self,
631 user_id: u64,
632 role: proto::ChannelRole,
633 cx: &Context<Self>,
634 ) -> Task<Result<()>> {
635 let client = self.client.clone();
636 let room_id = self.id;
637 let role = role.into();
638 cx.spawn(async move |_, _| {
639 client
640 .request(proto::SetRoomParticipantRole {
641 room_id,
642 user_id,
643 role,
644 })
645 .await
646 .map(|_| ())
647 })
648 }
649
/// Returns the users currently listed as pending participants.
pub fn pending_participants(&self) -> &[Arc<User>] {
    &self.pending_participants
}
653
/// Returns `true` if `user_id` is in `participant_user_ids`, which holds
/// the ids of remote and pending participants.
pub fn contains_participant(&self, user_id: u64) -> bool {
    self.participant_user_ids.contains(&user_id)
}
657
658 pub fn followers_for(&self, leader_id: PeerId, project_id: u64) -> &[PeerId] {
659 self.follows_by_leader_id_project_id
660 .get(&(leader_id, project_id))
661 .map_or(&[], |v| v.as_slice())
662 }
663
664 /// Returns the most 'active' projects, defined as most people in the project
665 pub fn most_active_project(&self, cx: &App) -> Option<(u64, u64)> {
666 let mut project_hosts_and_guest_counts = HashMap::<u64, (Option<u64>, u32)>::default();
667 for participant in self.remote_participants.values() {
668 match participant.location {
669 ParticipantLocation::SharedProject { project_id } => {
670 project_hosts_and_guest_counts
671 .entry(project_id)
672 .or_default()
673 .1 += 1;
674 }
675 ParticipantLocation::External | ParticipantLocation::UnsharedProject => {}
676 }
677 for project in &participant.projects {
678 project_hosts_and_guest_counts
679 .entry(project.id)
680 .or_default()
681 .0 = Some(participant.user.id);
682 }
683 }
684
685 if let Some(user) = self.user_store.read(cx).current_user() {
686 for project in &self.local_participant.projects {
687 project_hosts_and_guest_counts
688 .entry(project.id)
689 .or_default()
690 .0 = Some(user.id);
691 }
692 }
693
694 project_hosts_and_guest_counts
695 .into_iter()
696 .filter_map(|(id, (host, guest_count))| Some((id, host?, guest_count)))
697 .max_by_key(|(_, _, guest_count)| *guest_count)
698 .map(|(id, host, _)| (id, host))
699 }
700
701 async fn handle_room_updated(
702 this: Entity<Self>,
703 envelope: TypedEnvelope<proto::RoomUpdated>,
704 mut cx: AsyncApp,
705 ) -> Result<()> {
706 let room = envelope.payload.room.context("invalid room")?;
707 this.update(&mut cx, |this, cx| this.apply_room_update(room, cx))
708 }
709
710 fn apply_room_update(&mut self, room: proto::Room, cx: &mut Context<Self>) -> Result<()> {
711 log::trace!(
712 "client {:?}. room update: {:?}",
713 self.client.user_id(),
714 &room
715 );
716
717 self.pending_room_update = Some(self.start_room_connection(room, cx));
718
719 cx.notify();
720 Ok(())
721 }
722
723 pub fn room_update_completed(&mut self) -> impl Future<Output = ()> + use<> {
724 let mut done_rx = self.room_update_completed_rx.clone();
725 async move {
726 while let Some(result) = done_rx.next().await {
727 if result.is_some() {
728 break;
729 }
730 }
731 }
732 }
733
734 fn start_room_connection(&self, mut room: proto::Room, cx: &mut Context<Self>) -> Task<()> {
735 // Filter ourselves out from the room's participants.
736 let local_participant_ix = room
737 .participants
738 .iter()
739 .position(|participant| Some(participant.user_id) == self.client.user_id());
740 let local_participant = local_participant_ix.map(|ix| room.participants.swap_remove(ix));
741
742 let pending_participant_user_ids = room
743 .pending_participants
744 .iter()
745 .map(|p| p.user_id)
746 .collect::<Vec<_>>();
747
748 let remote_participant_user_ids = room
749 .participants
750 .iter()
751 .map(|p| p.user_id)
752 .collect::<Vec<_>>();
753
754 let (remote_participants, pending_participants) =
755 self.user_store.update(cx, move |user_store, cx| {
756 (
757 user_store.get_users(remote_participant_user_ids, cx),
758 user_store.get_users(pending_participant_user_ids, cx),
759 )
760 });
761 cx.spawn(async move |this, cx| {
762 let (remote_participants, pending_participants) =
763 futures::join!(remote_participants, pending_participants);
764
765 this.update(cx, |this, cx| {
766 this.participant_user_ids.clear();
767
768 if let Some(participant) = local_participant {
769 let role = participant.role();
770 this.local_participant.projects = participant.projects;
771 if this.local_participant.role != role {
772 this.local_participant.role = role;
773
774 if role == proto::ChannelRole::Guest {
775 for project in mem::take(&mut this.shared_projects) {
776 if let Some(project) = project.upgrade() {
777 this.unshare_project(project, cx).log_err();
778 }
779 }
780 this.local_participant.projects.clear();
781 if let Some(livekit_room) = &mut this.live_kit {
782 livekit_room.stop_publishing(cx);
783 }
784 }
785
786 this.joined_projects.retain(|project| {
787 if let Some(project) = project.upgrade() {
788 project.update(cx, |project, cx| project.set_role(role, cx));
789 true
790 } else {
791 false
792 }
793 });
794 }
795 } else {
796 this.local_participant.projects.clear();
797 }
798
799 let livekit_participants = this
800 .live_kit
801 .as_ref()
802 .map(|live_kit| live_kit.room.remote_participants());
803
804 if let Some(participants) = remote_participants.log_err() {
805 for (participant, user) in room.participants.into_iter().zip(participants) {
806 let Some(peer_id) = participant.peer_id else {
807 continue;
808 };
809 let participant_index = ParticipantIndex(participant.participant_index);
810 this.participant_user_ids.insert(participant.user_id);
811
812 let old_projects = this
813 .remote_participants
814 .get(&participant.user_id)
815 .into_iter()
816 .flat_map(|existing| &existing.projects)
817 .map(|project| project.id)
818 .collect::<HashSet<_>>();
819 let new_projects = participant
820 .projects
821 .iter()
822 .map(|project| project.id)
823 .collect::<HashSet<_>>();
824
825 for project in &participant.projects {
826 if !old_projects.contains(&project.id) {
827 cx.emit(Event::RemoteProjectShared {
828 owner: user.clone(),
829 project_id: project.id,
830 worktree_root_names: project.worktree_root_names.clone(),
831 });
832 }
833 }
834
835 for unshared_project_id in old_projects.difference(&new_projects) {
836 this.joined_projects.retain(|project| {
837 if let Some(project) = project.upgrade() {
838 project.update(cx, |project, cx| {
839 if project.remote_id() == Some(*unshared_project_id) {
840 project.disconnected_from_host(cx);
841 false
842 } else {
843 true
844 }
845 })
846 } else {
847 false
848 }
849 });
850 cx.emit(Event::RemoteProjectUnshared {
851 project_id: *unshared_project_id,
852 });
853 }
854
855 let role = participant.role();
856 let location = ParticipantLocation::from_proto(participant.location)
857 .unwrap_or(ParticipantLocation::External);
858 if let Some(remote_participant) =
859 this.remote_participants.get_mut(&participant.user_id)
860 {
861 remote_participant.peer_id = peer_id;
862 remote_participant.projects = participant.projects;
863 remote_participant.participant_index = participant_index;
864 if location != remote_participant.location
865 || role != remote_participant.role
866 {
867 remote_participant.location = location;
868 remote_participant.role = role;
869 cx.emit(Event::ParticipantLocationChanged {
870 participant_id: peer_id,
871 });
872 }
873 } else {
874 this.remote_participants.insert(
875 participant.user_id,
876 RemoteParticipant {
877 user: user.clone(),
878 participant_index,
879 peer_id,
880 projects: participant.projects,
881 location,
882 role,
883 muted: true,
884 speaking: false,
885 video_tracks: Default::default(),
886 audio_tracks: Default::default(),
887 },
888 );
889
890 // When joining a room start_room_connection gets
891 // called but we have already played the join sound.
892 // Dont play extra sounds over that.
893 if this.created.elapsed() > Duration::from_millis(100) {
894 if let proto::ChannelRole::Guest = role {
895 Audio::play_sound(Sound::GuestJoined, cx);
896 } else {
897 Audio::play_sound(Sound::Joined, cx);
898 }
899 }
900
901 if let Some(livekit_participants) = &livekit_participants
902 && let Some(livekit_participant) = livekit_participants
903 .get(&ParticipantIdentity(user.id.to_string()))
904 {
905 for publication in
906 livekit_participant.track_publications().into_values()
907 {
908 if let Some(track) = publication.track() {
909 this.livekit_room_updated(
910 RoomEvent::TrackSubscribed {
911 track,
912 publication,
913 participant: livekit_participant.clone(),
914 },
915 cx,
916 )
917 .warn_on_err();
918 }
919 }
920 }
921 }
922 }
923
924 this.remote_participants.retain(|user_id, participant| {
925 if this.participant_user_ids.contains(user_id) {
926 true
927 } else {
928 for project in &participant.projects {
929 cx.emit(Event::RemoteProjectUnshared {
930 project_id: project.id,
931 });
932 }
933 for sid in participant.video_tracks.keys() {
934 cx.emit(Event::RemoteVideoTrackUnsubscribed { sid: sid.clone() });
935 }
936 false
937 }
938 });
939 }
940
941 if let Some(pending_participants) = pending_participants.log_err() {
942 this.pending_participants = pending_participants;
943 for participant in &this.pending_participants {
944 this.participant_user_ids.insert(participant.id);
945 }
946 }
947
948 this.follows_by_leader_id_project_id.clear();
949 for follower in room.followers {
950 let project_id = follower.project_id;
951 let (leader, follower) = match (follower.leader_id, follower.follower_id) {
952 (Some(leader), Some(follower)) => (leader, follower),
953
954 _ => {
955 log::error!("Follower message {follower:?} missing some state");
956 continue;
957 }
958 };
959
960 let list = this
961 .follows_by_leader_id_project_id
962 .entry((leader, project_id))
963 .or_default();
964 if !list.contains(&follower) {
965 list.push(follower);
966 }
967 }
968
969 this.pending_room_update.take();
970 if this.should_leave() {
971 log::info!("room is empty, leaving");
972 this.leave(cx).detach();
973 }
974
975 this.user_store.update(cx, |user_store, cx| {
976 let participant_indices_by_user_id = this
977 .remote_participants
978 .iter()
979 .map(|(user_id, participant)| (*user_id, participant.participant_index))
980 .collect();
981 user_store.set_participant_indices(participant_indices_by_user_id, cx);
982 });
983
984 this.check_invariants();
985 this.room_update_completed_tx.try_send(Some(())).ok();
986 cx.notify();
987 })
988 .ok();
989 })
990 }
991
/// Applies a LiveKit `RoomEvent` to the room's state and notifies
/// observers.
///
/// Participant identities are parsed as user ids and looked up in
/// `remote_participants`.
///
/// # Errors
///
/// Returns an error if a participant identity does not parse, if a track
/// (un)subscription refers to an unknown participant, or if playing a
/// newly subscribed audio track fails. In those cases `cx.notify()` is not
/// reached.
fn livekit_room_updated(&mut self, event: RoomEvent, cx: &mut Context<Self>) -> Result<()> {
    log::trace!(
        "client {:?}. livekit event: {:?}",
        self.client.user_id(),
        &event
    );

    match event {
        RoomEvent::TrackSubscribed {
            track,
            participant,
            publication,
        } => {
            let user_id = participant.identity().0.parse()?;
            let track_id = track.sid();
            let participant =
                self.remote_participants
                    .get_mut(&user_id)
                    .with_context(|| {
                        format!(
                            "{:?} subscribed to track by unknown participant {user_id}",
                            self.client.user_id()
                        )
                    })?;
            // Disable new audio publications while deafened, or when there
            // is no LiveKit state at all.
            if self.live_kit.as_ref().is_none_or(|kit| kit.deafened) && publication.is_audio() {
                publication.set_enabled(false, cx);
            }
            match track {
                livekit_client::RemoteTrack::Audio(track) => {
                    cx.emit(Event::RemoteAudioTracksChanged {
                        participant_id: participant.peer_id,
                    });
                    if let Some(live_kit) = self.live_kit.as_ref() {
                        // The stream is stored alongside the track in `audio_tracks`.
                        let stream = live_kit.room.play_remote_audio_track(&track, cx)?;
                        participant.audio_tracks.insert(track_id, (track, stream));
                        participant.muted = publication.is_muted();
                    }
                }
                livekit_client::RemoteTrack::Video(track) => {
                    cx.emit(Event::RemoteVideoTracksChanged {
                        participant_id: participant.peer_id,
                    });
                    participant.video_tracks.insert(track_id, track);
                }
            }
        }

        RoomEvent::TrackUnsubscribed {
            track, participant, ..
        } => {
            let user_id = participant.identity().0.parse()?;
            let participant =
                self.remote_participants
                    .get_mut(&user_id)
                    .with_context(|| {
                        format!(
                            "{:?}, unsubscribed from track by unknown participant {user_id}",
                            self.client.user_id()
                        )
                    })?;
            match track {
                livekit_client::RemoteTrack::Audio(track) => {
                    participant.audio_tracks.remove(&track.sid());
                    // A participant whose audio track was removed is shown as muted.
                    participant.muted = true;
                    cx.emit(Event::RemoteAudioTracksChanged {
                        participant_id: participant.peer_id,
                    });
                }
                livekit_client::RemoteTrack::Video(track) => {
                    participant.video_tracks.remove(&track.sid());
                    cx.emit(Event::RemoteVideoTracksChanged {
                        participant_id: participant.peer_id,
                    });
                    cx.emit(Event::RemoteVideoTrackUnsubscribed { sid: track.sid() });
                }
            }
        }

        RoomEvent::ActiveSpeakersChanged { speakers } => {
            // Identities that do not parse as user ids are ignored.
            let mut speaker_ids = speakers
                .into_iter()
                .filter_map(|speaker| speaker.identity().0.parse().ok())
                .collect::<Vec<u64>>();
            // Sorted so the lookups below can use binary search.
            speaker_ids.sort_unstable();
            for (sid, participant) in &mut self.remote_participants {
                participant.speaking = speaker_ids.binary_search(sid).is_ok();
            }
            if let Some(id) = self.client.user_id()
                && let Some(room) = &mut self.live_kit
            {
                room.speaking = speaker_ids.binary_search(&id).is_ok();
            }
        }

        RoomEvent::TrackMuted {
            participant,
            publication,
        }
        | RoomEvent::TrackUnmuted {
            participant,
            publication,
        } => {
            let mut found = false;
            let user_id = participant.identity().0.parse()?;
            let track_id = publication.sid();
            if let Some(participant) = self.remote_participants.get_mut(&user_id) {
                // Only publications matching one of the participant's known
                // audio tracks affect the muted flag.
                for (track, _) in participant.audio_tracks.values() {
                    if track.sid() == track_id {
                        found = true;
                        break;
                    }
                }
                if found {
                    participant.muted = publication.is_muted();
                }
            }
        }

        RoomEvent::LocalTrackUnpublished { publication, .. } => {
            log::info!("unpublished track {}", publication.sid());
            // Reset whichever local track (microphone and/or screen) matches
            // the unpublished publication.
            if let Some(room) = &mut self.live_kit {
                if let LocalTrack::Published {
                    track_publication, ..
                } = &room.microphone_track
                    && track_publication.sid() == publication.sid()
                {
                    room.microphone_track = LocalTrack::None;
                }
                if let LocalTrack::Published {
                    track_publication, ..
                } = &room.screen_track
                    && track_publication.sid() == publication.sid()
                {
                    room.screen_track = LocalTrack::None;
                }
            }
        }

        RoomEvent::LocalTrackPublished { publication, .. } => {
            log::info!("published track {:?}", publication.sid());
        }

        RoomEvent::Disconnected { reason } => {
            log::info!("disconnected from room: {reason:?}");
            self.leave(cx).detach_and_log_err(cx);
        }
        // All other LiveKit events are ignored.
        _ => {}
    }

    cx.notify();
    Ok(())
}
1144
1145 fn check_invariants(&self) {
1146 #[cfg(any(test, feature = "test-support"))]
1147 {
1148 for participant in self.remote_participants.values() {
1149 assert!(self.participant_user_ids.contains(&participant.user.id));
1150 assert_ne!(participant.user.id, self.client.user_id().unwrap());
1151 }
1152
1153 for participant in &self.pending_participants {
1154 assert!(self.participant_user_ids.contains(&participant.id));
1155 assert_ne!(participant.id, self.client.user_id().unwrap());
1156 }
1157
1158 assert_eq!(
1159 self.participant_user_ids.len(),
1160 self.remote_participants.len() + self.pending_participants.len()
1161 );
1162 }
1163 }
1164
1165 pub(crate) fn call(
1166 &mut self,
1167 called_user_id: u64,
1168 initial_project_id: Option<u64>,
1169 cx: &mut Context<Self>,
1170 ) -> Task<Result<()>> {
1171 if self.status.is_offline() {
1172 return Task::ready(Err(anyhow!("room is offline")));
1173 }
1174
1175 cx.notify();
1176 let client = self.client.clone();
1177 let room_id = self.id;
1178 self.pending_call_count += 1;
1179 cx.spawn(async move |this, cx| {
1180 let result = client
1181 .request(proto::Call {
1182 room_id,
1183 called_user_id,
1184 initial_project_id,
1185 })
1186 .await;
1187 this.update(cx, |this, cx| {
1188 this.pending_call_count -= 1;
1189 if this.should_leave() {
1190 this.leave(cx).detach_and_log_err(cx);
1191 }
1192 })?;
1193 result?;
1194 Ok(())
1195 })
1196 }
1197
1198 pub fn join_project(
1199 &mut self,
1200 id: u64,
1201 language_registry: Arc<LanguageRegistry>,
1202 fs: Arc<dyn Fs>,
1203 cx: &mut Context<Self>,
1204 ) -> Task<Result<Entity<Project>>> {
1205 let client = self.client.clone();
1206 let user_store = self.user_store.clone();
1207 cx.emit(Event::RemoteProjectJoined { project_id: id });
1208 cx.spawn(async move |this, cx| {
1209 let project =
1210 Project::in_room(id, client, user_store, language_registry, fs, cx.clone()).await?;
1211
1212 this.update(cx, |this, cx| {
1213 this.joined_projects.retain(|project| {
1214 if let Some(project) = project.upgrade() {
1215 !project.read(cx).is_disconnected(cx)
1216 } else {
1217 false
1218 }
1219 });
1220 this.joined_projects.insert(project.downgrade());
1221 })?;
1222 Ok(project)
1223 })
1224 }
1225
1226 pub fn share_project(
1227 &mut self,
1228 project: Entity<Project>,
1229 cx: &mut Context<Self>,
1230 ) -> Task<Result<u64>> {
1231 if let Some(project_id) = project.read(cx).remote_id() {
1232 return Task::ready(Ok(project_id));
1233 }
1234
1235 let request = self.client.request(proto::ShareProject {
1236 room_id: self.id(),
1237 worktrees: project.read(cx).worktree_metadata_protos(cx),
1238 is_ssh_project: project.read(cx).is_via_remote_server(),
1239 windows_paths: Some(project.read(cx).path_style(cx) == PathStyle::Windows),
1240 });
1241
1242 cx.spawn(async move |this, cx| {
1243 let response = request.await?;
1244
1245 project.update(cx, |project, cx| project.shared(response.project_id, cx))?;
1246
1247 // If the user's location is in this project, it changes from UnsharedProject to SharedProject.
1248 this.update(cx, |this, cx| {
1249 this.shared_projects.insert(project.downgrade());
1250 let active_project = this.local_participant.active_project.as_ref();
1251 if active_project.is_some_and(|location| *location == project) {
1252 this.set_location(Some(&project), cx)
1253 } else {
1254 Task::ready(Ok(()))
1255 }
1256 })?
1257 .await?;
1258
1259 Ok(response.project_id)
1260 })
1261 }
1262
1263 pub(crate) fn unshare_project(
1264 &mut self,
1265 project: Entity<Project>,
1266 cx: &mut Context<Self>,
1267 ) -> Result<()> {
1268 let project_id = match project.read(cx).remote_id() {
1269 Some(project_id) => project_id,
1270 None => return Ok(()),
1271 };
1272
1273 self.client.send(proto::UnshareProject { project_id })?;
1274 project.update(cx, |this, cx| this.unshare(cx))?;
1275
1276 if self.local_participant.active_project == Some(project.downgrade()) {
1277 self.set_location(Some(&project), cx).detach_and_log_err(cx);
1278 }
1279 Ok(())
1280 }
1281
1282 pub(crate) fn set_location(
1283 &mut self,
1284 project: Option<&Entity<Project>>,
1285 cx: &mut Context<Self>,
1286 ) -> Task<Result<()>> {
1287 if self.status.is_offline() {
1288 return Task::ready(Err(anyhow!("room is offline")));
1289 }
1290
1291 let client = self.client.clone();
1292 let room_id = self.id;
1293 let location = if let Some(project) = project {
1294 self.local_participant.active_project = Some(project.downgrade());
1295 if let Some(project_id) = project.read(cx).remote_id() {
1296 proto::participant_location::Variant::SharedProject(
1297 proto::participant_location::SharedProject { id: project_id },
1298 )
1299 } else {
1300 proto::participant_location::Variant::UnsharedProject(
1301 proto::participant_location::UnsharedProject {},
1302 )
1303 }
1304 } else {
1305 self.local_participant.active_project = None;
1306 proto::participant_location::Variant::External(proto::participant_location::External {})
1307 };
1308
1309 cx.notify();
1310 cx.background_spawn(async move {
1311 client
1312 .request(proto::UpdateParticipantLocation {
1313 room_id,
1314 location: Some(proto::ParticipantLocation {
1315 variant: Some(location),
1316 }),
1317 })
1318 .await?;
1319 Ok(())
1320 })
1321 }
1322
1323 pub fn is_sharing_screen(&self) -> bool {
1324 self.live_kit
1325 .as_ref()
1326 .is_some_and(|live_kit| !matches!(live_kit.screen_track, LocalTrack::None))
1327 }
1328
1329 pub fn shared_screen_id(&self) -> Option<u64> {
1330 self.live_kit.as_ref().and_then(|lk| match lk.screen_track {
1331 LocalTrack::Published { ref _stream, .. } => {
1332 _stream.metadata().ok().map(|meta| meta.id)
1333 }
1334 _ => None,
1335 })
1336 }
1337
1338 pub fn is_sharing_mic(&self) -> bool {
1339 self.live_kit
1340 .as_ref()
1341 .is_some_and(|live_kit| !matches!(live_kit.microphone_track, LocalTrack::None))
1342 }
1343
1344 pub fn is_muted(&self) -> bool {
1345 self.live_kit.as_ref().is_some_and(|live_kit| {
1346 matches!(live_kit.microphone_track, LocalTrack::None)
1347 || live_kit.muted_by_user
1348 || live_kit.deafened
1349 })
1350 }
1351
1352 pub fn muted_by_user(&self) -> bool {
1353 self.live_kit
1354 .as_ref()
1355 .is_some_and(|live_kit| live_kit.muted_by_user)
1356 }
1357
1358 pub fn is_speaking(&self) -> bool {
1359 self.live_kit
1360 .as_ref()
1361 .is_some_and(|live_kit| live_kit.speaking)
1362 }
1363
1364 pub fn is_deafened(&self) -> Option<bool> {
1365 self.live_kit.as_ref().map(|live_kit| live_kit.deafened)
1366 }
1367
1368 pub fn can_use_microphone(&self) -> bool {
1369 use proto::ChannelRole::*;
1370
1371 match self.local_participant.role {
1372 Admin | Member | Talker => true,
1373 Guest | Banned => false,
1374 }
1375 }
1376
1377 pub fn can_share_projects(&self) -> bool {
1378 use proto::ChannelRole::*;
1379 match self.local_participant.role {
1380 Admin | Member => true,
1381 Guest | Banned | Talker => false,
1382 }
1383 }
1384
1385 #[track_caller]
1386 pub fn share_microphone(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
1387 if self.status.is_offline() {
1388 return Task::ready(Err(anyhow!("room is offline")));
1389 }
1390
1391 let (room, publish_id) = if let Some(live_kit) = self.live_kit.as_mut() {
1392 let publish_id = post_inc(&mut live_kit.next_publish_id);
1393 live_kit.microphone_track = LocalTrack::Pending { publish_id };
1394 cx.notify();
1395 (live_kit.room.clone(), publish_id)
1396 } else {
1397 return Task::ready(Err(anyhow!("live-kit was not initialized")));
1398 };
1399
1400 let is_staff = cx.is_staff();
1401 let user_name = self
1402 .user_store
1403 .read(cx)
1404 .current_user()
1405 .and_then(|user| user.name.clone())
1406 .unwrap_or_else(|| "unknown".to_string());
1407
1408 cx.spawn(async move |this, cx| {
1409 let publication = room
1410 .publish_local_microphone_track(user_name, is_staff, cx)
1411 .await;
1412 this.update(cx, |this, cx| {
1413 let live_kit = this
1414 .live_kit
1415 .as_mut()
1416 .context("live-kit was not initialized")?;
1417
1418 let canceled = if let LocalTrack::Pending {
1419 publish_id: cur_publish_id,
1420 } = &live_kit.microphone_track
1421 {
1422 *cur_publish_id != publish_id
1423 } else {
1424 true
1425 };
1426
1427 match publication {
1428 Ok((publication, stream, input_lag_us)) => {
1429 if canceled {
1430 cx.spawn(async move |_, cx| {
1431 room.unpublish_local_track(publication.sid(), cx).await
1432 })
1433 .detach_and_log_err(cx)
1434 } else {
1435 if live_kit.muted_by_user || live_kit.deafened {
1436 publication.mute(cx);
1437 }
1438 live_kit.input_lag_us = Some(input_lag_us);
1439 live_kit.microphone_track = LocalTrack::Published {
1440 track_publication: publication,
1441 _stream: Box::new(stream),
1442 };
1443 cx.notify();
1444 }
1445 Ok(())
1446 }
1447 Err(error) => {
1448 if canceled {
1449 Ok(())
1450 } else {
1451 live_kit.microphone_track = LocalTrack::None;
1452 cx.notify();
1453 Err(error)
1454 }
1455 }
1456 }
1457 })?
1458 })
1459 }
1460
1461 pub fn share_screen(
1462 &mut self,
1463 source: Rc<dyn ScreenCaptureSource>,
1464 cx: &mut Context<Self>,
1465 ) -> Task<Result<()>> {
1466 if self.status.is_offline() {
1467 return Task::ready(Err(anyhow!("room is offline")));
1468 }
1469 if self.is_sharing_screen() {
1470 return Task::ready(Err(anyhow!("screen was already shared")));
1471 }
1472
1473 let (participant, publish_id) = if let Some(live_kit) = self.live_kit.as_mut() {
1474 let publish_id = post_inc(&mut live_kit.next_publish_id);
1475 live_kit.screen_track = LocalTrack::Pending { publish_id };
1476 cx.notify();
1477 (live_kit.room.local_participant(), publish_id)
1478 } else {
1479 return Task::ready(Err(anyhow!("live-kit was not initialized")));
1480 };
1481
1482 cx.spawn(async move |this, cx| {
1483 let publication = participant.publish_screenshare_track(&*source, cx).await;
1484
1485 this.update(cx, |this, cx| {
1486 let live_kit = this
1487 .live_kit
1488 .as_mut()
1489 .context("live-kit was not initialized")?;
1490
1491 let canceled = if let LocalTrack::Pending {
1492 publish_id: cur_publish_id,
1493 } = &live_kit.screen_track
1494 {
1495 *cur_publish_id != publish_id
1496 } else {
1497 true
1498 };
1499
1500 match publication {
1501 Ok((publication, stream)) => {
1502 if canceled {
1503 cx.spawn(async move |_, cx| {
1504 participant.unpublish_track(publication.sid(), cx).await
1505 })
1506 .detach()
1507 } else {
1508 live_kit.screen_track = LocalTrack::Published {
1509 track_publication: publication,
1510 _stream: stream,
1511 };
1512 cx.notify();
1513 }
1514
1515 Audio::play_sound(Sound::StartScreenshare, cx);
1516 Ok(())
1517 }
1518 Err(error) => {
1519 if canceled {
1520 Ok(())
1521 } else {
1522 live_kit.screen_track = LocalTrack::None;
1523 cx.notify();
1524 Err(error)
1525 }
1526 }
1527 }
1528 })?
1529 })
1530 }
1531
1532 #[cfg(target_os = "linux")]
1533 pub fn share_screen_wayland(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
1534 log::info!("will screenshare on wayland");
1535 if self.status.is_offline() {
1536 return Task::ready(Err(anyhow!("room is offline")));
1537 }
1538 if self.is_sharing_screen() {
1539 return Task::ready(Err(anyhow!("screen was already shared")));
1540 }
1541
1542 let (participant, publish_id) = if let Some(live_kit) = self.live_kit.as_mut() {
1543 let publish_id = post_inc(&mut live_kit.next_publish_id);
1544 live_kit.screen_track = LocalTrack::Pending { publish_id };
1545 cx.notify();
1546 (live_kit.room.local_participant(), publish_id)
1547 } else {
1548 return Task::ready(Err(anyhow!("live-kit was not initialized")));
1549 };
1550
1551 cx.spawn(async move |this, cx| {
1552 let publication = participant.publish_screenshare_track_wayland(cx).await;
1553
1554 this.update(cx, |this, cx| {
1555 let live_kit = this
1556 .live_kit
1557 .as_mut()
1558 .context("live-kit was not initialized")?;
1559
1560 let canceled = if let LocalTrack::Pending {
1561 publish_id: cur_publish_id,
1562 } = &live_kit.screen_track
1563 {
1564 *cur_publish_id != publish_id
1565 } else {
1566 true
1567 };
1568
1569 match publication {
1570 Ok((publication, stream, failure_rx)) => {
1571 if canceled {
1572 cx.spawn(async move |_, cx| {
1573 participant.unpublish_track(publication.sid(), cx).await
1574 })
1575 .detach()
1576 } else {
1577 cx.spawn(async move |this, cx| {
1578 if failure_rx.await.is_ok() {
1579 log::warn!("Wayland capture died, auto-unsharing screen");
1580 let _ =
1581 this.update(cx, |this, cx| this.unshare_screen(false, cx));
1582 }
1583 })
1584 .detach();
1585
1586 live_kit.screen_track = LocalTrack::Published {
1587 track_publication: publication,
1588 _stream: stream,
1589 };
1590 cx.notify();
1591 }
1592
1593 Audio::play_sound(Sound::StartScreenshare, cx);
1594 Ok(())
1595 }
1596 Err(error) => {
1597 if canceled {
1598 Ok(())
1599 } else {
1600 live_kit.screen_track = LocalTrack::None;
1601 cx.notify();
1602 Err(error)
1603 }
1604 }
1605 }
1606 })?
1607 })
1608 }
1609
1610 pub fn toggle_mute(&mut self, cx: &mut Context<Self>) {
1611 if let Some(live_kit) = self.live_kit.as_mut() {
1612 // When unmuting, undeafen if the user was deafened before.
1613 let was_deafened = live_kit.deafened;
1614 if live_kit.muted_by_user
1615 || live_kit.deafened
1616 || matches!(live_kit.microphone_track, LocalTrack::None)
1617 {
1618 live_kit.muted_by_user = false;
1619 live_kit.deafened = false;
1620 } else {
1621 live_kit.muted_by_user = true;
1622 }
1623 let muted = live_kit.muted_by_user;
1624 let should_undeafen = was_deafened && !live_kit.deafened;
1625
1626 if let Some(task) = self.set_mute(muted, cx) {
1627 task.detach_and_log_err(cx);
1628 }
1629
1630 if should_undeafen {
1631 self.set_deafened(false, cx);
1632 }
1633 }
1634 }
1635
1636 pub fn toggle_deafen(&mut self, cx: &mut Context<Self>) {
1637 if let Some(live_kit) = self.live_kit.as_mut() {
1638 // When deafening, mute the microphone if it was not already muted.
1639 // When un-deafening, unmute the microphone, unless it was explicitly muted.
1640 let deafened = !live_kit.deafened;
1641 live_kit.deafened = deafened;
1642 let should_change_mute = !live_kit.muted_by_user;
1643
1644 self.set_deafened(deafened, cx);
1645
1646 if should_change_mute && let Some(task) = self.set_mute(deafened, cx) {
1647 task.detach_and_log_err(cx);
1648 }
1649 }
1650 }
1651
1652 pub fn unshare_screen(&mut self, play_sound: bool, cx: &mut Context<Self>) -> Result<()> {
1653 anyhow::ensure!(!self.status.is_offline(), "room is offline");
1654
1655 let live_kit = self
1656 .live_kit
1657 .as_mut()
1658 .context("live-kit was not initialized")?;
1659 match mem::take(&mut live_kit.screen_track) {
1660 LocalTrack::None => anyhow::bail!("screen was not shared"),
1661 LocalTrack::Pending { .. } => {
1662 cx.notify();
1663 Ok(())
1664 }
1665 LocalTrack::Published {
1666 track_publication, ..
1667 } => {
1668 {
1669 let local_participant = live_kit.room.local_participant();
1670 let sid = track_publication.sid();
1671 cx.spawn(async move |_, cx| local_participant.unpublish_track(sid, cx).await)
1672 .detach_and_log_err(cx);
1673 cx.notify();
1674 }
1675
1676 if play_sound {
1677 Audio::play_sound(Sound::StopScreenshare, cx);
1678 }
1679
1680 Ok(())
1681 }
1682 }
1683 }
1684
1685 fn set_deafened(&mut self, deafened: bool, cx: &mut Context<Self>) -> Option<()> {
1686 {
1687 let live_kit = self.live_kit.as_mut()?;
1688 cx.notify();
1689 for (_, participant) in live_kit.room.remote_participants() {
1690 for (_, publication) in participant.track_publications() {
1691 if publication.is_audio() {
1692 publication.set_enabled(!deafened, cx);
1693 }
1694 }
1695 }
1696 }
1697
1698 None
1699 }
1700
1701 fn set_mute(&mut self, should_mute: bool, cx: &mut Context<Room>) -> Option<Task<Result<()>>> {
1702 let live_kit = self.live_kit.as_mut()?;
1703 cx.notify();
1704
1705 if should_mute {
1706 Audio::play_sound(Sound::Mute, cx);
1707 } else {
1708 Audio::play_sound(Sound::Unmute, cx);
1709 }
1710
1711 match &mut live_kit.microphone_track {
1712 LocalTrack::None => {
1713 if should_mute {
1714 None
1715 } else {
1716 Some(self.share_microphone(cx))
1717 }
1718 }
1719 LocalTrack::Pending { .. } => None,
1720 LocalTrack::Published {
1721 track_publication, ..
1722 } => {
1723 let guard = Tokio::handle(cx);
1724 if should_mute {
1725 track_publication.mute(cx)
1726 } else {
1727 track_publication.unmute(cx)
1728 }
1729 drop(guard);
1730
1731 None
1732 }
1733 }
1734 }
1735}
1736
/// Connects to LiveKit using `livekit_connection_info` and installs the
/// resulting state on the room.
///
/// Does nothing when no connection info is given. Otherwise a detached
/// task (errors are logged) connects to the server, starts a task that
/// feeds every LiveKit event into `Room::livekit_room_updated`, stores a
/// fresh `LiveKitRoom` in `live_kit`, creates the call diagnostics entity,
/// and finally publishes the microphone if the local role allows it.
fn spawn_room_connection(
    livekit_connection_info: Option<proto::LiveKitConnectionInfo>,
    cx: &mut Context<Room>,
) {
    if let Some(connection_info) = livekit_connection_info {
        cx.spawn(async move |this, cx| {
            let (room, mut events) =
                livekit::Room::connect(connection_info.server_url, connection_info.token, cx)
                    .await?;

            // Cloned up front because `this` is shadowed inside the update
            // closure below; the clone is handed to the diagnostics entity.
            let weak_room = this.clone();
            this.update(cx, |this, cx| {
                // Event pump: stops when the stream ends or the room entity
                // can no longer be updated. Handler errors are only logged.
                let _handle_updates = cx.spawn(async move |this, cx| {
                    while let Some(event) = events.next().await {
                        if this
                            .update(cx, |this, cx| {
                                this.livekit_room_updated(event, cx).warn_on_err();
                            })
                            .is_err()
                        {
                            break;
                        }
                    }
                });

                let muted_by_user = Room::mute_on_join(cx);
                this.live_kit = Some(LiveKitRoom {
                    room: Rc::new(room),
                    screen_track: LocalTrack::None,
                    microphone_track: LocalTrack::None,
                    input_lag_us: None,
                    next_publish_id: 0,
                    muted_by_user,
                    deafened: false,
                    speaking: false,
                    _handle_updates,
                });
                this.diagnostics = Some(cx.new(|cx| CallDiagnostics::new(weak_room, cx)));

                // Always open the microphone track on join, even when
                // `muted_by_user` is set. Note that the microphone will still
                // be muted, as it is still gated in `share_microphone` by
                // `muted_by_user`. For users that have `mute_on_join` enabled,
                // this moves the Bluetooth profile switch (A2DP -> HFP) (which
                // can cause 1-2 seconds of audio silence on some Bluetooth
                // headphones) from first unmute to channel join, where
                // instability is expected.
                if this.can_use_microphone() {
                    this.share_microphone(cx)
                } else {
                    Task::ready(Ok(()))
                }
            })?
            .await
        })
        .detach_and_log_err(cx);
    }
}
1795
/// State of the LiveKit connection backing a `Room`: the connected room
/// handle, the local participant's tracks, and the mute/deafen flags.
struct LiveKitRoom {
    /// Handle to the connected LiveKit room.
    room: Rc<livekit::Room>,
    /// Publication state of the local screen-share track.
    screen_track: LocalTrack<dyn ScreenCaptureStream>,
    /// Publication state of the local microphone track.
    microphone_track: LocalTrack<AudioStream>,
    /// Shared atomic storing the most recent input lag measurement in microseconds.
    /// Written by the audio capture/transmit pipeline, read here for diagnostics.
    input_lag_us: Option<Arc<AtomicU64>>,
    /// Tracks whether we're currently in a muted state due to auto-mute from deafening or manual mute performed by user.
    muted_by_user: bool,
    /// Whether the local user is deafened; `Room::set_deafened` disables
    /// remote audio publications while this is set.
    deafened: bool,
    /// Whether the local user appeared in the most recent active-speakers
    /// update.
    speaking: bool,
    /// Counter handed out (via `post_inc`) to tag each pending publish, so a
    /// completed publish can tell whether it has been superseded.
    next_publish_id: usize,
    /// Task that forwards LiveKit room events to the `Room`; owned here so
    /// it lives alongside the connection state.
    _handle_updates: Task<()>,
}
1810
1811impl LiveKitRoom {
1812 fn stop_publishing(&mut self, cx: &mut Context<Room>) {
1813 let mut tracks_to_unpublish = Vec::new();
1814 if let LocalTrack::Published {
1815 track_publication, ..
1816 } = mem::replace(&mut self.microphone_track, LocalTrack::None)
1817 {
1818 tracks_to_unpublish.push(track_publication.sid());
1819 self.input_lag_us = None;
1820 cx.notify();
1821 }
1822
1823 if let LocalTrack::Published {
1824 track_publication, ..
1825 } = mem::replace(&mut self.screen_track, LocalTrack::None)
1826 {
1827 tracks_to_unpublish.push(track_publication.sid());
1828 cx.notify();
1829 }
1830
1831 let participant = self.room.local_participant();
1832 cx.spawn(async move |_, cx| {
1833 for sid in tracks_to_unpublish {
1834 participant.unpublish_track(sid, cx).await.log_err();
1835 }
1836 })
1837 .detach();
1838 }
1839}
1840
/// Publication state of one of the local participant's tracks.
///
/// `Stream` is the capture stream type kept alive alongside a published
/// track; it may be unsized (e.g. a trait object).
#[derive(Default)]
enum LocalTrack<Stream: ?Sized> {
    /// No track is published or being published.
    #[default]
    None,
    /// A publish is in flight.
    Pending {
        /// Id assigned when the publish started; compared on completion to
        /// detect that the publish was cancelled or superseded.
        publish_id: usize,
    },
    /// The track is published.
    Published {
        /// The LiveKit publication for the track.
        track_publication: LocalTrackPublication,
        /// The capture stream, held for as long as the track is published.
        _stream: Box<Stream>,
    },
}
1853
/// Connection status of a `Room`.
#[derive(Copy, Clone, PartialEq, Eq)]
pub enum RoomStatus {
    Online,
    Rejoining,
    Offline,
}

impl RoomStatus {
    /// Returns `true` only for [`RoomStatus::Offline`].
    pub fn is_offline(&self) -> bool {
        *self == Self::Offline
    }

    /// Returns `true` only for [`RoomStatus::Online`]; a rejoining room is
    /// not considered online.
    pub fn is_online(&self) -> bool {
        *self == Self::Online
    }
}