1use crate::{
2 call_settings::CallSettings,
3 participant::{LocalParticipant, ParticipantLocation, RemoteParticipant},
4};
5use anyhow::{anyhow, Result};
6use audio::{Audio, Sound};
7use client::{
8 proto::{self, PeerId},
9 Client, ParticipantIndex, TypedEnvelope, User, UserStore,
10};
11use collections::{BTreeMap, HashMap, HashSet};
12use fs::Fs;
13use futures::{FutureExt, StreamExt};
14use gpui::{
15 AppContext, AsyncAppContext, Context, EventEmitter, Model, ModelContext, Task, WeakModel,
16};
17use language::LanguageRegistry;
18use live_kit_client::{LocalAudioTrack, LocalTrackPublication, LocalVideoTrack, RoomUpdate};
19use postage::{sink::Sink, stream::Stream, watch};
20use project::Project;
21use settings::Settings as _;
22use std::{future::Future, mem, sync::Arc, time::Duration};
23use util::{post_inc, ResultExt, TryFutureExt};
24
/// Upper bound on how long `Room::maintain_connection` waits for the client to
/// reconnect and rejoin the room before giving up and leaving it.
pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(30);
26
/// Events emitted by a [`Room`] to its observers.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum Event {
    /// A remote participant's location or role differs from what was previously recorded.
    ParticipantLocationChanged {
        participant_id: proto::PeerId,
    },
    /// A remote video track was subscribed to or unsubscribed from for this participant.
    RemoteVideoTracksChanged {
        participant_id: proto::PeerId,
    },
    /// A remote audio track was subscribed to or unsubscribed from for this participant.
    RemoteAudioTracksChanged {
        participant_id: proto::PeerId,
    },
    /// A remote participant is advertising a project that was not in their previous project list.
    RemoteProjectShared {
        owner: Arc<User>,
        project_id: u64,
        worktree_root_names: Vec<String>,
    },
    /// A remote participant's project disappeared from their project list, or the
    /// participant itself is no longer in the room.
    RemoteProjectUnshared {
        project_id: u64,
    },
    /// Emitted by `Room::join_project` when joining the given remote project is requested.
    RemoteProjectJoined {
        project_id: u64,
    },
    // NOTE(review): not emitted anywhere in this part of the file; presumably raised by
    // UI code when a project invitation is dismissed — confirm against callers.
    RemoteProjectInvitationDiscarded {
        project_id: u64,
    },
    /// Emitted by `Room::leave` when the local user leaves the room.
    Left,
}
54
/// Client-side state of a call room: its participants, the projects shared in it,
/// and the background tasks that keep it in sync with the server.
pub struct Room {
    // Room id as reported by the server.
    id: u64,
    // Channel id taken from the join response; `None` for rooms made via `Room::create`.
    channel_id: Option<u64>,
    // LiveKit state; `None` when no connection info was supplied or after `clear_state`.
    live_kit: Option<LiveKitRoom>,
    status: RoomStatus,
    // Local projects shared into this room (inserted by `share_project`).
    shared_projects: HashSet<WeakModel<Project>>,
    // Remote projects opened through `join_project`.
    joined_projects: HashSet<WeakModel<Project>>,
    local_participant: LocalParticipant,
    // Remote participants keyed by user id.
    remote_participants: BTreeMap<u64, RemoteParticipant>,
    // Users listed as pending by the latest room update.
    pending_participants: Vec<Arc<User>>,
    // User ids of both remote and pending participants (see `check_invariants`).
    participant_user_ids: HashSet<u64>,
    // Number of `call` requests currently awaiting a response.
    pending_call_count: usize,
    // When true, `should_leave` may report that an empty room should be left.
    leave_when_empty: bool,
    client: Arc<Client>,
    user_store: Model<UserStore>,
    // Followers of each (leader, project) pair, rebuilt on every room update.
    follows_by_leader_id_project_id: HashMap<(PeerId, u64), Vec<PeerId>>,
    // Holds the `RoomUpdated` message handler registration; cleared in `clear_state`.
    client_subscriptions: Vec<client::Subscription>,
    // Release and app-quit hooks registered in `new`.
    _subscriptions: Vec<gpui::Subscription>,
    // Signalled with `Some(())` each time a room update has been fully applied.
    room_update_completed_tx: watch::Sender<Option<()>>,
    room_update_completed_rx: watch::Receiver<Option<()>>,
    // Task applying the most recent room update, if it has not finished yet.
    pending_room_update: Option<Task<()>>,
    // Task running `Room::maintain_connection`; taken in `clear_state`.
    maintain_connection: Option<Task<Option<()>>>,
}
78
// Allows `Room` to emit `Event` values through `cx.emit`.
impl EventEmitter<Event> for Room {}
80
81impl Room {
    /// Returns the id of the channel associated with this room, if any.
    pub fn channel_id(&self) -> Option<u64> {
        self.channel_id
    }
85
    /// Returns `true` if at least one local project is currently tracked as shared in this room.
    pub fn is_sharing_project(&self) -> bool {
        !self.shared_projects.is_empty()
    }
89
90 #[cfg(any(test, feature = "test-support"))]
91 pub fn is_connected(&self) -> bool {
92 if let Some(live_kit) = self.live_kit.as_ref() {
93 matches!(
94 *live_kit.room.status().borrow(),
95 live_kit_client::ConnectionState::Connected { .. }
96 )
97 } else {
98 false
99 }
100 }
101
    /// Builds the local state for a room and starts its background tasks.
    ///
    /// When `live_kit_connection_info` is provided, a LiveKit room is created and three
    /// tasks are spawned: one that leaves this room once LiveKit reports `Disconnected`,
    /// one that forwards LiveKit updates to `live_kit_room_updated`, and one that connects
    /// and then shares the microphone unless `mute_on_join` is set or the room is read-only.
    ///
    /// Regardless of LiveKit, this also spawns `maintain_connection`, plays the "joined"
    /// sound, and registers the `RoomUpdated` message handler plus release/app-quit hooks.
    fn new(
        id: u64,
        channel_id: Option<u64>,
        live_kit_connection_info: Option<proto::LiveKitConnectionInfo>,
        client: Arc<Client>,
        user_store: Model<UserStore>,
        cx: &mut ModelContext<Self>,
    ) -> Self {
        let live_kit_room = if let Some(connection_info) = live_kit_connection_info {
            let room = live_kit_client::Room::new();
            let mut status = room.status();
            // Consume the initial status of the room.
            let _ = status.try_recv();
            // Leave the room as soon as LiveKit reports a disconnection; stop watching
            // if the room model itself has been dropped.
            let _maintain_room = cx.spawn(|this, mut cx| async move {
                while let Some(status) = status.next().await {
                    let this = if let Some(this) = this.upgrade() {
                        this
                    } else {
                        break;
                    };

                    if status == live_kit_client::ConnectionState::Disconnected {
                        this.update(&mut cx, |this, cx| this.leave(cx).log_err())
                            .ok();
                        break;
                    }
                }
            });

            // Feed every LiveKit update into `live_kit_room_updated`, logging failures.
            let _handle_updates = cx.spawn({
                let room = room.clone();
                move |this, mut cx| async move {
                    let mut updates = room.updates();
                    while let Some(update) = updates.next().await {
                        let this = if let Some(this) = this.upgrade() {
                            this
                        } else {
                            break;
                        };

                        this.update(&mut cx, |this, cx| {
                            this.live_kit_room_updated(update, cx).log_err()
                        })
                        .ok();
                    }
                }
            });

            let connect = room.connect(&connection_info.server_url, &connection_info.token);
            cx.spawn(|this, mut cx| async move {
                connect.await?;

                // If the room can no longer be updated, treat it as read-only so the
                // microphone is not shared.
                let is_read_only = this
                    .update(&mut cx, |room, _| room.read_only())
                    .unwrap_or(true);

                if !cx.update(|cx| Self::mute_on_join(cx))? && !is_read_only {
                    this.update(&mut cx, |this, cx| this.share_microphone(cx))?
                        .await?;
                }

                anyhow::Ok(())
            })
            .detach_and_log_err(cx);

            Some(LiveKitRoom {
                room,
                screen_track: LocalTrack::None,
                microphone_track: LocalTrack::None,
                next_publish_id: 0,
                muted_by_user: false,
                deafened: false,
                speaking: false,
                _maintain_room,
                _handle_updates,
            })
        } else {
            None
        };

        let maintain_connection = cx.spawn({
            let client = client.clone();
            move |this, cx| Self::maintain_connection(this, client.clone(), cx).log_err()
        });

        Audio::play_sound(Sound::Joined, cx);

        let (room_update_completed_tx, room_update_completed_rx) = watch::channel();

        Self {
            id,
            channel_id,
            live_kit: live_kit_room,
            status: RoomStatus::Online,
            shared_projects: Default::default(),
            joined_projects: Default::default(),
            participant_user_ids: Default::default(),
            local_participant: Default::default(),
            remote_participants: Default::default(),
            pending_participants: Default::default(),
            pending_call_count: 0,
            client_subscriptions: vec![
                client.add_message_handler(cx.weak_model(), Self::handle_room_updated)
            ],
            _subscriptions: vec![
                cx.on_release(Self::released),
                cx.on_app_quit(Self::app_will_quit),
            ],
            leave_when_empty: false,
            pending_room_update: None,
            client,
            user_store,
            follows_by_leader_id_project_id: Default::default(),
            maintain_connection: Some(maintain_connection),
            room_update_completed_tx,
            room_update_completed_rx,
        }
    }
220
221 pub(crate) fn create(
222 called_user_id: u64,
223 initial_project: Option<Model<Project>>,
224 client: Arc<Client>,
225 user_store: Model<UserStore>,
226 cx: &mut AppContext,
227 ) -> Task<Result<Model<Self>>> {
228 cx.spawn(move |mut cx| async move {
229 let response = client.request(proto::CreateRoom {}).await?;
230 let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
231 let room = cx.new_model(|cx| {
232 let mut room = Self::new(
233 room_proto.id,
234 None,
235 response.live_kit_connection_info,
236 client,
237 user_store,
238 cx,
239 );
240 if let Some(participant) = room_proto.participants.first() {
241 room.local_participant.role = participant.role()
242 }
243 room
244 })?;
245
246 let initial_project_id = if let Some(initial_project) = initial_project {
247 let initial_project_id = room
248 .update(&mut cx, |room, cx| {
249 room.share_project(initial_project.clone(), cx)
250 })?
251 .await?;
252 Some(initial_project_id)
253 } else {
254 None
255 };
256
257 match room
258 .update(&mut cx, |room, cx| {
259 room.leave_when_empty = true;
260 room.call(called_user_id, initial_project_id, cx)
261 })?
262 .await
263 {
264 Ok(()) => Ok(room),
265 Err(error) => Err(anyhow!("room creation failed: {:?}", error)),
266 }
267 })
268 }
269
270 pub(crate) async fn join_channel(
271 channel_id: u64,
272 client: Arc<Client>,
273 user_store: Model<UserStore>,
274 cx: AsyncAppContext,
275 ) -> Result<Model<Self>> {
276 Self::from_join_response(
277 client.request(proto::JoinChannel { channel_id }).await?,
278 client,
279 user_store,
280 cx,
281 )
282 }
283
284 pub(crate) async fn join(
285 room_id: u64,
286 client: Arc<Client>,
287 user_store: Model<UserStore>,
288 cx: AsyncAppContext,
289 ) -> Result<Model<Self>> {
290 Self::from_join_response(
291 client.request(proto::JoinRoom { id: room_id }).await?,
292 client,
293 user_store,
294 cx,
295 )
296 }
297
298 fn released(&mut self, cx: &mut AppContext) {
299 if self.status.is_online() {
300 self.leave_internal(cx).detach_and_log_err(cx);
301 }
302 }
303
304 fn app_will_quit(&mut self, cx: &mut ModelContext<Self>) -> impl Future<Output = ()> {
305 let task = if self.status.is_online() {
306 let leave = self.leave_internal(cx);
307 Some(cx.background_executor().spawn(async move {
308 leave.await.log_err();
309 }))
310 } else {
311 None
312 };
313
314 async move {
315 if let Some(task) = task {
316 task.await;
317 }
318 }
319 }
320
321 pub fn mute_on_join(cx: &AppContext) -> bool {
322 CallSettings::get_global(cx).mute_on_join || client::IMPERSONATE_LOGIN.is_some()
323 }
324
325 fn from_join_response(
326 response: proto::JoinRoomResponse,
327 client: Arc<Client>,
328 user_store: Model<UserStore>,
329 mut cx: AsyncAppContext,
330 ) -> Result<Model<Self>> {
331 let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
332 let room = cx.new_model(|cx| {
333 Self::new(
334 room_proto.id,
335 response.channel_id,
336 response.live_kit_connection_info,
337 client,
338 user_store,
339 cx,
340 )
341 })?;
342 room.update(&mut cx, |room, cx| {
343 room.leave_when_empty = room.channel_id.is_none();
344 room.apply_room_update(room_proto, cx)?;
345 anyhow::Ok(())
346 })??;
347 Ok(room)
348 }
349
350 fn should_leave(&self) -> bool {
351 self.leave_when_empty
352 && self.pending_room_update.is_none()
353 && self.pending_participants.is_empty()
354 && self.remote_participants.is_empty()
355 && self.pending_call_count == 0
356 }
357
    /// Leaves the room: notifies observers, emits [`Event::Left`], then performs the
    /// actual teardown via `leave_internal`.
    ///
    /// The returned task resolves once the server has answered the leave request; it
    /// resolves to an error immediately if the room was already offline.
    pub(crate) fn leave(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
        cx.notify();
        cx.emit(Event::Left);
        self.leave_internal(cx)
    }
363
364 fn leave_internal(&mut self, cx: &mut AppContext) -> Task<Result<()>> {
365 if self.status.is_offline() {
366 return Task::ready(Err(anyhow!("room is offline")));
367 }
368
369 log::info!("leaving room");
370 Audio::play_sound(Sound::Leave, cx);
371
372 self.clear_state(cx);
373
374 let leave_room = self.client.request(proto::LeaveRoom {});
375 cx.background_executor().spawn(async move {
376 leave_room.await?;
377 anyhow::Ok(())
378 })
379 }
380
381 pub(crate) fn clear_state(&mut self, cx: &mut AppContext) {
382 for project in self.shared_projects.drain() {
383 if let Some(project) = project.upgrade() {
384 project.update(cx, |project, cx| {
385 project.unshare(cx).log_err();
386 });
387 }
388 }
389 for project in self.joined_projects.drain() {
390 if let Some(project) = project.upgrade() {
391 project.update(cx, |project, cx| {
392 project.disconnected_from_host(cx);
393 project.close(cx);
394 });
395 }
396 }
397
398 self.status = RoomStatus::Offline;
399 self.remote_participants.clear();
400 self.pending_participants.clear();
401 self.participant_user_ids.clear();
402 self.client_subscriptions.clear();
403 self.live_kit.take();
404 self.pending_room_update.take();
405 self.maintain_connection.take();
406 }
407
    /// Watches the client's connection status and tries to rejoin the room after a
    /// disconnection.
    ///
    /// On a detected disconnection the room is marked `Rejoining`, then up to three
    /// rejoin attempts are raced against `RECONNECT_TIMEOUT`. A successful rejoin resumes
    /// watching; otherwise the room is left.
    ///
    /// # Errors
    ///
    /// This function only ever returns `Err`: when the room model was dropped, when
    /// updating it fails, or after reconnection failed and the room was left.
    async fn maintain_connection(
        this: WeakModel<Self>,
        client: Arc<Client>,
        mut cx: AsyncAppContext,
    ) -> Result<()> {
        let mut client_status = client.status();
        loop {
            let _ = client_status.try_recv();
            let is_connected = client_status.borrow().is_connected();
            // Even if we're initially connected, any future change of the status means we momentarily disconnected.
            if !is_connected || client_status.next().await.is_some() {
                log::info!("detected client disconnection");

                this.upgrade()
                    .ok_or_else(|| anyhow!("room was dropped"))?
                    .update(&mut cx, |this, cx| {
                        this.status = RoomStatus::Rejoining;
                        cx.notify();
                    })?;

                // Wait for client to re-establish a connection to the server.
                {
                    let mut reconnection_timeout =
                        cx.background_executor().timer(RECONNECT_TIMEOUT).fuse();
                    // Resolves to `true` once a rejoin succeeds, and to `false` when the
                    // attempts are exhausted, the client signs out, or the room/app is gone.
                    let client_reconnection = async {
                        let mut remaining_attempts = 3;
                        while remaining_attempts > 0 {
                            if client_status.borrow().is_connected() {
                                log::info!("client reconnected, attempting to rejoin room");

                                let Some(this) = this.upgrade() else { break };
                                match this.update(&mut cx, |this, cx| this.rejoin(cx)) {
                                    Ok(task) => {
                                        if task.await.log_err().is_some() {
                                            return true;
                                        } else {
                                            remaining_attempts -= 1;
                                        }
                                    }
                                    Err(_app_dropped) => return false,
                                }
                            } else if client_status.borrow().is_signed_out() {
                                return false;
                            }

                            log::info!(
                                "waiting for client status change, remaining attempts {}",
                                remaining_attempts
                            );
                            client_status.next().await;
                        }
                        false
                    }
                    .fuse();
                    futures::pin_mut!(client_reconnection);

                    // `select_biased!` polls the reconnection future before the timeout.
                    futures::select_biased! {
                        reconnected = client_reconnection => {
                            if reconnected {
                                log::info!("successfully reconnected to room");
                                // If we successfully joined the room, go back around the loop
                                // waiting for future connection status changes.
                                continue;
                            }
                        }
                        _ = reconnection_timeout => {
                            log::info!("room reconnection timeout expired");
                        }
                    }
                }

                break;
            }
        }

        // The client failed to re-establish a connection to the server
        // or an error occurred while trying to re-join the room. Either way
        // we leave the room and return an error.
        if let Some(this) = this.upgrade() {
            log::info!("reconnection failed, leaving room");
            let _ = this.update(&mut cx, |this, cx| this.leave(cx))?;
        }
        Err(anyhow!(
            "can't reconnect to room: client failed to re-establish connection"
        ))
    }
494
495 fn rejoin(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
496 let mut projects = HashMap::default();
497 let mut reshared_projects = Vec::new();
498 let mut rejoined_projects = Vec::new();
499 self.shared_projects.retain(|project| {
500 if let Some(handle) = project.upgrade() {
501 let project = handle.read(cx);
502 if let Some(project_id) = project.remote_id() {
503 projects.insert(project_id, handle.clone());
504 reshared_projects.push(proto::UpdateProject {
505 project_id,
506 worktrees: project.worktree_metadata_protos(cx),
507 });
508 return true;
509 }
510 }
511 false
512 });
513 self.joined_projects.retain(|project| {
514 if let Some(handle) = project.upgrade() {
515 let project = handle.read(cx);
516 if let Some(project_id) = project.remote_id() {
517 projects.insert(project_id, handle.clone());
518 rejoined_projects.push(proto::RejoinProject {
519 id: project_id,
520 worktrees: project
521 .worktrees()
522 .map(|worktree| {
523 let worktree = worktree.read(cx);
524 proto::RejoinWorktree {
525 id: worktree.id().to_proto(),
526 scan_id: worktree.completed_scan_id() as u64,
527 }
528 })
529 .collect(),
530 });
531 }
532 return true;
533 }
534 false
535 });
536
537 let response = self.client.request_envelope(proto::RejoinRoom {
538 id: self.id,
539 reshared_projects,
540 rejoined_projects,
541 });
542
543 cx.spawn(|this, mut cx| async move {
544 let response = response.await?;
545 let message_id = response.message_id;
546 let response = response.payload;
547 let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
548 this.update(&mut cx, |this, cx| {
549 this.status = RoomStatus::Online;
550 this.apply_room_update(room_proto, cx)?;
551
552 for reshared_project in response.reshared_projects {
553 if let Some(project) = projects.get(&reshared_project.id) {
554 project.update(cx, |project, cx| {
555 project.reshared(reshared_project, cx).log_err();
556 });
557 }
558 }
559
560 for rejoined_project in response.rejoined_projects {
561 if let Some(project) = projects.get(&rejoined_project.id) {
562 project.update(cx, |project, cx| {
563 project.rejoined(rejoined_project, message_id, cx).log_err();
564 });
565 }
566 }
567
568 anyhow::Ok(())
569 })?
570 })
571 }
572
    /// Returns this room's id.
    pub fn id(&self) -> u64 {
        self.id
    }
576
    /// Returns the room's current connection status.
    pub fn status(&self) -> RoomStatus {
        self.status
    }
580
    /// Returns the state tracked for the local participant.
    pub fn local_participant(&self) -> &LocalParticipant {
        &self.local_participant
    }
584
    /// Returns the remote participants currently in the room, keyed by user id.
    pub fn remote_participants(&self) -> &BTreeMap<u64, RemoteParticipant> {
        &self.remote_participants
    }
588
589 pub fn remote_participant_for_peer_id(&self, peer_id: PeerId) -> Option<&RemoteParticipant> {
590 self.remote_participants
591 .values()
592 .find(|p| p.peer_id == peer_id)
593 }
594
595 pub fn role_for_user(&self, user_id: u64) -> Option<proto::ChannelRole> {
596 self.remote_participants
597 .get(&user_id)
598 .map(|participant| participant.role)
599 }
600
601 pub fn local_participant_is_admin(&self) -> bool {
602 self.local_participant.role == proto::ChannelRole::Admin
603 }
604
605 pub fn set_participant_role(
606 &mut self,
607 user_id: u64,
608 role: proto::ChannelRole,
609 cx: &ModelContext<Self>,
610 ) -> Task<Result<()>> {
611 let client = self.client.clone();
612 let room_id = self.id;
613 let role = role.into();
614 cx.spawn(|_, _| async move {
615 client
616 .request(proto::SetRoomParticipantRole {
617 room_id,
618 user_id,
619 role,
620 })
621 .await
622 .map(|_| ())
623 })
624 }
625
    /// Returns the users reported as pending participants by the latest room update.
    pub fn pending_participants(&self) -> &[Arc<User>] {
        &self.pending_participants
    }
629
    /// Returns `true` if `user_id` is among the tracked participant user ids
    /// (remote or pending participants).
    pub fn contains_participant(&self, user_id: u64) -> bool {
        self.participant_user_ids.contains(&user_id)
    }
633
634 pub fn followers_for(&self, leader_id: PeerId, project_id: u64) -> &[PeerId] {
635 self.follows_by_leader_id_project_id
636 .get(&(leader_id, project_id))
637 .map_or(&[], |v| v.as_slice())
638 }
639
640 /// Returns the most 'active' projects, defined as most people in the project
641 pub fn most_active_project(&self, cx: &AppContext) -> Option<(u64, u64)> {
642 let mut project_hosts_and_guest_counts = HashMap::<u64, (Option<u64>, u32)>::default();
643 for participant in self.remote_participants.values() {
644 match participant.location {
645 ParticipantLocation::SharedProject { project_id } => {
646 project_hosts_and_guest_counts
647 .entry(project_id)
648 .or_default()
649 .1 += 1;
650 }
651 ParticipantLocation::External | ParticipantLocation::UnsharedProject => {}
652 }
653 for project in &participant.projects {
654 project_hosts_and_guest_counts
655 .entry(project.id)
656 .or_default()
657 .0 = Some(participant.user.id);
658 }
659 }
660
661 if let Some(user) = self.user_store.read(cx).current_user() {
662 for project in &self.local_participant.projects {
663 project_hosts_and_guest_counts
664 .entry(project.id)
665 .or_default()
666 .0 = Some(user.id);
667 }
668 }
669
670 project_hosts_and_guest_counts
671 .into_iter()
672 .filter_map(|(id, (host, guest_count))| Some((id, host?, guest_count)))
673 .max_by_key(|(_, _, guest_count)| *guest_count)
674 .map(|(id, host, _)| (id, host))
675 }
676
677 async fn handle_room_updated(
678 this: Model<Self>,
679 envelope: TypedEnvelope<proto::RoomUpdated>,
680 _: Arc<Client>,
681 mut cx: AsyncAppContext,
682 ) -> Result<()> {
683 let room = envelope
684 .payload
685 .room
686 .ok_or_else(|| anyhow!("invalid room"))?;
687 this.update(&mut cx, |this, cx| this.apply_room_update(room, cx))?
688 }
689
    /// Reconciles local state with the room description received from the server.
    ///
    /// User records for the remote and pending participants are requested from the
    /// user store first; the reconciliation itself runs in a spawned task stored in
    /// `pending_room_update`. That task updates the local participant's role and
    /// projects, adds/updates/removes remote participants (emitting the corresponding
    /// events), replaces the pending-participant list, rebuilds the follower map,
    /// leaves the room if `should_leave` holds, and finally signals
    /// `room_update_completed_tx`.
    ///
    /// The function itself always returns `Ok(())`; failures inside the task are logged.
    fn apply_room_update(
        &mut self,
        mut room: proto::Room,
        cx: &mut ModelContext<Self>,
    ) -> Result<()> {
        // Filter ourselves out from the room's participants.
        let local_participant_ix = room
            .participants
            .iter()
            .position(|participant| Some(participant.user_id) == self.client.user_id());
        let local_participant = local_participant_ix.map(|ix| room.participants.swap_remove(ix));

        let pending_participant_user_ids = room
            .pending_participants
            .iter()
            .map(|p| p.user_id)
            .collect::<Vec<_>>();

        // Collected after the `swap_remove` above, so the ids are in the same order as
        // `room.participants` when the two are zipped below.
        let remote_participant_user_ids = room
            .participants
            .iter()
            .map(|p| p.user_id)
            .collect::<Vec<_>>();

        let (remote_participants, pending_participants) =
            self.user_store.update(cx, move |user_store, cx| {
                (
                    user_store.get_users(remote_participant_user_ids, cx),
                    user_store.get_users(pending_participant_user_ids, cx),
                )
            });

        // Storing the new task replaces any earlier, still-pending update task.
        self.pending_room_update = Some(cx.spawn(|this, mut cx| async move {
            let (remote_participants, pending_participants) =
                futures::join!(remote_participants, pending_participants);

            this.update(&mut cx, |this, cx| {
                this.participant_user_ids.clear();

                if let Some(participant) = local_participant {
                    let role = participant.role();
                    this.local_participant.projects = participant.projects;
                    if this.local_participant.role != role {
                        this.local_participant.role = role;

                        // Becoming a guest: unshare everything and stop publishing media.
                        if role == proto::ChannelRole::Guest {
                            for project in mem::take(&mut this.shared_projects) {
                                if let Some(project) = project.upgrade() {
                                    this.unshare_project(project, cx).log_err();
                                }
                            }
                            this.local_participant.projects.clear();
                            if let Some(live_kit_room) = &mut this.live_kit {
                                live_kit_room.stop_publishing(cx);
                            }
                        }

                        // Propagate the new role to joined projects, dropping dead handles.
                        this.joined_projects.retain(|project| {
                            if let Some(project) = project.upgrade() {
                                project.update(cx, |project, cx| project.set_role(role, cx));
                                true
                            } else {
                                false
                            }
                        });
                    }
                } else {
                    this.local_participant.projects.clear();
                }

                if let Some(participants) = remote_participants.log_err() {
                    // NOTE(review): assumes `get_users` yields users in the same order as
                    // the requested ids — confirm against `UserStore::get_users`.
                    for (participant, user) in room.participants.into_iter().zip(participants) {
                        let Some(peer_id) = participant.peer_id else {
                            continue;
                        };
                        let participant_index = ParticipantIndex(participant.participant_index);
                        this.participant_user_ids.insert(participant.user_id);

                        let old_projects = this
                            .remote_participants
                            .get(&participant.user_id)
                            .into_iter()
                            .flat_map(|existing| &existing.projects)
                            .map(|project| project.id)
                            .collect::<HashSet<_>>();
                        let new_projects = participant
                            .projects
                            .iter()
                            .map(|project| project.id)
                            .collect::<HashSet<_>>();

                        for project in &participant.projects {
                            if !old_projects.contains(&project.id) {
                                cx.emit(Event::RemoteProjectShared {
                                    owner: user.clone(),
                                    project_id: project.id,
                                    worktree_root_names: project.worktree_root_names.clone(),
                                });
                            }
                        }

                        // Projects this participant no longer shares: disconnect and forget
                        // any matching joined project, then announce the unshare.
                        for unshared_project_id in old_projects.difference(&new_projects) {
                            this.joined_projects.retain(|project| {
                                if let Some(project) = project.upgrade() {
                                    project.update(cx, |project, cx| {
                                        if project.remote_id() == Some(*unshared_project_id) {
                                            project.disconnected_from_host(cx);
                                            false
                                        } else {
                                            true
                                        }
                                    })
                                } else {
                                    false
                                }
                            });
                            cx.emit(Event::RemoteProjectUnshared {
                                project_id: *unshared_project_id,
                            });
                        }

                        let role = participant.role();
                        let location = ParticipantLocation::from_proto(participant.location)
                            .unwrap_or(ParticipantLocation::External);
                        if let Some(remote_participant) =
                            this.remote_participants.get_mut(&participant.user_id)
                        {
                            remote_participant.peer_id = peer_id;
                            remote_participant.projects = participant.projects;
                            remote_participant.participant_index = participant_index;
                            if location != remote_participant.location
                                || role != remote_participant.role
                            {
                                remote_participant.location = location;
                                remote_participant.role = role;
                                cx.emit(Event::ParticipantLocationChanged {
                                    participant_id: peer_id,
                                });
                            }
                        } else {
                            // Newly seen participant; recorded as muted until an audio
                            // track subscription reports otherwise.
                            this.remote_participants.insert(
                                participant.user_id,
                                RemoteParticipant {
                                    user: user.clone(),
                                    participant_index,
                                    peer_id,
                                    projects: participant.projects,
                                    location,
                                    role,
                                    muted: true,
                                    speaking: false,
                                    video_tracks: Default::default(),
                                    audio_tracks: Default::default(),
                                },
                            );

                            Audio::play_sound(Sound::Joined, cx);

                            // Replay tracks LiveKit already knows for this user so they
                            // get attached to the participant that was just inserted.
                            if let Some(live_kit) = this.live_kit.as_ref() {
                                let video_tracks =
                                    live_kit.room.remote_video_tracks(&user.id.to_string());
                                let audio_tracks =
                                    live_kit.room.remote_audio_tracks(&user.id.to_string());
                                let publications = live_kit
                                    .room
                                    .remote_audio_track_publications(&user.id.to_string());

                                for track in video_tracks {
                                    this.live_kit_room_updated(
                                        RoomUpdate::SubscribedToRemoteVideoTrack(track),
                                        cx,
                                    )
                                    .log_err();
                                }

                                for (track, publication) in
                                    audio_tracks.iter().zip(publications.iter())
                                {
                                    this.live_kit_room_updated(
                                        RoomUpdate::SubscribedToRemoteAudioTrack(
                                            track.clone(),
                                            publication.clone(),
                                        ),
                                        cx,
                                    )
                                    .log_err();
                                }
                            }
                        }
                    }

                    // Drop participants missing from this update and announce their
                    // projects as unshared.
                    this.remote_participants.retain(|user_id, participant| {
                        if this.participant_user_ids.contains(user_id) {
                            true
                        } else {
                            for project in &participant.projects {
                                cx.emit(Event::RemoteProjectUnshared {
                                    project_id: project.id,
                                });
                            }
                            false
                        }
                    });
                }

                if let Some(pending_participants) = pending_participants.log_err() {
                    this.pending_participants = pending_participants;
                    for participant in &this.pending_participants {
                        this.participant_user_ids.insert(participant.id);
                    }
                }

                // Rebuild the follower map from scratch, skipping incomplete entries
                // and duplicate followers.
                this.follows_by_leader_id_project_id.clear();
                for follower in room.followers {
                    let project_id = follower.project_id;
                    let (leader, follower) = match (follower.leader_id, follower.follower_id) {
                        (Some(leader), Some(follower)) => (leader, follower),

                        _ => {
                            log::error!("Follower message {follower:?} missing some state");
                            continue;
                        }
                    };

                    let list = this
                        .follows_by_leader_id_project_id
                        .entry((leader, project_id))
                        .or_insert(Vec::new());
                    if !list.contains(&follower) {
                        list.push(follower);
                    }
                }

                // Clear the pending marker first: `should_leave` requires it to be `None`.
                this.pending_room_update.take();
                if this.should_leave() {
                    log::info!("room is empty, leaving");
                    let _ = this.leave(cx);
                }

                this.user_store.update(cx, |user_store, cx| {
                    let participant_indices_by_user_id = this
                        .remote_participants
                        .iter()
                        .map(|(user_id, participant)| (*user_id, participant.participant_index))
                        .collect();
                    user_store.set_participant_indices(participant_indices_by_user_id, cx);
                });

                this.check_invariants();
                this.room_update_completed_tx.try_send(Some(())).ok();
                cx.notify();
            })
            .ok();
        }));

        cx.notify();
        Ok(())
    }
948
949 pub fn room_update_completed(&mut self) -> impl Future<Output = ()> {
950 let mut done_rx = self.room_update_completed_rx.clone();
951 async move {
952 while let Some(result) = done_rx.next().await {
953 if result.is_some() {
954 break;
955 }
956 }
957 }
958 }
959
960 fn live_kit_room_updated(
961 &mut self,
962 update: RoomUpdate,
963 cx: &mut ModelContext<Self>,
964 ) -> Result<()> {
965 match update {
966 RoomUpdate::SubscribedToRemoteVideoTrack(track) => {
967 let user_id = track.publisher_id().parse()?;
968 let track_id = track.sid().to_string();
969 let participant = self
970 .remote_participants
971 .get_mut(&user_id)
972 .ok_or_else(|| anyhow!("subscribed to track by unknown participant"))?;
973 participant.video_tracks.insert(track_id.clone(), track);
974 cx.emit(Event::RemoteVideoTracksChanged {
975 participant_id: participant.peer_id,
976 });
977 }
978
979 RoomUpdate::UnsubscribedFromRemoteVideoTrack {
980 publisher_id,
981 track_id,
982 } => {
983 let user_id = publisher_id.parse()?;
984 let participant = self
985 .remote_participants
986 .get_mut(&user_id)
987 .ok_or_else(|| anyhow!("unsubscribed from track by unknown participant"))?;
988 participant.video_tracks.remove(&track_id);
989 cx.emit(Event::RemoteVideoTracksChanged {
990 participant_id: participant.peer_id,
991 });
992 }
993
994 RoomUpdate::ActiveSpeakersChanged { speakers } => {
995 let mut speaker_ids = speakers
996 .into_iter()
997 .filter_map(|speaker_sid| speaker_sid.parse().ok())
998 .collect::<Vec<u64>>();
999 speaker_ids.sort_unstable();
1000 for (sid, participant) in &mut self.remote_participants {
1001 if let Ok(_) = speaker_ids.binary_search(sid) {
1002 participant.speaking = true;
1003 } else {
1004 participant.speaking = false;
1005 }
1006 }
1007 if let Some(id) = self.client.user_id() {
1008 if let Some(room) = &mut self.live_kit {
1009 if let Ok(_) = speaker_ids.binary_search(&id) {
1010 room.speaking = true;
1011 } else {
1012 room.speaking = false;
1013 }
1014 }
1015 }
1016 }
1017
1018 RoomUpdate::RemoteAudioTrackMuteChanged { track_id, muted } => {
1019 let mut found = false;
1020 for participant in &mut self.remote_participants.values_mut() {
1021 for track in participant.audio_tracks.values() {
1022 if track.sid() == track_id {
1023 found = true;
1024 break;
1025 }
1026 }
1027 if found {
1028 participant.muted = muted;
1029 break;
1030 }
1031 }
1032 }
1033
1034 RoomUpdate::SubscribedToRemoteAudioTrack(track, publication) => {
1035 let user_id = track.publisher_id().parse()?;
1036 let track_id = track.sid().to_string();
1037 let participant = self
1038 .remote_participants
1039 .get_mut(&user_id)
1040 .ok_or_else(|| anyhow!("subscribed to track by unknown participant"))?;
1041 participant.audio_tracks.insert(track_id.clone(), track);
1042 participant.muted = publication.is_muted();
1043
1044 cx.emit(Event::RemoteAudioTracksChanged {
1045 participant_id: participant.peer_id,
1046 });
1047 }
1048
1049 RoomUpdate::UnsubscribedFromRemoteAudioTrack {
1050 publisher_id,
1051 track_id,
1052 } => {
1053 let user_id = publisher_id.parse()?;
1054 let participant = self
1055 .remote_participants
1056 .get_mut(&user_id)
1057 .ok_or_else(|| anyhow!("unsubscribed from track by unknown participant"))?;
1058 participant.audio_tracks.remove(&track_id);
1059 cx.emit(Event::RemoteAudioTracksChanged {
1060 participant_id: participant.peer_id,
1061 });
1062 }
1063
1064 RoomUpdate::LocalAudioTrackUnpublished { publication } => {
1065 log::info!("unpublished audio track {}", publication.sid());
1066 if let Some(room) = &mut self.live_kit {
1067 room.microphone_track = LocalTrack::None;
1068 }
1069 }
1070
1071 RoomUpdate::LocalVideoTrackUnpublished { publication } => {
1072 log::info!("unpublished video track {}", publication.sid());
1073 if let Some(room) = &mut self.live_kit {
1074 room.screen_track = LocalTrack::None;
1075 }
1076 }
1077
1078 RoomUpdate::LocalAudioTrackPublished { publication } => {
1079 log::info!("published audio track {}", publication.sid());
1080 }
1081
1082 RoomUpdate::LocalVideoTrackPublished { publication } => {
1083 log::info!("published video track {}", publication.sid());
1084 }
1085 }
1086
1087 cx.notify();
1088 Ok(())
1089 }
1090
1091 fn check_invariants(&self) {
1092 #[cfg(any(test, feature = "test-support"))]
1093 {
1094 for participant in self.remote_participants.values() {
1095 assert!(self.participant_user_ids.contains(&participant.user.id));
1096 assert_ne!(participant.user.id, self.client.user_id().unwrap());
1097 }
1098
1099 for participant in &self.pending_participants {
1100 assert!(self.participant_user_ids.contains(&participant.id));
1101 assert_ne!(participant.id, self.client.user_id().unwrap());
1102 }
1103
1104 assert_eq!(
1105 self.participant_user_ids.len(),
1106 self.remote_participants.len() + self.pending_participants.len()
1107 );
1108 }
1109 }
1110
1111 pub(crate) fn call(
1112 &mut self,
1113 called_user_id: u64,
1114 initial_project_id: Option<u64>,
1115 cx: &mut ModelContext<Self>,
1116 ) -> Task<Result<()>> {
1117 if self.status.is_offline() {
1118 return Task::ready(Err(anyhow!("room is offline")));
1119 }
1120
1121 cx.notify();
1122 let client = self.client.clone();
1123 let room_id = self.id;
1124 self.pending_call_count += 1;
1125 cx.spawn(move |this, mut cx| async move {
1126 let result = client
1127 .request(proto::Call {
1128 room_id,
1129 called_user_id,
1130 initial_project_id,
1131 })
1132 .await;
1133 this.update(&mut cx, |this, cx| {
1134 this.pending_call_count -= 1;
1135 if this.should_leave() {
1136 this.leave(cx).detach_and_log_err(cx);
1137 }
1138 })?;
1139 result?;
1140 Ok(())
1141 })
1142 }
1143
1144 pub fn join_project(
1145 &mut self,
1146 id: u64,
1147 language_registry: Arc<LanguageRegistry>,
1148 fs: Arc<dyn Fs>,
1149 cx: &mut ModelContext<Self>,
1150 ) -> Task<Result<Model<Project>>> {
1151 let client = self.client.clone();
1152 let user_store = self.user_store.clone();
1153 let role = self.local_participant.role;
1154 cx.emit(Event::RemoteProjectJoined { project_id: id });
1155 cx.spawn(move |this, mut cx| async move {
1156 let project = Project::remote(
1157 id,
1158 client,
1159 user_store,
1160 language_registry,
1161 fs,
1162 role,
1163 cx.clone(),
1164 )
1165 .await?;
1166
1167 this.update(&mut cx, |this, cx| {
1168 this.joined_projects.retain(|project| {
1169 if let Some(project) = project.upgrade() {
1170 !project.read(cx).is_disconnected()
1171 } else {
1172 false
1173 }
1174 });
1175 this.joined_projects.insert(project.downgrade());
1176 })?;
1177 Ok(project)
1178 })
1179 }
1180
1181 pub(crate) fn share_project(
1182 &mut self,
1183 project: Model<Project>,
1184 cx: &mut ModelContext<Self>,
1185 ) -> Task<Result<u64>> {
1186 if let Some(project_id) = project.read(cx).remote_id() {
1187 return Task::ready(Ok(project_id));
1188 }
1189
1190 let request = self.client.request(proto::ShareProject {
1191 room_id: self.id(),
1192 worktrees: project.read(cx).worktree_metadata_protos(cx),
1193 });
1194 cx.spawn(|this, mut cx| async move {
1195 let response = request.await?;
1196
1197 project.update(&mut cx, |project, cx| {
1198 project.shared(response.project_id, cx)
1199 })??;
1200
1201 // If the user's location is in this project, it changes from UnsharedProject to SharedProject.
1202 this.update(&mut cx, |this, cx| {
1203 this.shared_projects.insert(project.downgrade());
1204 let active_project = this.local_participant.active_project.as_ref();
1205 if active_project.map_or(false, |location| *location == project) {
1206 this.set_location(Some(&project), cx)
1207 } else {
1208 Task::ready(Ok(()))
1209 }
1210 })?
1211 .await?;
1212
1213 Ok(response.project_id)
1214 })
1215 }
1216
1217 pub(crate) fn unshare_project(
1218 &mut self,
1219 project: Model<Project>,
1220 cx: &mut ModelContext<Self>,
1221 ) -> Result<()> {
1222 let project_id = match project.read(cx).remote_id() {
1223 Some(project_id) => project_id,
1224 None => return Ok(()),
1225 };
1226
1227 self.client.send(proto::UnshareProject { project_id })?;
1228 project.update(cx, |this, cx| this.unshare(cx))?;
1229
1230 if self.local_participant.active_project == Some(project.downgrade()) {
1231 self.set_location(Some(&project), cx).detach_and_log_err(cx);
1232 }
1233 Ok(())
1234 }
1235
1236 pub(crate) fn set_location(
1237 &mut self,
1238 project: Option<&Model<Project>>,
1239 cx: &mut ModelContext<Self>,
1240 ) -> Task<Result<()>> {
1241 if self.status.is_offline() {
1242 return Task::ready(Err(anyhow!("room is offline")));
1243 }
1244
1245 let client = self.client.clone();
1246 let room_id = self.id;
1247 let location = if let Some(project) = project {
1248 self.local_participant.active_project = Some(project.downgrade());
1249 if let Some(project_id) = project.read(cx).remote_id() {
1250 proto::participant_location::Variant::SharedProject(
1251 proto::participant_location::SharedProject { id: project_id },
1252 )
1253 } else {
1254 proto::participant_location::Variant::UnsharedProject(
1255 proto::participant_location::UnsharedProject {},
1256 )
1257 }
1258 } else {
1259 self.local_participant.active_project = None;
1260 proto::participant_location::Variant::External(proto::participant_location::External {})
1261 };
1262
1263 cx.notify();
1264 cx.background_executor().spawn(async move {
1265 client
1266 .request(proto::UpdateParticipantLocation {
1267 room_id,
1268 location: Some(proto::ParticipantLocation {
1269 variant: Some(location),
1270 }),
1271 })
1272 .await?;
1273 Ok(())
1274 })
1275 }
1276
1277 pub fn is_screen_sharing(&self) -> bool {
1278 self.live_kit.as_ref().map_or(false, |live_kit| {
1279 !matches!(live_kit.screen_track, LocalTrack::None)
1280 })
1281 }
1282
1283 pub fn is_sharing_mic(&self) -> bool {
1284 self.live_kit.as_ref().map_or(false, |live_kit| {
1285 !matches!(live_kit.microphone_track, LocalTrack::None)
1286 })
1287 }
1288
1289 pub fn is_muted(&self, cx: &AppContext) -> bool {
1290 self.live_kit
1291 .as_ref()
1292 .and_then(|live_kit| match &live_kit.microphone_track {
1293 LocalTrack::None => Some(Self::mute_on_join(cx)),
1294 LocalTrack::Pending { muted, .. } => Some(*muted),
1295 LocalTrack::Published { muted, .. } => Some(*muted),
1296 })
1297 .unwrap_or(false)
1298 }
1299
1300 pub fn read_only(&self) -> bool {
1301 !(self.local_participant().role == proto::ChannelRole::Member
1302 || self.local_participant().role == proto::ChannelRole::Admin)
1303 }
1304
1305 pub fn is_speaking(&self) -> bool {
1306 self.live_kit
1307 .as_ref()
1308 .map_or(false, |live_kit| live_kit.speaking)
1309 }
1310
1311 pub fn is_deafened(&self) -> Option<bool> {
1312 self.live_kit.as_ref().map(|live_kit| live_kit.deafened)
1313 }
1314
1315 #[track_caller]
1316 pub fn share_microphone(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
1317 if self.status.is_offline() {
1318 return Task::ready(Err(anyhow!("room is offline")));
1319 } else if self.is_sharing_mic() {
1320 return Task::ready(Err(anyhow!("microphone was already shared")));
1321 }
1322
1323 let publish_id = if let Some(live_kit) = self.live_kit.as_mut() {
1324 let publish_id = post_inc(&mut live_kit.next_publish_id);
1325 live_kit.microphone_track = LocalTrack::Pending {
1326 publish_id,
1327 muted: false,
1328 };
1329 cx.notify();
1330 publish_id
1331 } else {
1332 return Task::ready(Err(anyhow!("live-kit was not initialized")));
1333 };
1334
1335 cx.spawn(move |this, mut cx| async move {
1336 let publish_track = async {
1337 let track = LocalAudioTrack::create();
1338 this.upgrade()
1339 .ok_or_else(|| anyhow!("room was dropped"))?
1340 .update(&mut cx, |this, _| {
1341 this.live_kit
1342 .as_ref()
1343 .map(|live_kit| live_kit.room.publish_audio_track(track))
1344 })?
1345 .ok_or_else(|| anyhow!("live-kit was not initialized"))?
1346 .await
1347 };
1348 let publication = publish_track.await;
1349 this.upgrade()
1350 .ok_or_else(|| anyhow!("room was dropped"))?
1351 .update(&mut cx, |this, cx| {
1352 let live_kit = this
1353 .live_kit
1354 .as_mut()
1355 .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
1356
1357 let (canceled, muted) = if let LocalTrack::Pending {
1358 publish_id: cur_publish_id,
1359 muted,
1360 } = &live_kit.microphone_track
1361 {
1362 (*cur_publish_id != publish_id, *muted)
1363 } else {
1364 (true, false)
1365 };
1366
1367 match publication {
1368 Ok(publication) => {
1369 if canceled {
1370 live_kit.room.unpublish_track(publication);
1371 } else {
1372 if muted {
1373 cx.background_executor()
1374 .spawn(publication.set_mute(muted))
1375 .detach();
1376 }
1377 live_kit.microphone_track = LocalTrack::Published {
1378 track_publication: publication,
1379 muted,
1380 };
1381 cx.notify();
1382 }
1383 Ok(())
1384 }
1385 Err(error) => {
1386 if canceled {
1387 Ok(())
1388 } else {
1389 live_kit.microphone_track = LocalTrack::None;
1390 cx.notify();
1391 Err(error)
1392 }
1393 }
1394 }
1395 })?
1396 })
1397 }
1398
1399 pub fn share_screen(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
1400 if self.status.is_offline() {
1401 return Task::ready(Err(anyhow!("room is offline")));
1402 } else if self.is_screen_sharing() {
1403 return Task::ready(Err(anyhow!("screen was already shared")));
1404 }
1405
1406 let (displays, publish_id) = if let Some(live_kit) = self.live_kit.as_mut() {
1407 let publish_id = post_inc(&mut live_kit.next_publish_id);
1408 live_kit.screen_track = LocalTrack::Pending {
1409 publish_id,
1410 muted: false,
1411 };
1412 cx.notify();
1413 (live_kit.room.display_sources(), publish_id)
1414 } else {
1415 return Task::ready(Err(anyhow!("live-kit was not initialized")));
1416 };
1417
1418 cx.spawn(move |this, mut cx| async move {
1419 let publish_track = async {
1420 let displays = displays.await?;
1421 let display = displays
1422 .first()
1423 .ok_or_else(|| anyhow!("no display found"))?;
1424 let track = LocalVideoTrack::screen_share_for_display(&display);
1425 this.upgrade()
1426 .ok_or_else(|| anyhow!("room was dropped"))?
1427 .update(&mut cx, |this, _| {
1428 this.live_kit
1429 .as_ref()
1430 .map(|live_kit| live_kit.room.publish_video_track(track))
1431 })?
1432 .ok_or_else(|| anyhow!("live-kit was not initialized"))?
1433 .await
1434 };
1435
1436 let publication = publish_track.await;
1437 this.upgrade()
1438 .ok_or_else(|| anyhow!("room was dropped"))?
1439 .update(&mut cx, |this, cx| {
1440 let live_kit = this
1441 .live_kit
1442 .as_mut()
1443 .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
1444
1445 let (canceled, muted) = if let LocalTrack::Pending {
1446 publish_id: cur_publish_id,
1447 muted,
1448 } = &live_kit.screen_track
1449 {
1450 (*cur_publish_id != publish_id, *muted)
1451 } else {
1452 (true, false)
1453 };
1454
1455 match publication {
1456 Ok(publication) => {
1457 if canceled {
1458 live_kit.room.unpublish_track(publication);
1459 } else {
1460 if muted {
1461 cx.background_executor()
1462 .spawn(publication.set_mute(muted))
1463 .detach();
1464 }
1465 live_kit.screen_track = LocalTrack::Published {
1466 track_publication: publication,
1467 muted,
1468 };
1469 cx.notify();
1470 }
1471
1472 Audio::play_sound(Sound::StartScreenshare, cx);
1473
1474 Ok(())
1475 }
1476 Err(error) => {
1477 if canceled {
1478 Ok(())
1479 } else {
1480 live_kit.screen_track = LocalTrack::None;
1481 cx.notify();
1482 Err(error)
1483 }
1484 }
1485 }
1486 })?
1487 })
1488 }
1489
1490 pub fn toggle_mute(&mut self, cx: &mut ModelContext<Self>) -> Result<Task<Result<()>>> {
1491 let should_mute = !self.is_muted(cx);
1492 if let Some(live_kit) = self.live_kit.as_mut() {
1493 if matches!(live_kit.microphone_track, LocalTrack::None) {
1494 return Ok(self.share_microphone(cx));
1495 }
1496
1497 let (ret_task, old_muted) = live_kit.set_mute(should_mute, cx)?;
1498 live_kit.muted_by_user = should_mute;
1499
1500 if old_muted == true && live_kit.deafened == true {
1501 if let Some(task) = self.toggle_deafen(cx).ok() {
1502 task.detach();
1503 }
1504 }
1505
1506 Ok(ret_task)
1507 } else {
1508 Err(anyhow!("LiveKit not started"))
1509 }
1510 }
1511
1512 pub fn toggle_deafen(&mut self, cx: &mut ModelContext<Self>) -> Result<Task<Result<()>>> {
1513 if let Some(live_kit) = self.live_kit.as_mut() {
1514 (*live_kit).deafened = !live_kit.deafened;
1515
1516 let mut tasks = Vec::with_capacity(self.remote_participants.len());
1517 // Context notification is sent within set_mute itself.
1518 let mut mute_task = None;
1519 // When deafening, mute user's mic as well.
1520 // When undeafening, unmute user's mic unless it was manually muted prior to deafening.
1521 if live_kit.deafened || !live_kit.muted_by_user {
1522 mute_task = Some(live_kit.set_mute(live_kit.deafened, cx)?.0);
1523 };
1524 for participant in self.remote_participants.values() {
1525 for track in live_kit
1526 .room
1527 .remote_audio_track_publications(&participant.user.id.to_string())
1528 {
1529 let deafened = live_kit.deafened;
1530 tasks.push(cx.foreground_executor().spawn(track.set_enabled(!deafened)));
1531 }
1532 }
1533
1534 Ok(cx.foreground_executor().spawn(async move {
1535 if let Some(mute_task) = mute_task {
1536 mute_task.await?;
1537 }
1538 for task in tasks {
1539 task.await?;
1540 }
1541 Ok(())
1542 }))
1543 } else {
1544 Err(anyhow!("LiveKit not started"))
1545 }
1546 }
1547
1548 pub fn unshare_screen(&mut self, cx: &mut ModelContext<Self>) -> Result<()> {
1549 if self.status.is_offline() {
1550 return Err(anyhow!("room is offline"));
1551 }
1552
1553 let live_kit = self
1554 .live_kit
1555 .as_mut()
1556 .ok_or_else(|| anyhow!("live-kit was not initialized"))?;
1557 match mem::take(&mut live_kit.screen_track) {
1558 LocalTrack::None => Err(anyhow!("screen was not shared")),
1559 LocalTrack::Pending { .. } => {
1560 cx.notify();
1561 Ok(())
1562 }
1563 LocalTrack::Published {
1564 track_publication, ..
1565 } => {
1566 live_kit.room.unpublish_track(track_publication);
1567 cx.notify();
1568
1569 Audio::play_sound(Sound::StopScreenshare, cx);
1570 Ok(())
1571 }
1572 }
1573 }
1574
/// Test helper that forwards `sources` to the underlying LiveKit room.
///
/// # Panics
///
/// Panics when LiveKit is not initialized.
#[cfg(any(test, feature = "test-support"))]
pub fn set_display_sources(&self, sources: Vec<live_kit_client::MacOSDisplay>) {
    let live_kit = self.live_kit.as_ref().unwrap();
    live_kit.room.set_display_sources(sources);
}
1583}
1584
/// LiveKit-side state that a `Room` keeps while LiveKit is initialized.
struct LiveKitRoom {
    /// Handle to the LiveKit client room used to publish and unpublish tracks.
    room: Arc<live_kit_client::Room>,
    /// Publication state of the local screen-share track.
    screen_track: LocalTrack,
    /// Publication state of the local microphone track.
    microphone_track: LocalTrack,
    /// Tracks whether we're currently in a muted state due to auto-mute from deafening or manual mute performed by user.
    ///
    /// Set to the requested state by `Room::toggle_mute`, cleared by `set_mute`
    /// on any unmute, and consulted by `Room::toggle_deafen` to decide whether
    /// undeafening should also unmute the microphone.
    muted_by_user: bool,
    /// Whether remote audio is currently disabled; flipped by `Room::toggle_deafen`.
    deafened: bool,
    /// Value reported by `Room::is_speaking`.
    speaking: bool,
    /// Counter handed out (post-incremented) to each publish attempt so that a
    /// completed attempt can tell whether it is still the pending one.
    next_publish_id: usize,
    // NOTE(review): presumably held only so the background task lives as long
    // as this struct — confirm where it is spawned.
    _maintain_room: Task<()>,
    // NOTE(review): presumably held only so the update-handling task lives as
    // long as this struct — confirm where it is spawned.
    _handle_updates: Task<()>,
}
1597
1598impl LiveKitRoom {
1599 fn set_mute(
1600 self: &mut LiveKitRoom,
1601 should_mute: bool,
1602 cx: &mut ModelContext<Room>,
1603 ) -> Result<(Task<Result<()>>, bool)> {
1604 if !should_mute {
1605 // clear user muting state.
1606 self.muted_by_user = false;
1607 }
1608
1609 let (result, old_muted) = match &mut self.microphone_track {
1610 LocalTrack::None => Err(anyhow!("microphone was not shared")),
1611 LocalTrack::Pending { muted, .. } => {
1612 let old_muted = *muted;
1613 *muted = should_mute;
1614 cx.notify();
1615 Ok((Task::Ready(Some(Ok(()))), old_muted))
1616 }
1617 LocalTrack::Published {
1618 track_publication,
1619 muted,
1620 } => {
1621 let old_muted = *muted;
1622 *muted = should_mute;
1623 cx.notify();
1624 Ok((
1625 cx.background_executor()
1626 .spawn(track_publication.set_mute(*muted)),
1627 old_muted,
1628 ))
1629 }
1630 }?;
1631
1632 if old_muted != should_mute {
1633 if should_mute {
1634 Audio::play_sound(Sound::Mute, cx);
1635 } else {
1636 Audio::play_sound(Sound::Unmute, cx);
1637 }
1638 }
1639
1640 Ok((result, old_muted))
1641 }
1642
1643 fn stop_publishing(&mut self, cx: &mut ModelContext<Room>) {
1644 if let LocalTrack::Published {
1645 track_publication, ..
1646 } = mem::replace(&mut self.microphone_track, LocalTrack::None)
1647 {
1648 self.room.unpublish_track(track_publication);
1649 cx.notify();
1650 }
1651
1652 if let LocalTrack::Published {
1653 track_publication, ..
1654 } = mem::replace(&mut self.screen_track, LocalTrack::None)
1655 {
1656 self.room.unpublish_track(track_publication);
1657 cx.notify();
1658 }
1659 }
1660}
1661
/// Publication state of a local track (microphone or screen).
enum LocalTrack {
    /// Nothing is being shared.
    None,
    /// A publish attempt is in flight.
    Pending {
        /// Identifier of the attempt; a finished attempt whose id no longer
        /// matches is treated as canceled.
        publish_id: usize,
        /// Mute state to apply once the track has been published.
        muted: bool,
    },
    /// The track has been published.
    Published {
        /// Handle used to mute or unpublish the track.
        track_publication: LocalTrackPublication,
        /// Last mute state recorded for the track.
        muted: bool,
    },
}
1673
1674impl Default for LocalTrack {
1675 fn default() -> Self {
1676 Self::None
1677 }
1678}
1679
/// Connection status of a room.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum RoomStatus {
    /// The room is connected.
    Online,
    // NOTE(review): presumably a reconnect to the room is in progress —
    // confirm against the code that sets this variant.
    Rejoining,
    /// The room is offline; room operations that check the status fail with
    /// "room is offline".
    Offline,
}
1686
1687impl RoomStatus {
1688 pub fn is_offline(&self) -> bool {
1689 matches!(self, RoomStatus::Offline)
1690 }
1691
1692 pub fn is_online(&self) -> bool {
1693 matches!(self, RoomStatus::Online)
1694 }
1695}